Comments (22)
是队列publish去重吗?
from webman-rqueue.
队列的publish依赖redis stream 的XADD,XADD如果添加两条相同的ID的消息时,假设消息还未被消费者读取,那么后一条消息会覆盖前一条消息;
sync_publish()方法支持传递headers,可以用于指定消息id,默认是由redis stream自动生成消息id;
// 发送一条id为1的消息
sync_publish(TestBuilder::instance(), 'test message', [
'_id' => 1
])
// 覆盖id为1的消息
sync_publish(TestBuilder::instance(), 'test message cover', [
'_id' => 1
])
from webman-rqueue.
队列的publish依赖redis stream 的XADD,XADD如果添加两条相同的ID的消息时,假设消息还未被消费者读取,那么后一条消息会覆盖前一条消息; sync_publish()方法支持传递headers,可以用于指定消息id,默认是由redis stream自动生成消息id;
// 发送一条id为1的消息 sync_publish(TestBuilder::instance(), 'test message', [ '_id' => 1 ]) // 覆盖id为1的消息 sync_publish(TestBuilder::instance(), 'test message cover', [ '_id' => 1 ])
了解了,就是要这个效果。感谢
from webman-rqueue.
SiteOpenBuilderDelayed::instance()->publish(7, [ '_delay' => 10, '_count' => 3, '_error' => '提交错误' ]);
可以触发
SiteOpenBuilderDelayed::instance()->publish(7, [ '_delay' => 10, '_count' => 3, '_error' => '提交错误', '_id' => 'open_7', ]);
触发不了队列
from webman-rqueue.
SiteOpenBuilderDelayed::instance()->publish(7, [ '_delay' => 10, '_count' => 3, '_error' => '提交错误' ]);
可以触发
SiteOpenBuilderDelayed::instance()->publish(7, [ '_delay' => 10, '_count' => 3, '_error' => '提交错误', '_id' => 'open_7', ]);
触发不了队列
请补充一下错误信息
另外,id建议使用redis约定的id格式为两个数字,中间以-分割,如1234567-1
from webman-rqueue.
1234567-1
我这边手动删除了redis中的数据可以提交了。但是发现了一个问题:第一次发布延迟队列,可以正常消费。消费完成后再次向同样的消费类发布延迟消息_id改成其他,发布后没有错误信息,但是不消费,查看redis 值是空的。如果把_id删除,再发布是可以消费成功的
from webman-rqueue.
redis版本:6.2.6
webman-rqueue版本:2.1.4
并且我第一次设置_id发布的时候,消费端打印$value的时候发现_header里的_id还是*,这个我不确定,想验证的时候就出现上面的问题了
from webman-rqueue.
from webman-rqueue.
1234567-1
我这边手动删除了redis中的数据可以提交了。但是发现了一个问题:第一次发布延迟队列,可以正常消费。消费完成后再次向同样的消费类发布延迟消息_id改成其他,发布后没有错误信息,但是不消费,查看redis 值是空的。如果把_id删除,再发布是可以消费成功的
每个Builder对应一个消费group,当消息被消费了后,消息在当前消费group的游标内就不存在了,所以修改id以后该消费group的消费者是无法再读取到该消息的,但是不影响其他消费group可以读取到该id的消息。
QueueBuilder的特性是每消费完一个消息就会将group游标移动并且移除该消息;
GroupBuilder的特性是没消费完一个消息仅移动group游标,移除消息交给定时器进行处理;
你是想实现什么功能吗?
from webman-rqueue.
1234567-1
我这边手动删除了redis中的数据可以提交了。但是发现了一个问题:第一次发布延迟队列,可以正常消费。消费完成后再次向同样的消费类发布延迟消息_id改成其他,发布后没有错误信息,但是不消费,查看redis 值是空的。如果把_id删除,再发布是可以消费成功的
每个Builder对应一个消费group,当消息被消费了后,消息在当前消费group的游标内就不存在了,所以修改id以后该消费group的消费者是无法再读取到该消息的,但是不影响其他消费group可以读取到该id的消息。
QueueBuilder的特性是每消费完一个消息就会将group游标移动并且移除该消息; GroupBuilder的特性是没消费完一个消息仅移动group游标,移除消息交给定时器进行处理;
感谢作者的耐心解答。我觉得我还是应该恶补一下redis stream相关的知识
我只想想实现,有一个任务。我需要再在未来不确定的一些时间去重复执行它。比如2月5号需要执行这个任务,3月8再去执行这个任务,xx月xx日再次执行等等。然后未来的某一个时间我发现3月8号的任务我不需要执行了,我需要去3月9号去执行这个任务,所以我要去修改3月8号的延迟执行时间。
from webman-rqueue.
当然我可以在3月9号发布一个新的任务,然后在消费逻辑里来判断任务的真实执行时间,只是这样的话不是最佳处理方式。
from webman-rqueue.
后者说我可以通过某种方式在发布延时任务的时候获得这个任务内部的ID,如果我想修改存在队列中的这个任务时,我直接去redis里删掉这个任务,然后再添加一条新的?
from webman-rqueue.
后者说我可以通过某种方式在发布延时任务的时候获得这个任务内部的ID,如果我想修改存在队列中的这个任务时,我直接去redis里删掉这个任务,然后再添加一条新的?
最简单的方式就是,你自己消息体内约定一个id,然后在执行的时候去判断一下这个id是否存在黑名单里,比如使用redis储存黑名单,当存在黑名单时业务逻辑忽略,消息自然会被队列消费者移除
from webman-rqueue.
后者说我可以通过某种方式在发布延时任务的时候获得这个任务内部的ID,如果我想修改存在队列中的这个任务时,我直接去redis里删掉这个任务,然后再添加一条新的?
最简单的方式就是,你自己消息体内约定一个id,然后在执行的时候去判断一下这个id是否存在黑名单里,比如使用redis储存黑名单,当存在黑名单时业务逻辑忽略,消息自然会被队列消费者移除
了解,这种方式就是类似于在消费逻辑里面增加判断对吧
from webman-rqueue.
后者说我可以通过某种方式在发布延时任务的时候获得这个任务内部的ID,如果我想修改存在队列中的这个任务时,我直接去redis里删掉这个任务,然后再添加一条新的?
最简单的方式就是,你自己消息体内约定一个id,然后在执行的时候去判断一下这个id是否存在黑名单里,比如使用redis储存黑名单,当存在黑名单时业务逻辑忽略,消息自然会被队列消费者移除
了解,这种方式就是类似于在消费逻辑里面增加判断对吧
是的,后续版本我会增加一个publishGetIds的方法,用于发布并获取消息ID组,这个版本今天就会更新;
不过依赖消息队列的ID并不是万全之策,通常来说不建议手动通过redis基础方法对队列进行操作,因为队列的运行过程依赖这一整套流程;
当然,如果你对redis stream比较熟悉了,我建议可以根据你的需求参考自定义Builder那一部分自定义一个Builder,这样可能更靠谱一些,因为QueueBuilder和GroupBuilder都已经有属于他们自己的消费逻辑套路了。
from webman-rqueue.
好的感谢。
我这边新增了一个测试的延迟任务测试后发现。_id的格式有要求,如果是用80614515588308743-118类似这种格式的id就可以被正常消费的。但是我第二次想覆盖之前的任务时发现不成功。
具体表现
首先发布一个延迟60秒的任务并设置_id
发布一个新的延迟任务,此时任务_id是一个新的id
修改第一次发布的任务,把延迟时间改成10秒,此时使用第一设置的id
from webman-rqueue.
好的感谢。 我这边新增了一个测试的延迟任务测试后发现。_id的格式有要求,如果是用80614515588308743-118类似这种格式的id就可以被正常消费的。但是我第二次想覆盖之前的任务时发现不成功。 具体表现 首先发布一个延迟60秒的任务并设置_id
发布一个新的延迟任务,此时任务_id是一个新的id
修改第一次发布的任务,把延迟时间改成10秒,此时使用第一设置的id
消息格式是{整数}-{整数}的要求格式;
不可以用消费者去验证消息是否被覆盖,可以通过redis的管理工具去查看对应消息ID是否被覆盖;
消费者是基于GROUP的消费模式,消息一旦被消费,该GROUP的消息游标就会移动,该GROUP将再也无法读取已读取过的消息;
from webman-rqueue.
第二个任务的消费时间02:34:43和发布时间02:34:23正好差20秒是没问题的
但是第一次提交的时间是02:34:09 后面修改它的时间是02:34:46 消费的时间是02:35:09。这个时间是第一次提交的60秒之后,而不是修改的10秒后。
并且消费逻辑里面获取的_Id是"*"而不是我发布时设置的id
from webman-rqueue.
好的感谢。 我这边新增了一个测试的延迟任务测试后发现。_id的格式有要求,如果是用80614515588308743-118类似这种格式的id就可以被正常消费的。但是我第二次想覆盖之前的任务时发现不成功。 具体表现 首先发布一个延迟60秒的任务并设置_id
发布一个新的延迟任务,此时任务_id是一个新的id
修改第一次发布的任务,把延迟时间改成10秒,此时使用第一设置的id
执行结果消息格式是{整数}-{整数}的要求格式; 不可以用消费者去验证消息是否被覆盖,可以通过redis的管理工具去查看对应消息ID是否被覆盖; 消费者是基于GROUP的消费模式,消息一旦被消费,该GROUP的消息游标就会移动,该GROUP将再也无法读取已读取过的消息;
难道覆盖的作用不是覆盖已经发布的任务吗,不然这个覆盖的意义在哪呢
from webman-rqueue.
我看之前说的覆盖前提是“假设消息还未被消费者读取”我想问下发布延迟任务后,消费者什么以后会读取这个任务呢。中间有我这边可以干预的步骤吗
from webman-rqueue.
我看之前说的覆盖前提是“假设消息还未被消费者读取”我想问下发布延迟任务后,消费者什么以后会读取这个任务呢。中间有我这边可以干预的步骤吗
redis stream是流式队列,队列在严格意义上没有延迟消息的说法,是通过requeue模拟出来的,也就是说delay消息实际上是在Builder中进行了无数次读取放回,直到delay条件达到,再触发消费回调函数;
stream的覆盖消息主要用于多GROUP消费相同队列时的对后者消费GROUP的消息修正,并不是为了延迟队列或者延迟行为场景而诞生的;
消费者的Builder的消费逻辑无法干预,需要你在handler中进行逻辑干预,或者自行实现自定义Builder实现符合你场景的Builder消费器
from webman-rqueue.
我看之前说的覆盖前提是“假设消息还未被消费者读取”我想问下发布延迟任务后,消费者什么以后会读取这个任务呢。中间有我这边可以干预的步骤吗
redis stream是流式队列,队列在严格意义上没有延迟消息的说法,是通过requeue模拟出来的,也就是说delay消息实际上是在Builder中进行了无数次读取放回,直到delay条件达到,再触发消费回调函数;
stream的覆盖消息主要用于多GROUP消费相同队列时的对后者消费GROUP的消息修正,并不是为了延迟队列或者延迟行为场景而诞生的;
消费者的Builder的消费逻辑无法干预,需要你在handler中进行逻辑干预,或者自行实现自定义Builder实现符合你场景的Builder消费器
感谢作者,我在学习一下redis stream
from webman-rqueue.
Related Issues (11)
- 可用于生产环境吗? HOT 1
- 支持php8.0+吗? HOT 1
- 为rqueue增加psr/log及日志收集部分
- 完善webman-rqueue的消费者测试
- 增加对pending状态数据的处理
- republish的优化 HOT 1
- 释放消息bug
- 延迟队列CPU占用高 HOT 7
- 非延迟队列设置timerInterval 显示busy HOT 6
- _header['_delete'] 消息释放如何理解? HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from webman-rqueue.