Comments (6)
不必在意那么多!
实际上,NewLife.RocketMQ为topic的每个队列开一个线程,各个线程独立循环拉取消息,按照批大小,然后触发OnConsume事件。
所以,你可以认为,每个queue就是一个消费线程,独立触发事件,顺序消费。
至于控制台上看到的,的确有些不对,不过多年经验来看,也不影响什么。
from newlife.rocketmq.
谢谢作者回复,但是我想问的更深一点。因为是一些个人的理解和猜测,如有错误,请不吝指出
对于第一个问题:我通过实验发现newlife的consumer默认实现,是先获取需要处理多少个queue, 然后建立同等数量的线程,1对1去处理。 因此每个队列的处理顺序是一定顺序的,因此1000个100ms的任务,分为2个队列,consumer处理时用了2个线程工作(实际上用到了线程池,但是等同于2个线程工作)。因此理论上consumer每100ms能处理2个任务,按统计学那么1个任务平均50ms,这与实际结果53ms是差多不的。 这与java的MessageListenerOrderly的结果似乎相同,因此我猜测newlife的默认实现等同于MessageListenerOrderly
但是java版默认使用的是MessageListenerConcurrently,按照我的理解。同样上面的环境下,consumer被分配处理2个队列,但是consumer可能会用线程池生成10个线程去并行处理这2个队列,java的 这种情况下,当然每个队列的各个任务不保证先后完成顺序,因此各个队列的消费情况是乱序的。但是因此性能应该显著高于2个线程顺序处理
引用 https://copyfuture.com/blogs-details/202212292041048957
因此猜想java的默认实现使得在不关注顺序处理的情况下,消费能力可能是显著高于本包的默认实现的顺序消费的。感觉这有可能会成为一个痛点。
对于第2个问题,我似乎看到过资料minoffset会影响到rocketmq对已消费资源的空间回收。这个值一直为0,可能有什么原因吗。 因为这个现象,担心rocketmq会出现未知的错误
from newlife.rocketmq.
你的理解非常正确!
正式为了稳定有序消费,所以每个queue开了一个线程,顺序处理消息。
该线程每成功处理一批消息(return true),调度器就会把这批消息的最大偏移量设置回去rocketmq服务器,告诉服务器,这个消费者在这个queue的最后消费offset是哪里,然后客户端按照这个offset开始下一轮PULL拉取消息。
加入每个queue对应多个线程,拉取消费没有问题,但是设置offset的时候怎么办呢?rocketmq服务器只会有一个地方记录这个消费者在这个queue上的offset,多个线程同时设置,就可能混乱,导致消息被重复消费。
在每queue单线程的模式下,提升吞吐的办法是,加大批大小,然后批量处理这次拉取回来的消息,当所有消息都处理完成以后,再去设置offset,从而避免混乱问题。
最后,这个组件设计于2018年初,为了解决官方rocketmq的.net客户端不稳定问题(内部调用C++)。在中通,每天消费100亿消息,queue分了256,消费用了4~8台服务器。
from newlife.rocketmq.
谢谢,经过博主的提示。我有个新的思路,不知道对不对。
如果要实现MessageListenerConcurrently,最简单的方法就是 在OnConsumeAsync ,处理全部任务时,使用task, 及task.WhenAll 这样 消息的消费过程就全部是乱序,并且使用到线程池。
这种情况下,原先的设计就等同于使用queue.count的线程去批量拉取rocketmq消息。 然后线程池共同消费全部的消息。 最后的效果就是拉取和消费都是使用线程池,我猜测这种情况下等同于java的MessageListenerConcurrently效果。
进一步的话,可以继承Consumer重写消费过程,以实现可自定义的顺序或者共同消费
还有一个问题我想请教下, 请问如果在OnConsume里爆出异常,和主动返回false2种情况下,后续这批消息会怎么样呢,能否简单介绍。
衷心希望组件越来越好,感谢!
from newlife.rocketmq.
核心就是在异步的OnConsumeAsync事件里面 await Task.WhenAll。
重载 OnConsume 和写事件效果是一样的,因为默认的OnConsume实现就是触发事件。
返回false跟抛出异常,结果是一样的,都是跳过了UpdateOffset,然后下一次轮询再次拉取同样的消息。
在消费失败这个事情上,RocketMQ的标准方案是把消息放入另一个队列,静默一定时间后再次消费。
我们NewLife.RocketMQ没有实现这么复杂的机制,可以留给用户自己搞。
从架构设计上来讲,但凡能够控制返回false的时候,就不要轻易抛出异常。
from newlife.rocketmq.
理解了,谢谢!
from newlife.rocketmq.
Related Issues (20)
- 是否支持集群消费 HOT 9
- rocketmq origin msgid is diffent with sdk parse HOT 3
- 用线程创建了3个customer,程序开启一段时间后,就不消费了 HOT 10
- 消息消费不掉问题 HOT 2
- 消费端发布到服务器上报错 HOT 4
- 消费者会时不时的位移消费位置 HOT 4
- 用了最新版的NewLife.Core.dll还是内存泄露 HOT 6
- 用0401的版本,发布服务器上报错,如何解决 HOT 2
- no client info for this group: road-assistance HOT 2
- 支持顺序发布和消费消息
- 支持发布延迟消息
- 有没有顺序消息推送的例子? HOT 2
- 连接rocketmq dledger集群,集群重新选主后,rocketmq连接不上 HOT 2
- 请教一下,如何在出现异常时手动关闭程序,而不是自动重试 HOT 5
- 阿里云RocketMQ升级后无法订阅到部分消息 HOT 3
- .net core 程序连接华为云rocketmq启动失败 HOT 2
- 能不能设置任务超时时间 HOT 1
- 消费者服务重启后,同一个topic又开始重头消费了 HOT 4
- 使用阿里云地域的服务器地址 HOT 2
- 连接5.2.0版本的集群后,返回system_error错误 HOT 3
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from newlife.rocketmq.