Code Monkey home page Code Monkey logo

webman-rqueue's Introduction

workbunny

workbunny/webman-rqueue

🐇 A lightweight queue based on Redis Stream for webman plugin. 🐇

A lightweight queue based on Redis Stream for webman plugin

常见问题

  1. 什么时候使用消息队列?

    当你需要对系统进行解耦、削峰、异步的时候;如发送短信验证码、秒杀活动、资产的异步分账清算等。

  2. RabbitMQ和Redis的区别?

    Redis中的Stream的特性同样适用于消息队列,并且也包含了比较完善的ACK机制,但在一些点上与RabbitMQ存在不同:

    • Redis Stream没有完善的后台管理;RabbitMQ拥有较为完善的后台管理及Api;
    • Redis的持久化策略取舍:默认的RDB策略极端情况下存在丢失数据,AOF策略则需要牺牲一些性能;RabbitMQ持久化方案更多,可对消息持久化也可对队列持久化;
    • RabbitMQ拥有更多的插件可以提供更完善的协议支持及功能支持;
  3. 什么时候使用Redis?什么时候使用RabbitMQ?

    当你的队列使用比较单一或者比较轻量的时候,请选用 Redis Stream;当你需要一个比较完整的消息队列体系,包括需要利用交换机来绑定不同队列做一些比较复杂的消息任务的时候,请选择RabbitMQ;

    当然,如果你的队列使用也比较单一,但你需要用到一些管理后台相关系统化的功能的时候,又不想花费太多时间去开发的时候,也可以使用RabbitMQ;因为RabbitMQ提供了一整套后台管理的体系及 HTTP API 供开发者兼容到自己的管理后台中,不需要再消耗多余的时间去开发功能;

    注:这里的 轻量 指的是 无须将应用中的队列服务独立化,该队列服务是该应用独享的

简介

安装

环境依赖

  • php >=8.0
  • webman >= 1.0
  • redis >= 6.2
composer require workbunny/webman-rqueue

使用

  • 一个QueueBuilder类对应一个消费Group和一个消费逻辑 QueueBuilder::handler()
  • 一个QueueBuilder可对应一个/多个Redis-Stream-Key,通过配置 QueueBuilder::$config['queues']
  • QueueBuilder类使用定时器进行消费,每一次消费之后会根据消息的属性 _header['_delete'] 来进行消息释放

命令行

--mode默认为queue

  • 创建
# 创建一个拥有单进程消费者的QueueBuilder
./webman workbunny:rqueue-builder testQueue --mode=queue
./webman workbunny:rqueue-builder testQueue -m queue
# 创建一个拥有4进程消费者的QueueBuilder
./webman workbunny:rqueue-builder testQueue 4 --mode=queue
./webman workbunny:rqueue-builder testQueue 4 -m queue
# 创建一个拥有单进程消费者的延迟QueueBuilder
./webman workbunny:rqueue-builder testQueue --delayed --mode=queue
./webman workbunny:rqueue-builder testQueue -dm queue
# 创建一个拥有4进程消费者的延迟QueueBuilder
./webman workbunny:rqueue-builder testQueue 4 --delayed --mode=queue
./webman workbunny:rqueue-builder testQueue 4 -dm queue

# 在 process/workbunny/rqueue 目录下创建 TestQueueBuilder.php
./webman workbunny:rqueue-builder testQueue
# 在 process/workbunny/rqueue/project 目录下创建 TestQueueBuilder.php
./webman workbunny:rqueue-builder project/testQueue
# 在 process/workbunny/rqueue/project 目录下创建 TestAllQueueBuilder.php
./webman workbunny:rqueue-builder project/testAllQueue
# 延迟同理
  • 移除

移除包含了类文件的移除和配置的移除

# 移除Builder
./webman workbunny:rqueue-remove testQueue
# 移除延迟Builder
./webman workbunny:rqueue-remove testQueue --delayed
./webman workbunny:rqueue-remove testQueue -d
# 二级菜单同理
  • 开启

开启仅对配置进行移除

# 开启Builder
./webman workbunny:rqueue-builder test --open --mode=queue
./webman workbunny:rqueue-builder test -om queue
# 开启延迟Builder
./webman workbunny:rqueue-builder test --open --delayed --mode=queue
./webman workbunny:rqueue-builder test -odm queue

# 二级菜单同理
  • 关闭

关闭仅对配置进行移除

# 关闭Builder
./webman workbunny:rqueue-remove test --close --mode=queue
./webman workbunny:rqueue-remove test -cm queue
# 关闭延迟Builder
./webman workbunny:rqueue-remove test --close --delayed --mode=queue
./webman workbunny:rqueue-remove test -cdm queue

# 二级菜单同理
  • 一个GroupBuilder类对应一个消费Group和一个消费逻辑 QueueBuilder::handler()
  • 一个GroupBuilder可对应一个/多个Redis-Stream-Key,通过配置 QueueBuilder::$config['queues']
  • GroupBuilder类使用定时器进行消费,使用定时器释放当前 Stream-Key 上所有Group收取过的闲置消息
  • 可以使用多个GroupBuilder类配置相同的 QueueBuilder::$config['queues'],从而达到一条/多条队列由不同的消费逻辑进行处理;
    • 基于此特性,可以实现消息持久化的发布订阅
    • 基于此特性,可以实现RabbitMQ的exchange模式

命令行

  • 创建
# 创建一个拥有单进程消费者的GroupBuilder
./webman workbunny:rqueue-builder testGroup --mode=group
./webman workbunny:rqueue-builder testGroup -m group
# 创建一个拥有4进程消费者的GroupBuilder
./webman workbunny:rqueue-builder testGroup 4 --mode=group
./webman workbunny:rqueue-builder testGroup 4 -m group
# 创建一个拥有单进程消费者的延迟GroupBuilder
./webman workbunny:rqueue-builder testGroup --delayed--mode=group
./webman workbunny:rqueue-builder testGroup -dm group
# 创建一个拥有4进程消费者的延迟GroupBuilder
./webman workbunny:rqueue-builder testGroup 4 --delayed--mode=group
./webman workbunny:rqueue-builder testGroup 4 -dm group

# 二级菜单

# 在 process/workbunny/rqueue 目录下创建 TestGroupBuilder.php
./webman workbunny:rqueue-builder testGroup --mode=group
./webman workbunny:rqueue-builder testGroup -m group
# 在 process/workbunny/rqueue/project 目录下创建 TestGroupBuilder.php
./webman workbunny:rqueue-builder project/testGroup --mode=group
./webman workbunny:rqueue-builder project/testGroup -m group
# 在 process/workbunny/rqueue/project 目录下创建 TestAllGroupBuilder.php
./webman workbunny:rqueue-builder project/testAllGroup --mode=group
./webman workbunny:rqueue-builder project/testAllGroup -m group
  • 移除

移除包含了类文件的移除和配置的移除

# 移除Builder
./webman workbunny:rqueue-remove testGroup --mode=group
./webman workbunny:rqueue-remove testGroup -m group
# 移除延迟Builder
./webman workbunny:rqueue-remove testGroup --delayed --mode=group
./webman workbunny:rqueue-remove testGroup -dm group
# 二级菜单同理
  • 开启

开启仅对配置进行移除

# 开启Builder
./webman workbunny:rqueue-builder testGroup --open --mode=group
./webman workbunny:rqueue-builder testGroup -om group
# 开启延迟Builder
./webman workbunny:rqueue-builder testGroup --open --delayed --mode=group
./webman workbunny:rqueue-builder testGroup -odm group
# 二级菜单同理
  • 关闭

关闭仅对配置进行移除

# 关闭Builder
./webman workbunny:rqueue-remove testGroup --close --mode=group
./webman workbunny:rqueue-remove testGroup -cm group
# 关闭延迟Builder
./webman workbunny:rqueue-remove testGroup --close --delayed --mode=group
./webman workbunny:rqueue-remove testGroup -cdm group
# 二级菜单同理

说明

  • AdaptiveBuilder与QueueBuilder的消费方式一致
  • AdaptiveBuilder底层定时器会根据闲置阈值进行判断,消费定时器根据退避指数 x 当前消费间隔进行重置消费间隔

相较于其他Builder的优势

  • 在消息负载较高的情况下,AdaptiveBuilder是普通的QueueBuilder
  • 在消息负载较低的情况下,AdaptiveBuilder根据闲置阈值对消费者的消费查询速率进行自适应退避调整,有效减少redis的查询压力
  • 在延时消费场景下,AdaptiveBuilder能有效减少因频繁查询redis而造成的redis-server CPU占用率较高的问题
  • 在普通消费模式下,AdaptiveBuilder相比于其他Builder能更快启动下一个消费周期,无需等待Timer的下一个loop,消费更及时

命令行

  • 创建
# 创建一个拥有单进程消费者的AdaptiveBuilder
./webman workbunny:rqueue-builder testAdaptive --mode=adaptive
./webman workbunny:rqueue-builder testAdaptive -m adaptive
# 创建一个拥有4进程消费者的AdaptiveBuilder
./webman workbunny:rqueue-builder testAdaptive 4 --mode=adaptive
./webman workbunny:rqueue-builder testAdaptive 4 -m adaptive
# 创建一个拥有单进程消费者的延迟AdaptiveBuilder
./webman workbunny:rqueue-builder testAdaptive --delayed--mode=adaptive
./webman workbunny:rqueue-builder testAdaptive -dm adaptive
# 创建一个拥有4进程消费者的延迟AdaptiveBuilder
./webman workbunny:rqueue-builder testAdaptive 4 --delayed--mode=adaptive
./webman workbunny:rqueue-builder testAdaptive 4 -dm adaptive

# 二级菜单

# 在 process/workbunny/rqueue 目录下创建
./webman workbunny:rqueue-builder testAdaptive --mode=adaptive
./webman workbunny:rqueue-builder testAdaptive -m adaptive
# 在 process/workbunny/rqueue/project 目录下创建 
./webman workbunny:rqueue-builder project/testAdaptive --mode=adaptive
./webman workbunny:rqueue-builder project/testAdaptive -m adaptive
# 在 process/workbunny/rqueue/project 目录下创建 
./webman workbunny:rqueue-builder project/testAllAdaptive --mode=adaptive
./webman workbunny:rqueue-builder project/testAllAdaptive -m adaptive
  • 移除

移除包含了类文件的移除和配置的移除

# 移除Builder
./webman workbunny:rqueue-remove testAdaptive --mode=adaptive
./webman workbunny:rqueue-remove testAdaptive -m adaptive
# 移除延迟Builder
./webman workbunny:rqueue-remove testAdaptive --delayed --mode=adaptive
./webman workbunny:rqueue-remove testAdaptive -dm adaptive
# 二级菜单同理
  • 开启

开启仅对配置进行移除

# 开启Builder
./webman workbunny:rqueue-builder testAdaptive --open --mode=adaptive
./webman workbunny:rqueue-builder testAdaptive -om adaptive
# 开启延迟Builder
./webman workbunny:rqueue-builder testAdaptive --open --delayed --mode=adaptive
./webman workbunny:rqueue-builder testAdaptive -odm adaptive
# 二级菜单同理
  • 关闭

关闭仅对配置进行移除

# 关闭Builder
./webman workbunny:rqueue-remove testAdaptive --close --mode=adaptive
./webman workbunny:rqueue-remove testAdaptive -cm adaptive
# 关闭延迟Builder
./webman workbunny:rqueue-remove testAdaptive --close --delayed --mode=adaptive
./webman workbunny:rqueue-remove testAdaptive -cdm adaptive
# 二级菜单同理

如queue/group Builder都不满足需求,您可继承 AbstractBuilder 自行实现您所需要的Builder

  • 您的Builder基类需要继承AbstractBuilder实现,可参考QueueBuilder/GroupBuilder
    • onWorkerStart 用于进程启动时的触发逻辑, 这里一般使用Timer结合读取队列触发callback来实现消费队列
    • onWorkerStop 用于进程停止时的回收动作
    • onWorkerReload 用于进程重载时的触发动作,除非有特殊处理,通常置空
    /*
     * Builder 启动时
     *
     * @param Worker $worker
     * @return void
     */
    abstract public function onWorkerStart(Worker $worker): void;

    /**
     * Builder 停止时
     *
     * @param Worker $worker
     * @return void
     */
    abstract public function onWorkerStop(Worker $worker): void;

    /**
     * Builder 重加载时
     *
     * @param Worker $worker
     * @return void
     */
    abstract public function onWorkerReload(Worker $worker): void;
  • classContent方法是配合命令行,用于自动生成队列文件,如不使用,可置空
    /**
     * Command 获取需要创建的类文件内容
     *
     * @param string $namespace
     * @param string $className
     * @param bool $isDelay
     * @return string
     */
    abstract public static function classContent(string $namespace, string $className, bool $isDelay): string;

注意

  • QueueBuilder与GroupBuilder在命令行自动生成时没有做类似Delayed的区分,用户可自行进行命名区分,如:
# 创建一个GroupBuilder
./webman workbunny:rqueue-builder testGroup --mode=group
# 创建一个QueueBuilder
./webman workbunny:rqueue-builder testQueue --mode=queue
  • 创建的Builder类可以手动修改调整

  • 为Builder添加进process.php的配置可以手动修改

查看Builder

./webman workbunny:rqueue-list

注:当 Builder 未启动时,handler 与 count 显示为 --

+----------+-----------------------------------------------------------------------+-------------------------------------------------+-------+-------+
| name     | file                                                                  | handler                                         | count | mode  |
+----------+-----------------------------------------------------------------------+-------------------------------------------------+-------+-------+
| test     | /var/www/your-project/process/workbunny/rqueue/TestBuilder.php        | process\workbunny\rqueue\TestBuilder            | 1     | queue |
| test -d  | /var/www/your-project/process/workbunny/rqueue/TestBuilderDelayed.php | process\workbunny\rqueue\TestBuilderDelayed     | 1     | group |
+----------+-----------------------------------------------------------------------+-------------------------------------------------+-------+-------+

生产

发布普通消息

注:向普通队列发布延迟消息会抛出一个 WebmanRqueueException 异常

use function Workbunny\WebmanRqueue\sync_publish;
use function Workbunny\WebmanRqueue\sync_publish_get_ids;
use process\workbunny\rqueue\TestBuilder;

# 使用函数发布,返回受影响条数,多队列不具备事务一致
/** headers参数详见 @link Header */
sync_publish(TestBuilder::instance(), 'abc', [
	'_delete' => false
]);

# 使用对象发布,返回受影响条数,多队列不具备事务一致
/** headers参数详见 @link Header */
TestBuilder::instance()->publish('abc', [
	'_delete' => false
]);

# 返回消息ID组(数组),多队列不具备事务一致
sync_publish_get_ids(TestBuilder::instance(), 'abc', [
	'_delete' => false
]);

# 返回消息ID组(数组),多队列不具备事务一致
TestBuilder::instance()->publishGetIds('abc', [
	'_delete' => false
]);

发布延迟消息

注:向延迟队列发布普通消息会抛出一个 WebmanRqueueException 异常 注:延迟队列发布消息不支持指定消息id

use function Workbunny\WebmanRqueue\sync_publish;
use function Workbunny\WebmanRqueue\sync_publish_get_ids;
use process\workbunny\rqueue\TestBuilder;

# 延迟10ms,返回受影响条数,多队列不具备事务一致
sync_publish(TestBuilder::instance(), 'abc', [
	'_delay' => 10
]);

# 延迟10ms,返回受影响条数,多队列不具备事务一致
TestBuilder::instance()->publish('abc', [
	'_delay' => 10
]);

# 延迟10ms,返回消息ID组(数组),多队列不具备事务一致
sync_publish_get_ids(TestBuilder::instance(), 'abc', [
	'_delay' => 10
]);

# 延迟10ms,返回消息ID组(数组),多队列不具备事务一致
TestBuilder::instance()->publishGetIds('abc', [
	'_delay' => 10
]);

说明

  • 生产可用,欢迎 issue 和 PR

  • Redis Stream 本身没有 delayednon-delayed 之分,组件代码将它们区分的原因是不希望 delayed 被滥用;开发者应该明确哪些消息是延迟的、哪些是立即的,并且明确体现,也方便维护,因为延迟消息过多会导致消息堆积,从而占用Redis过多的资源;

  • 延迟队列是通过对消息的不断读取放回并且加以判断是否达到延迟触发时间来进行模拟延迟触发的效果,消息不支持指定id,请在逻辑内自行实现;

  • Redis Stream 的持久化依赖 Redis 本身的持久化策略,在一定情况下 Redis Stream 也并非是可靠型的消息队列;关于持久化相关内容,请仔细阅读 Redis中文文档

  • 本地重载机制使用了SQLite3,详见 src/Builders/Traits/MessageTempMethod

webman-rqueue's People

Contributors

chaz6chez avatar fuzqing avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar

webman-rqueue's Issues

republish的优化

现状

  1. republish在极端情况下(ack成功后redis服务宕机)消息可能无法正常republish,从而消息存在丢失可能性

目标

  1. 保障数据

方案

  1. 借助其他方式储存该数据,如sqlite或者file
  2. 使用timer进行republish

支持php8.0+吗?

Your requirements could not be resolved to an installable set of packages.

Problem 1
- Root composer.json requires workbunny/webman-rqueue * -> satisfiable by workbunny/webman-rqueue[0.0.1, 0.0.2, 1.0.0, 1.0.1].
- workbunny/webman-rqueue[0.0.1, ..., 0.0.2, 1.0.0, ..., 1.0.1] require psr/container ^1.1.1 -> found psr/container[1.1.1, 1.1.2] but it conflicts with your root composer.json require (^2.0).

You can also try re-running composer require with an explicit version constraint, e.g. "composer require workbunny/webman-rqueue:*" to figure out if any version is installable, or "composer require workbunny/webman-rqueue:^2.1" if you know which you need.

Installation failed, reverting ./composer.json and ./composer.lock to their original content.

好像不支持php8.0 我用了psr/container(^2.0)

_header['_delete'] 消息释放如何理解?

在Builder里return true 代表这个某个任务消费完成,文档上描述 每一次消费之后会根据消息的属性 _header['_delete'] 来进行消息释放 应该如何理解呢?

增加对pending状态数据的处理

现状

  1. 极端情况下可能存在数据消费成功但未ack
  2. 未ack数据虽然不会被当前消费组消费但存在数据冗余

目标

  1. 将pending数据ack

方案

  1. 在消费者主定时器中增加xClaim逻辑,读取config中的消息最大等待时间
  2. 单独启动一个独立定时器,使用与消费者相同的消费组进行xClaim处理,读取config中的消息最大等待时间

延迟队列CPU占用高

启动两个消费QueueBuilder,一个延迟的一个非延迟的。在都没有进行消费的时候延迟QueueBuilder的CPU占用相比非延迟QueueBuilder差距很大,请问是因为延迟QueueBuilder内定时器的原因吗
WX20240201-172140

硬件配置
2vCPU 2.0GHz

释放消息bug

文档里说“QueueBuilder类使用定时器进行消费,每一次消费之后会根据消息的属性 _header['_delete'] 来进行消息释放”,
但是现在https://github.com/workbunny/webman-rqueue/blob/master/src/Builders/GroupBuilder.php#L60,消费时传入$del参数是false,

 $this->consume($worker, false);

https://github.com/workbunny/webman-rqueue/blob/master/src/Builders/Traits/MessageQueueMethod.php#L320这里的删除

 if($del) { $client->xDel($queueName, $ids); }

xDel永远不会执行。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.