Code Monkey home page Code Monkey logo

webman-rabbitmq's Introduction

workbunny

workbunny/webman-rabbitmq

🐇 A PHP implementation of RabbitMQ Client for webman plugin. 🐇

A PHP implementation of RabbitMQ Client for webman plugin

Latest Stable Version Total Downloads License PHP Version Require

常见问题

  1. 什么时候使用消息队列?

    当你需要对系统进行解耦、削峰、异步的时候;如发送短信验证码、秒杀活动、资产的异步分账清算等。

  2. RabbitMQ和Redis的区别?

    Redis中的Stream的特性同样适用于消息队列,并且也包含了比较完善的ACK机制,但在一些点上与RabbitMQ存在不同:

    • Redis Stream没有完善的后台管理;RabbitMQ拥有较为完善的后台管理及Api;
    • Redis的持久化策略取舍:默认的RDB策略极端情况下存在丢失数据,AOF策略则需要牺牲一些性能;RabbitMQ持久化方案更多,可对消息持久化也可对队列持久化;
    • RabbitMQ拥有更多的插件可以提供更完善的协议支持及功能支持;
  3. 什么时候使用Redis?什么时候使用RabbitMQ?

    当你的队列使用比较单一或者比较轻量的时候,请选用 Redis Stream;当你需要一个比较完整的消息队列体系,包括需要利用交换机来绑定不同队列做一些比较复杂的消息任务的时候,请选择RabbitMQ;

    当然,如果你的队列使用也比较单一,但你需要用到一些管理后台相关系统化的功能的时候,又不想花费太多时间去开发的时候,也可以使用RabbitMQ;因为RabbitMQ提供了一整套后台管理的体系及 HTTP API 供开发者兼容到自己的管理后台中,不需要再消耗多余的时间去开发功能;

    注:这里的 轻量 指的是 无须将应用中的队列服务独立化,该队列服务是该应用独享的

简介

RabbitMQ的webman客户端插件;

  1. 支持5种消费模式:简单队列、workQueue、routing、pub/sub、exchange;
  2. 支持延迟队列(rabbitMQ须安装插件);
  3. 异步无阻塞消费、异步无阻塞生产、同步阻塞生产;

安装

composer require workbunny/webman-rabbitmq

配置

<?php
return [
    'enable' => true,

    'host'               => 'rabbitmq',
    'vhost'              => '/',
    'port'               => 5672,
    'username'           => 'guest',
    'password'           => 'guest',
    'mechanisms'         => 'AMQPLAIN',
    'timeout'            => 10,
    // 重启间隔
    'restart_interval'   => 0,
    // 心跳间隔
    'heartbeat'          => 50,
    // 心跳回调
    'heartbeat_callback' => function(){
    },
    // 错误回调
    'error_callback'     => function(Throwable $throwable){
    },
    // 复用连接
    'reuse_connection'   => false,
    // AMQPS 如需使用AMQPS请取消注释
//    'ssl'                => [
//        'cafile'      => 'ca.pem',
//        'local_cert'  => 'client.cert',
//        'local_pk'    => 'client.key',
//    ],
];

使用

  • 2.x与1.x在Builder结构有着较大的变化,1.x文档
  • 2.x已生产可用

QueueBuilder

  • 可实现官网的5种消费模式

命令行

  • 创建
# 创建一个拥有单进程消费者的QueueBuilder
./webman workbunny:rabbitmq-builder test --mode=queue
# 创建一个拥有4进程消费者的QueueBuilder
./webman workbunny:rabbitmq-builder test 4 --mode=queue

# 创建一个拥有单进程消费者的延迟QueueBuilder
./webman workbunny:rabbitmq-builder test --delayed --mode=queue
# 创建一个拥有4进程消费者的延迟QueueBuilder
./webman workbunny:rabbitmq-builder test 4 --delayed --mode=queue


# 在 process/workbunny/rabbitmq 目录下创建 TestBuilder.php
./webman workbunny:rabbitmq-builder test --mode=queue
# 在 process/workbunny/rabbitmq/project 目录下创建 TestBuilder.php
./webman workbunny:rabbitmq-builder project/test --mode=queue
# 在 process/workbunny/rabbitmq/project 目录下创建 TestAllBuilder.php
./webman workbunny:rabbitmq-builder project/testAll --mode=queue
# 延迟同理
  • 移除

移除包含了类文件的移除和配置的移除

# 移除Builder
./webman workbunny:rabbitmq-remove test
# 移除延迟Builder
./webman workbunny:rabbitmq-remove test --delayed

# 二级菜单同理
  • 关闭

关闭仅对配置进行移除

# 关闭Builder
./webman workbunny:rabbitmq-remove test --close
# 关闭延迟Builder
./webman workbunny:rabbitmq-remove test --close --delayed

# 二级菜单同理

注意

  • 创建的Builder类可以手动修改调整
  • 为Builder添加进process.php的配置可以手动修改
  • 延迟队列需要为 rabbitMQ 安装 rabbitmq_delayed_message_exchange 插件
    1. 进入 rabbitMQ 的 plugins 目录下执行命令下载插件(以rabbitMQ 3.8.x举例):
    wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.17/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
    1. 执行安装命令
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查看Builder

./webman workbunny:rabbitmq-list

注:当 Builder 未启动时,handler 与 count 显示为 --

+----------+-------------------------------------------------------------------------+-------------------------------------------------+-------+-------+
| name     | file                                                                    | handler                                         | count | mode  |
+----------+-------------------------------------------------------------------------+-------------------------------------------------+-------+-------+
| test     | /var/www/your-project/process/workbunny/rabbitmq/TestBuilder.php        | process\workbunny\rabbitmq\TestBuilder          | 1     | queue |
| test -d  | /var/www/your-project/process/workbunny/rabbitmq/TestBuilderDelayed.php | process\workbunny\rabbitmq\TestBuilderDelayed   | 1     | group |
+----------+-------------------------------------------------------------------------+-------------------------------------------------+-------+-------+

生产

  • 每个builder各包含一个连接,使用多个builder会创建多个连接
  • 生产消息默认不关闭当前连接
  • 异步生产的连接与消费者共用

1. 同步发布消息

该方法会阻塞等待至消息生产成功,返回bool

  • 发布普通消息

注:向延迟队列发布普通消息会抛出一个 WebmanRabbitMQException 异常

use function Workbunny\WebmanRabbitMQ\sync_publish;
use process\workbunny\rabbitmq\TestBuilder;

sync_publish(TestBuilder::instance(), 'abc'); # return bool
  • 发布延迟消息

注:向普通队列发布延迟消息会抛出一个 WebmanRabbitMQException 异常

use function Workbunny\WebmanRabbitMQ\sync_publish;
use process\workbunny\rabbitmq\TestBuilder;

sync_publish(TestBuilder::instance(), 'abc', [
	'x-delay' => 10000, # 延迟10秒
]); # return bool

2. 异步发布消息

该方法不会阻塞等待,立即返回 React\Promise, 可以利用 React\Promise 进行 wait; 也可以纯异步不等待,React\Promise 项目地址

  • 发布普通消息

注:向延迟队列发布普通消息会抛出一个 WebmanRabbitMQException 异常

use function Workbunny\WebmanRabbitMQ\async_publish;
use process\workbunny\rabbitmq\TestBuilder;

async_publish(TestBuilder::instance(), 'abc'); # return PromiseInterface|bool
  • 发布延迟消息

注:向普通队列发布延迟消息会抛出一个 WebmanRabbitMQException 异常

use function Workbunny\WebmanRabbitMQ\async_publish;
use process\workbunny\rabbitmq\TestBuilder;

async_publish(TestBuilder::instance(), 'abc', [
	'x-delay' => 10000, # 延迟10秒
]); # return PromiseInterface|bool

自定义Builder

  • 创建自定义Builder需继承实现AbstractBuilder;
    • onWorkerStart 消费进程启动时会触发,一般用于实现基础消费逻辑;
    • onWorkerStop 消费进程结束时会触发,一般用于回收资源;
    • onWorkerReload 消费进程重载,一般可置空;
    • classContent 用于配合命令行自动生成BuilderClass;
    /**
     * Builder 启动时
     *
     * @param Worker $worker
     * @return void
     */
    abstract public function onWorkerStart(Worker $worker): void;

    /**
     * Builder 停止时
     *
     * @param Worker $worker
     * @return void
     */
    abstract public function onWorkerStop(Worker $worker): void;

    /**
     * Builder 重加载时
     *
     * @param Worker $worker
     * @return void
     */
    abstract public function onWorkerReload(Worker $worker): void;

    /**
     * Command 获取需要创建的类文件内容
     *
     * @param string $namespace
     * @param string $className
     * @param bool $isDelay
     * @return string
     */
    abstract public static function classContent(string $namespace, string $className, bool $isDelay): string;
  • Builder会创建Connection,每个Connection会分别创建一个同步RabbitMQ客户端连接和一个异步客户端RabbitMQ连接;

  • 不同的Builder默认不复用Connection,配置选项reuse_connection可开启复用Connection;

    • 复用Connection可以减少创建的RabbitMQ-Client连接数,但一定程度上会降低并发能力
    • 复用不影响消费者,不影响跨进程的生产者
    • 复用仅影响当前进程内的不同Builder的生产者

说明

  • 生产可用,欢迎 issue 和 PR
  • Message 可以理解为队列、交换机的配置信息;
  • 继承实现 AbstractMessage 可以自定义Message;
  • Builder 可通过 Builder->setMessage() 可设置自定义配置;
  • 可使用 SyncClientAsyncClient 自行实现一些自定义消费/自定义生产的功能;

webman-rabbitmq's People

Contributors

chaz6chez avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar

webman-rabbitmq's Issues

【workbunny/webman-rabbitmq】消费者超时导致重复消费

问题描述

首先感谢 workbunny 提供这么一款很好用的插件。使用【workbunny】RabbitMQ客户端,插件地址:https://www.workerman.net/plugin/67, 在消费者中如果有阻塞并超过一定时间,就会导致重复消费。

框架及插件版本

"workerman/webman-framework": "^1.5.0"
"workbunny/webman-rabbitmq": "^1.0"

复现代码

生产者:IndexController.php

image

workbunny rabbitmq配置:app.php
image

消费者:TestBuilder.php
image

关键日志打印

image

关键日志说明

通过日志可以看到,在出现错误前,是有消息消费成功并返回了 ACK。在出现错误后,相关进程重启,导致消息开始重新消费,以前返回的 ACK 似乎无效?RabbitMQ的控制台,始终显示有对应数量的信息处于 Unacked 状态。

image

只发送消息,不消费

我们项目的消费者在另一个系统已经建好,现在我只想发送消息不消费,应该怎么操作呢?

2.x版本存在问题

2.x版本存在很多问题,问题如下:

  1. 文档有误:
    (1)在创建消费者的延迟QueueBuilder时,如 "./webman workbunny:rabbitmq-builder test --delayed--mode=queue" 缺少空格
    (2)移除QueueBuilder时,如 "./webman workbunny:rabbitmq-remove test --mode=queue",提示没有 --mode 选项

  2. 创建延时QueueBuilder,投递信息有误,以下为复现步骤:
    (1)版本如下:

"workerman/webman-framework": "1.4.7",
"workbunny/webman-rabbitmq": "^2.1"

(2)创建延时QueueBuilder:
/webman workbunny:rabbitmq-builder test --delayed --mode=queue
(3)投递消息

$taskDelayedBuilder = TestBuilderDelayed::instance();
$taskDelayedMsg = json_encode([ 'id'=>1111 ], JSON_UNESCAPED_UNICODE);
$taskDelayedHeader['x-delay'] = 3000;
$taskDelayedResult = sync_publish($taskDelayedBuilder, $taskDelayedMsg, null, $taskDelayedHeader);

提示:
Invalid publish.
(4)定位问题:vendor/workbunny/webman-rabbitmq/src/helpers.php
image
(5)尝试解决:
将 TestBuilderDelayed 里的 $exchangeType 值修改为 "Constants::DELAYED" ,重启后发现不会创建对应的消费者,且投递时第一次投递失败,后面可以投递成功,但不会消费

Bug: Call to a member function publish() on null

问题描述

服务首次启动调用MQ,异步发送多次任务的时候会报错, 此时控制台只会输出一个信息,第一次请求就不会出问题了

程序代码

        for ($i=1; $i <= 3; $i++){
            async_publish(TestBuilder::instance(), date('Y-m-d H:i:s').'_______'.$i);
        }

报错信息

{
    "code": 0,
    "msg": "Call to a member function publish() on null",
    "data": {
        "request_url": "GET //127.0.0.1:8666/index/mq",
        "timestamp": "2023-02-24 11:17:05",
        "client_ip": "127.0.0.1",
        "request_param": [],
        "error_message": "Call to a member function publish() on null",
        "error_trace": [
            "#0 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workbunny\\webman-rabbitmq\\src\\helpers.php(56): Workbunny\\WebmanRabbitMQ\\Connection->publish()",
            "#1 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\app\\index\\controller\\Index.php(43): Workbunny\\WebmanRabbitMQ\\async_publish()",
            "#2 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(318): app\\index\\controller\\Index->mq()",
            "#3 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(348): Webman\\App::Webman\\{closure}()",
            "#4 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\webman\\log\\src\\Middleware.php(58): Webman\\App::Webman\\{closure}()",
            "#5 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(340): Webman\\Log\\Middleware->process()",
            "#6 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\webman\\cors\\src\\CORS.php(12): Webman\\App::Webman\\{closure}()",
            "#7 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(340): Webman\\Cors\\CORS->process()",
            "#8 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\app\\common\\middleware\\RequestMonitoring.php(30): Webman\\App::Webman\\{closure}()",
            "#9 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(340): app\\common\\middleware\\RequestMonitoring->process()",
            "#10 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\App.php(167): Webman\\App::Webman\\{closure}()",
            "#11 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Connection\\TcpConnection.php(646): Webman\\App->onMessage()",
            "#12 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Events\\Select.php(311): Workerman\\Connection\\TcpConnection->baseRead()",
            "#13 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Worker.php(1479): Workerman\\Events\\Select->loop()",
            "#14 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Worker.php(1399): Workerman\\Worker::forkWorkersForWindows()",
            "#15 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\workerman\\Worker.php(560): Workerman\\Worker::forkWorkers()",
            "#16 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workerman\\webman-framework\\src\\support\\App.php(131): Workerman\\Worker::runAll()",
            "#17 D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\start.php(4): support\\App::run()",
            "#18 {main}"
        ],
        "file": "D:\\phpStudy20188\\PHPTutorial\\WWW\\cinemaadmin\\vendor\\workbunny\\webman-rabbitmq\\src\\Connection.php",
        "line": 224
    }
}

操作系统及workerman/webman等框架组件具体版本

Windows:8
CentOS:6.*
PHP: 8.0.2
webman-rabbitmq:1.0.9
webman-framework:1.4.3

进程停止后导致event-loop内还有未执行的事件

由于与rabbitmq-server的通讯工作是异步的,由workerman event-loop接管所有事件的执行工作;
当进程杀死时,event-loop也被销毁,event-loop内可能存在还未执行的事件,从而导致一些问题,如:重复消费、发送失败。

相似问题: #11

2.x rc版本同步发布队列任务返回异常

2.0beta-rc2版本
sync_publish方法执行后提示错误:
Error: Call to a member function then() on bool in ~/x/vendor/workbunny/webman-rabbitmq/src/Connection.php:204

同步方法应该返回的是布尔值不是PromiseInterface,所以不可以继续链式执行->then等promise相关操作了。

如果是用法不对请有空指正,感谢

关于消费队列退出优雅关闭连接

在本地启了一个docker运行rabbitmq,通过实时跟踪日志查看到,当webman退出时,调用消费队列进程的onWorkerStop方法时,虽然FasterBuilder中针对连接处理$this->_connection->close();,但发现rabbitmq这边有warning日志:

2022-09-04 02:55:53.458 [warning] <0.20366.1> closing AMQP connection <0.20366.1> (172.19.0.1:60280 -> 172.19.0.2:5672, vhost: '/', user: 'rabbitmq'):
client unexpectedly closed TCP connection

同时发现如果在发布消息时,通过调用helpers的async_publish方法,设置close参数为true时,连接是正常关闭的:

2022-09-04 03:02:36.367 [info] <0.20824.1> connection <0.20824.1> (172.19.0.1:60316 -> 172.19.0.2:5672): user 'rabbitmq' authenticated and granted access to vhost '/'
******
2022-09-04 03:02:36.421 [info] <0.20824.1> closing AMQP connection <0.20824.1> (172.19.0.1:60316 -> 172.19.0.2:5672, vhost: '/', user: 'rabbitmq')

关于helpers提供的消息发布方法

最近在看rabbitmq相关的组件,在尝试使用这个组件时,关于helpers下的两个方法有个疑问,如下

 * 同步生产
 * @param FastBuilder $builder
 * @param string $body
 * @param array|null $headers
 * @param bool $close
 * @return bool
 */
function sync_publish(FastBuilder $builder, string $body, ?array $headers = null, bool $close = false) : bool
{
    $message = $builder->getMessage();
    if(
        ($message->getExchangeType() !== Constants::DELAYED and $headers['x-delay'] ?? 1) or
        ($message->getExchangeType() === Constants::DELAYED and !($headers['x-delay'] ?? 0))
    ){
        throw new WebmanRabbitMQException('Invalid publish. ');
    }
    $message->setBody($body);
    if($headers !== null){
        $message->setHeaders(array_merge($message->getHeaders(), $headers));
    }
    return $builder->syncConnection()->publish($message, $close);
}

若这里$builder是普通队列,则$message的ExchangeType是Constants::DIRECT,当$headers不传递时,默认为null,那这里不是始终抛WebmanRabbitMQException了?

关于消息发送和接收的稳定性优化

推送:sync_publish(Builder::instance(), $params, null, true); 说明文档中,最后的close 参数,最好默认是true。 如果该方法应用在 controller 部分,这个会因为没有固定的 心跳监听而造成大量的发布失败。 虽然每次都会重新创建连接,那也是稳定和高效的。

process 里的消息接收。建议加入重连检查。有时候网络断开造成的消费者丢失没有重连。 或者搞个重连开关。

`
public function onWorkerStart(Worker $worker): void
{
parent::onWorkerStart($worker);

    Timer::add(10, function () {
        $this->checkConnection();
    });

}

private function checkConnection() {
    try {
        if($this->connection()->client()->isConnected() == false) {
            $this->logger->debug('Reconnect');
            $this->connection()->consume($this->getMessage());
        }else {
            $this->logger->debug('Connection is OK');
        }
    } catch (\Exception $e) {
        $this->logger->error($e->getMessage());
    }
}

`

请教如何实现topic模式,exchange对应多个队列对应各自消费者

感谢作者提供简单好用的插件!

当前使用1.0版本

我想实现的效果是:

  • 实现topic模式, 比如exchange = mqTopic ,
  • 这个交换机有两个队列mqTopic_1mqTopic_2,
  • 两个队列分别通过routing_key = task.1 (mqTopic_1)routing_key = task.2 (mqTopic_2)绑定
  • 每个队列有各自的消费者进程去消费

我的做法:

  • 通过php webman workbunny:rabbitmq-builder mqTopic 2创建了builder
  • 投递消息使用sync_publish() 目前情况是投递消息可以到达,但是无法消费
$b = MqTopicBuilder::instance();
$b->setMessage(new Message([
         'exchange_name'     => 'mqTopic',
          'exchange_type'     => Constants::TOPIC,
          'queue_name'        => 'mqTopic_'.$request->get('queue_name', 0), //对应mqTopic_1 mqTopic_2
          'routing_key'       => $request->get('routing_key',get_called_class()), //对应 task.1 task.2
          'consumer_tag'      => 'mqTopic',
          'prefetch_size'     => 0,
          'prefetch_count'    => 30,
          'is_global'         => false,
      ]));
      sync_publish($b, json_encode($data));

请问我该如何实现?如果有demo最好!

在次感谢作者无私奉献!

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.