Code Monkey home page Code Monkey logo

Comments (6)

nnhy avatar nnhy commented on July 18, 2024

不必在意那么多!
实际上,NewLife.RocketMQ为topic的每个队列开一个线程,各个线程独立循环拉取消息,按照批大小,然后触发OnConsume事件。
所以,你可以认为,每个queue就是一个消费线程,独立触发事件,顺序消费。
至于控制台上看到的,的确有些不对,不过多年经验来看,也不影响什么。

from newlife.rocketmq.

yangbocheng avatar yangbocheng commented on July 18, 2024

谢谢作者回复,但是我想问的更深一点。因为是一些个人的理解和猜测,如有错误,请不吝指出
对于第一个问题:我通过实验发现newlife的consumer默认实现,是先获取需要处理多少个queue, 然后建立同等数量的线程,1对1去处理。 因此每个队列的处理顺序是一定顺序的,因此1000个100ms的任务,分为2个队列,consumer处理时用了2个线程工作(实际上用到了线程池,但是等同于2个线程工作)。因此理论上consumer每100ms能处理2个任务,按统计学那么1个任务平均50ms,这与实际结果53ms是差多不的。 这与java的MessageListenerOrderly的结果似乎相同,因此我猜测newlife的默认实现等同于MessageListenerOrderly

但是java版默认使用的是MessageListenerConcurrently,按照我的理解。同样上面的环境下,consumer被分配处理2个队列,但是consumer可能会用线程池生成10个线程去并行处理这2个队列,java的 这种情况下,当然每个队列的各个任务不保证先后完成顺序,因此各个队列的消费情况是乱序的。但是因此性能应该显著高于2个线程顺序处理
引用 https://copyfuture.com/blogs-details/202212292041048957

因此猜想java的默认实现使得在不关注顺序处理的情况下,消费能力可能是显著高于本包的默认实现的顺序消费的。感觉这有可能会成为一个痛点。

对于第2个问题,我似乎看到过资料minoffset会影响到rocketmq对已消费资源的空间回收。这个值一直为0,可能有什么原因吗。 因为这个现象,担心rocketmq会出现未知的错误

from newlife.rocketmq.

nnhy avatar nnhy commented on July 18, 2024

你的理解非常正确!
正式为了稳定有序消费,所以每个queue开了一个线程,顺序处理消息。
该线程每成功处理一批消息(return true),调度器就会把这批消息的最大偏移量设置回去rocketmq服务器,告诉服务器,这个消费者在这个queue的最后消费offset是哪里,然后客户端按照这个offset开始下一轮PULL拉取消息。
加入每个queue对应多个线程,拉取消费没有问题,但是设置offset的时候怎么办呢?rocketmq服务器只会有一个地方记录这个消费者在这个queue上的offset,多个线程同时设置,就可能混乱,导致消息被重复消费。

在每queue单线程的模式下,提升吞吐的办法是,加大批大小,然后批量处理这次拉取回来的消息,当所有消息都处理完成以后,再去设置offset,从而避免混乱问题。

最后,这个组件设计于2018年初,为了解决官方rocketmq的.net客户端不稳定问题(内部调用C++)。在中通,每天消费100亿消息,queue分了256,消费用了4~8台服务器。

from newlife.rocketmq.

yangbocheng avatar yangbocheng commented on July 18, 2024

谢谢,经过博主的提示。我有个新的思路,不知道对不对。
如果要实现MessageListenerConcurrently,最简单的方法就是 在OnConsumeAsync ,处理全部任务时,使用task, 及task.WhenAll 这样 消息的消费过程就全部是乱序,并且使用到线程池。
这种情况下,原先的设计就等同于使用queue.count的线程去批量拉取rocketmq消息。 然后线程池共同消费全部的消息。 最后的效果就是拉取和消费都是使用线程池,我猜测这种情况下等同于java的MessageListenerConcurrently效果。
进一步的话,可以继承Consumer重写消费过程,以实现可自定义的顺序或者共同消费

还有一个问题我想请教下, 请问如果在OnConsume里爆出异常,和主动返回false2种情况下,后续这批消息会怎么样呢,能否简单介绍。

衷心希望组件越来越好,感谢!

from newlife.rocketmq.

nnhy avatar nnhy commented on July 18, 2024

核心就是在异步的OnConsumeAsync事件里面 await Task.WhenAll。

重载 OnConsume 和写事件效果是一样的,因为默认的OnConsume实现就是触发事件。

返回false跟抛出异常,结果是一样的,都是跳过了UpdateOffset,然后下一次轮询再次拉取同样的消息。

在消费失败这个事情上,RocketMQ的标准方案是把消息放入另一个队列,静默一定时间后再次消费。
我们NewLife.RocketMQ没有实现这么复杂的机制,可以留给用户自己搞。
从架构设计上来讲,但凡能够控制返回false的时候,就不要轻易抛出异常。

from newlife.rocketmq.

yangbocheng avatar yangbocheng commented on July 18, 2024

理解了,谢谢!

from newlife.rocketmq.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.