Code Monkey home page Code Monkey logo

newlife.rocketmq's Issues

在 RocketMq-console-ng 页面无法查询到消息设置的 Keys 与 Tags

在本项目 Message 类的 GetProperties 实现中有以下两行代码

 if (!Tags.IsNullOrEmpty()) sb.AppendFormat("{0}\u0001{1}\u0002", nameof(Tags), Tags);
 if (!Keys.IsNullOrEmpty()) sb.AppendFormat("{0}\u0001{1}\u0002", nameof(Keys), Keys);

我参考了 Java 客户端中的实现,Tags 和 Keys 应当为 TAGS KEYS 并区分大小写
我尝试修改上述代码为

 if (!Tags.IsNullOrEmpty()) sb.AppendFormat("{0}\u0001{1}\u0002", "TAGS", Tags);
 if (!Keys.IsNullOrEmpty()) sb.AppendFormat("{0}\u0001{1}\u0002", "KEYS", Keys);

测试后发现,这样在 RocketMq-console-ng 页面就能查询到设置的 Keys 和 Tags 了,并且在 Java 客户端也可以正常使用了。

.NET5 Could not load file or assembly 'NewLife.Core, Version=9.0.8098.19786

.NET5 环境,已引用
NewLife.Core 9.0.2022.304
NewLife.RocketMQ 2.0.2022.304

启动包找不到引用
System.IO.FileLoadException:“Could not load file or assembly 'NewLife.Core, Version=9.0.8098.19786, Culture=neutral, PublicKeyToken=null'. The located assembly's manifest definition does not match the assembly reference. (0x80131040)”

消费者的消费问题

我在使用的时候发现了2个问题,能否指教
1.经过测试,消费端消费的能力似乎等同于java版的有序消费模式(MessageListenerOrderly), 见
https://www.jb51.net/article/215566.htm
我的环境是1个topic 2个队列,共1000条消息。 1个消费者,BatchSize默认32,使用Thread.Sleep(100) 模拟耗时,最后全部消费完,耗时53s,平均每个包耗时53ms, 与上面网址java版的MessageListenerOrderly速度相当。 而非java默认情况下的并发消费模式MessageListenerConcurrently

2.消费完了,返回的也是true。但是为什么从web-console上看到的 [主题]-nx_test-状态 的2个队列的最小位点始终为0。 [主题]-nx_test-状态-consumer管理里面的代理者位点和消费者位点是相同的,差值为0; 而消息的traceType全部是PULL,

下面分别是代码,和图:图中已经生产了3000条组左右的消息

var mq111 = new Producer()
{
    NameServerAddress = "127.0.0.1:9876",
    Log = XTrace.Log,
};
//链接默认的topic
mq111.Start();
mq111.CreateTopic("nx_test", 2);
mq111.Stop();
mq111.Dispose();
var mq = new Producer
{
    Topic = "nx_test",
    NameServerAddress = "127.0.0.1:9876",
    Log = XTrace.Log,
};
mq.Start();
for (var i = 0; i < 1001; i++)
{
    var str = "学新" + i +$" publishtime = {DateTime.Now:HHmmss.fff}";
    var sr = mq.Publish(str, "TagA");
}
Console.WriteLine("正在处理.....");
Console.ReadLine();
var consumer = new Consumer
{
    Topic = "nx_test",
    Group = "nx_g1",
    NameServerAddress = "127.0.0.1:9876",
    //FromLastOffset = false,
    //BatchSize = 3,
    Log = XTrace.Log,
};

consumer.OnConsume = (q, ms) =>
{
    XTrace.WriteLine("[{0}@{1}]收到消息[{2}] threadId={3} ms.Length={4}", q.BrokerName, q.QueueId, ms.Length, Thread.CurrentThread.ManagedThreadId,ms.Length);
    Console.WriteLine(
        $"[{q.BrokerName}@{q.QueueId}]收到消息[{ms.Length}] threadId={Thread.CurrentThread.ManagedThreadId} ms.Length={ms.Length}");
    foreach (var item in ms.ToList())
    {
        XTrace.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】queueid={q.QueueId} threadid={Thread.CurrentThread.ManagedThreadId}");
        Console.WriteLine(
            $"消息:主键【{ item.Keys}】,产生时间【{ item.BornTimestamp.ToDateTime()}】,内容【{ item.Body.ToStr()}");
        Thread.Sleep(100);
    }
    return true;
};
consumer.Start();
Console.WriteLine("正在处理.....按任意键停止消费");
Console.ReadLine();
consumer.Stop();
consumer.Dispose();
Console.WriteLine("consumer");

image
image
image

Void DoWork(System.Object) 耗时过长 59,800ms

一直跟踪不到,这个错误
System.AggregateException: One or more errors occurred. (A task was canceled.)
....Void DoWork(System.Object) 耗时过长 59,800ms
是哪个操作产生的?
感觉是创建生产者(Producer)的时候触发的。。

10:10:00.648 30 N Worker #14 发现Broker[broker-clz]: 192.168.10.201:10911
10:10:00.648 30 N Worker #14 SELECT
p.*
FROM ProductTemp AS p
inner join Product jzjpp on jzjpp.Id = p.Pid and jzjpp.Status = 0
order by p.Pid
10:10:09.555 41 N T System.AggregateException: One or more errors occurred. (A task was canceled.)
---> System.Threading.Tasks.TaskCanceledException: A task was canceled.
--- End of inner exception stack trace ---
at System.Threading.Tasks.Task`1.GetResultCore(Boolean waitCompletionNotification)
at NewLife.RocketMQ.ClusterClient.Invoke(RequestCode request, Object body, Object extFields, Boolean ignoreError) in \NewLife.RocketMQ\ClusterClient.cs:line 211
at NewLife.RocketMQ.NameClient.GetRouteInfo(String topic) in \NewLife.RocketMQ\NameClient.cs:line 66
at NewLife.RocketMQ.NameClient.DoWork(Object state) in \NewLife.RocketMQ\NameClient.cs:line 58
at NewLife.Threading.TimerScheduler.Execute(Object state) in \NewLife.Core\Threading\TimerScheduler.cs:line 273
10:10:09.555 41 N T 任务 [7043]Void DoWork(System.Object) 耗时过长 59,800ms,建议使用异步任务Async=true
10:10:11.256 9 Y - [broker-clz]<= #PULL_NOT_FOUND(53799)[0]
10:10:11.256 9 Y - [broker-clz]<= #PULL_NOT_FOUND(53800)[0]
10:10:11.256 9 Y - [broker-clz]<= #PULL_NOT_FOUND(53802)[0]
10:10:11.256 52 N CT3 [broker-clz]=> PULL_MESSAGE(53803)[0]
10:10:11.256 9 Y - [broker-clz]<= #PULL_NOT_FOUND(53801)[0]
10:10:11.256 49 N CT0 [broker-clz]=> PULL_MESSAGE(53804)[0]
10:10:11.256 51 N CT2 [broker-clz]=> PULL_MESSAGE(53805)[0]
10:10:11.256 50 N CT1 [broker-clz]=> PULL_MESSAGE(53806)[0]

请问下 Header 里面为啥要截断异常信息,这样设计的目的是啥哦?

/// <summary>创建异常</summary>
/// <returns></returns>
public ResponseException CreateException()
        {
            var err = Remark;
            if (!err.IsNullOrEmpty())
            {
                var p = err.IndexOf("Exception: ");
                if (p >= 0) err = err.Substring(p + "Exception: ".Length);
                p = err.IndexOf(", ");
                if (p > 0) err = err.Substring(0, p);
            }

            return new ResponseException(Code, err);
        }

调用出错,似乎版本不兼容 rocketmq v4.9.1

MQ版本 V4_9_1

Producer发送消息会报如下错误:

the custom field <c> is null
org.apache.rocketmq.remoting.exception.RemotingCommandException: the custom field <c> is null, 
org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor.checkNotNull(AbstractSendMessageProcessor.java:376)

字段 c 对应的是 defaultTopic

消费者 consume 支持异步

看了下源码,消费者是 pull 模型。 拉取消息都做了 await,Consume 就改下异步嘛? 能不能安排上?

var pr = await Pull(mq, offset, BatchSize, SuspendTimeout);
                    if (pr != null)
                    {
                        switch (pr.Status)
                        {
                            case PullStatus.Found:
                                if (pr.Messages != null && pr.Messages.Length > 0)
                                {
                                    // 触发消费
                                    **var rs = Consume(mq, pr);**  这边能改成异步么? await ConsumeAsync

                                    // 更新偏移
                                    if (rs)
                                    {
                                        st.Offset = pr.NextBeginOffset;
                                        // 提交消费进度
                                        await UpdateOffset(mq, st.Offset);
                                    }
                                }

用线程创建了3个customer,程序开启一段时间后,就不消费了

for (int i = 0; i < 3; i++)
{
var j = i;
Task.Factory.StartNew(() =>
{
//测试消费消息
var consumer = new NewLife.RocketMQ.Consumer
{
Topic = "bb",
NameServerAddress = "xx",
BatchSize = 3,
Group = "bbgroup"
};
consumer.Start();
consumer.OnConsume = (q, ms) =>
{
string mInfo = $"datetime={DateTime.Now},BrokerName={q.BrokerName},QueueId={q.QueueId},Length={ms.Length}";
Console.WriteLine(mInfo);
if (q.QueueId % 3 == j)
{
//string mInfo = $"BrokerName={q.BrokerName},QueueId={q.QueueId},Length={ms.Length}";
//Console.WriteLine(mInfo);
LogHelper.Information("RocketMQConsumer" + j, mInfo);
foreach (var item in ms.ToList())
{
string msg = $"消息:msgId={item.MsgId},key={item.Keys},产生时间【{item.BornTimestamp.ToDateTime()}】,内容>{item.BodyString}";
//Console.WriteLine(msg);
LogHelper.Information("RocketMQConsumer" + j, msg);
//chat.SendMessageMQ(rtMQHelper.getSingnalModel(item.Body.ToString()));
}
// return false;//通知消息队:不消费消息
return true; //通知消息队:消费了消息
}
else
{
return true;
}
};
});
}
请问是不是这种写法有问题?

Producer.cs 的 SelectQueue 方法里面获取的 Brokers 属性非线程安全

现象

在大概 30 秒左右的高速调用 Producer.cs 的 Publish 方法发送消息,此时偶尔将会出现 System.ArgumentOutOfRangeException:“Index was out of range. Must be non-negative and less than the size of the collection. ”System.InvalidOperationException:“Collection was modified; enumeration operation may not execute.” 提示,导致消息发送失败

原因

在 Producer.cs 的 SelectQueue方法里面,将会调用 Brokers.Where 方法获取 BrokerInfo 代理

var list = Brokers.Where(e => e.Permission.HasFlag(Permissions.Write) && e.WriteQueueNums > 0).ToList();

而在 Producer.cs 的 Brokers 属性本质是调用 NameClient.cs 的 Brokers 属性

但是在初始化时,将会在 TimerX 初始化逻辑,代码如下

        public override void Start()
        {
            var cfg = Config;
            var ss = cfg.NameServerAddress.Split(";");

            Servers = ss.Select(e => new NetUri(e)).ToArray();

            base.Start();

            if (_timer == null) _timer = new TimerX(DoWork, null, cfg.PollNameServerInterval, cfg.PollNameServerInterval);
        }

更新时间差不多就是 30 秒左右,但是在更新完成之后,调用的 GetRouteInfo 方法里面,将变更集合,同时没有加上任何锁,导致了集合被更改

public IList<BrokerInfo> GetRouteInfo(String topic)
{
   // 忽略代码
   Brokers.Clear();
   if (Brokers is List<BrokerInfo> bks) bks.AddRange(list);
}

如果在刚好调用 GetRouteInfo 方法时,在进行消息发送,那么此时将会因为 Brokers 集合的变更而导致发送失败

Consumer的FromLastOffset问题

请问Consumer.FromLastOffset的设定是不是对应java原版里的ConsumeFromWhere,看名字应该是这个意思,但是看源码实现没有达到这个效果,并且没有ConsumeFromWhere的相关实现。

// 查询偏移量,可能首次启动-1
if (st.Offset < 0 && FromLastOffset)
{
var p = QueryOffset(mq);
if (SkipOverStoredMsgCount > 0)
{
// 设置了跳过积压的消息,此时判断积压的消息条数,若消息条数大于设定的数量,则强制从消费最大偏移量的位置消费
var maxOffset = QueryMaxOffset(mq);
if (maxOffset >= p + SkipOverStoredMsgCount) p = maxOffset;
}
//if (p == -1) p = 0;
//第一次消费新的队列,强制从消费最大偏移量的位置消费(避免由于第一次从最小偏移量消费而导致的数据大量积压问题)
//if (p <= 0) p = QueryMaxOffset(mq);
st.Offset = st.CommitOffset = p;
if (st.Offset >= 0) WriteLog("开始消费[{0}@{1}] Offset={2:n0}", mq.BrokerName, mq.QueueId, st.Offset);
}
// 拉取一批,阻塞等待
var offset = st.Offset >= 0 ? st.Offset : 0;

上面这段代码截取自DoPull方法,首先明确一点,在DoPull首次被调用时QueueStore.Offset取到的值必定是-1。然后根据上面这部分代码:
FromLastOffset=false

首次运行offset取值必定为0。这样应用每次重启就必须重新消费,但ConsumeFromWhere是针对首次消费的配置,也就是group创建后的首次消费,重启应用后不再判断该配置,应该在之前的offset继续消费

FromLastOffset=true & SkipOverStoredMsgCount<=0

首次运行时offset从broker取当前消费offset。这个对之前有过消费记录的没有问题,但对首次消费来说QueryOffset查询到的一定是0,这样也就需要重头开始消费,这和FromLastOffset=true不符,理论应该FromLastOffset=true时首次消费从最大偏移量开始消费,反之从0开始

FromLastOffset=true & SkipOverStoredMsgCount>0

首次运行时将offset + SkipOverStoredMsgCountmaxOffset进行对比,这样存在两种情况
1.首次消费,如果最大偏移量大于最大积压数量,那么就从最大偏移量开始消费,反之从0开始消费
2.非首次消费,如果最大偏移量大于当前偏移量+最大积压数量,那么就从最大偏移量开始消费,反之从当前偏移量开始消费
第一种情况有可能从0开始消费,与FromLastOffset=true定义不符,第二种情况按照你SkipOverStoredMsgCount的设定是没问题的

按照“FromLastOffset=true时首次消费从最大偏移量开始消费,反之从0开始消费,非首次消费从上一次消费的偏移量继续消费”的逻辑,我自己重新改了一下代码,完成后准备提交一个pullrequest,现在有一个问题需要跟作者讨论一下,SkipOverStoredMsgCount这个配置应该是你自己定义的吧,官方应该没有这个配置,我认为在已经消费过的情况下,即使有积压也不应该通过一个配置进行自动跳过,应该由人工判断后手动跳过,自动的风险太大,所以我更改后的逻辑是只在首次消费且FromLastOffset=false时才判断积压情况,这样就是在首次消费时,如果期望从头开始收,但发现从头开始数据量过大,那么就从尾开始收,同时因为这个配置是自定义的,我认为默认应该保持与官方一致,避免使用的人在不知情的情况下做出了与官方不一样的操作,所以我认为将SkipOverStoredMsgCount的默认值设置为0更合适。作者你看看SkipOverStoredMsgCount按我上面说的这个逻辑改是否可行,如果可行我就准备提交pullrequest了,如果有不妥的地方你说说你的想法,确定后我再提交pullrequest

双主集群

请问支持双主集群吗?双主集群只配置一个consumer可以吗?测试发现起两个消费者,分表连接双主的某一个节点,是消费重复的,如何提升单个消费者的消费能力?谢谢

是否支持集群消费

请问是否支持多节点消费,我们测试的时候发现在3个服务器上消费同一个topic数据,也是同一个消费组,发现三个节点都会消费同一条数据

Producer和consumer如何使用阿里云rocketMQ

刚接触rocketMQ不太懂这个配置,看到这个类库,但是没有使用方面的文档说明

            NewLife.RocketMQ.Producer producer = new NewLife.RocketMQ.Producer
            {
                AccessKey = AccessKeyId,
                SecretKey = AccessKeySecret,
                Server = EndPoint,
                //此处为阿里云控制台rocketMQ实例下的接入点,分为TCP(只有内网Endpoint),HTTP(公网和内网为不同Endpoint)
                //是否需要NameServerAddress配置
                Topic = Topic
            };
            producer.Start();
            producer.Publish(new Message
            {
                Keys = "TheKey",
                BodyString = "TheMessage." + DateTime.Now,
                Tags = "TestTag"
            });
            producer.Stop();
NewLife.RocketMQ.Consumer consumer = new NewLife.RocketMQ.Consumer();
            consumer.Configure(new NewLife.RocketMQ.MqSetting()
            {
                AccessKey = AccessKeyId,
                SecretKey = AccessKeySecret,
                Group = Group,
                Server = EndPoint,
                Topic = Topic
            });
            consumer.OnConsume = (mq, ext) =>
            {
                Console.WriteLine(mq.BrokerName);
                return true;
            };
            consumer.Start();
            Task.Delay(TimeSpan.FromMinutes(10));
            consumer.Stop();

consumer.Start();出现问题

微信截图_20211209175813

System.AggregateException
HResult=0x80131500
Message=One or more errors occurred. (A task was canceled.)
Source=System.Private.CoreLib
StackTrace:
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
at System.Threading.Tasks.Task1.GetResultCore(Boolean waitCompletionNotification) at System.Threading.Tasks.Task1.get_Result()
at NewLife.RocketMQ.ClusterClient.Invoke(RequestCode request, Object body, Object extFields, Boolean ignoreError)
at NewLife.RocketMQ.NameClient.GetRouteInfo(String topic)
at NewLife.RocketMQ.Client.MqBase.Start()
at NewLife.RocketMQ.Consumer.Start()
at MQConsumer.Program.Main(String[] args) in C:\Users\lenovo\Downloads\RocketMQTest-main\MQConsumer\Program.cs:line 39

内部异常 1:
TaskCanceledException: A task was canceled.

@hipeace86 @annuo1111 你好,麻烦请问一下为什么会出现这种问题,十分感谢。

修改和扩展了Producer.cs中的Publish方法

\NewLife.RocketMQ\NewLife.RocketMQ\Producer.cs

因为要用到keys所以,建议还在多加个Publish方法好点:

        /// <summary>发布消息</summary>
        /// <param name="body"></param>
        /// <param name="tags"></param>
        /// <param name="timeout"></param>
        /// <returns></returns>
        public virtual SendResult Publish(Object body, String tags = null, Int32 timeout = -1) => Publish(body, tags, null, timeout);

        /// <summary>发布消息</summary>
        /// <param name="body"></param>
        /// <param name="tags">传null则为空</param>
	/// <param name="keys">传null则为空</param>
        /// <param name="timeout"></param>
        /// <returns></returns>
        public virtual SendResult Publish(Object body, String tags, String keys, Int32 timeout = -1)
        {
            if (!(body is Byte[] buf))
            {
                if (!(body is String str)) str = body.ToJson();

                buf = str.GetBytes();
            }

            return Publish(new Message { Body = buf, Tags = tags, Keys = keys }, timeout);
        }

不支持RocketMQ4.6.0版本中增加的request-reply新特性

RocketMQ4.6.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返回响应消息,类似rpc调用效果。
咱们这个中间件不支持这个新特性,期待支持新特性的版本上线!

调用阿里云(tcp) Producer和 Consumer 都报topic route info in name server for the topic: guanzhu

14:39:23.710 1 N - [Name]集群地址:Http://****aliyuncs.com:80
14:39:23.720 1 N - [Name]正在连接[Http://****aliyuncs.com:80]
14:39:23.733 1 N - Tcp.Open Tcp://0.0.0.0:51925=>118.190.213.147:80
14:39:23.954 1 N - NewLife.Core v8.10.2021.0101 Build 2021-01-01 19:31:46 .NET Framework 4.6
14:39:23.955 1 N - X组件核心库 ?2002-2020 NewLife
14:39:23.956 1 N - MqProducer v1.0.0.0 Build 2000-01-01 .NET Framework 4.7.2
14:39:23.957 1 N - MqProducer Copyright ? 2021
14:39:23.983 1 N - NewLife.RocketMQ.Protocol.ResponseException: 17: ### No topic route info in name server for the topic: guanzhu
See http://rocketmq.apache.org/docs/faq/ for further details.
在 NewLife.RocketMQ.ClusterClient.Invoke(RequestCode request, Object body, Object extFields, Boolean ignoreError) 位 置 D:\X\NewLife.RocketMQ\NewLife.RocketMQ\ClusterClient.cs:行号 216
在 NewLife.RocketMQ.NameClient.GetRouteInfo(String topic) 位置 D:\X\NewLife.RocketMQ\NewLife.RocketMQ\NameClient.cs: 行号 66
在 NewLife.RocketMQ.Client.MqBase.Start() 位置 D:\X\NewLife.RocketMQ\NewLife.RocketMQ\MqBase.cs:行号 154
在 NewLife.RocketMQ.Producer.Start() 位置 D:\X\NewLife.RocketMQ\NewLife.RocketMQ\Producer.cs:行号 36
在 MqProducer.Program.Main(String[] args) 位置 D:\工作\MQ\Demo\rocketMQ\rocketMQDemo\MqProducer\Program.cs:行号 34
14:39:23.984 1 N - 异常退出!

Is there size limit for a message?

I have some trouble to send large size of message. When i send large size message from Sender Program written in java, NewLife.RocketMQ client can receive it. but message body broken. (small size message is OK)
How long message size can be support? Is there size limit for a message?

Additional Test:

  1. Combination Sender written in NewLife.RocketMQ and Listener written in NewLife.RocketMQ : SUCCESS
  2. Combination Sender written in Java and Listener written in Java : SUCCESS
  3. Combination Sender written in NewLife.RocketMQ and Listener written in Java : SUCCESS
  4. Combination Sender written in Java and Listener written in NewLife.RocketMq : FAIL

消费端无法拉取未被消费的信息

var consumer = new Consumer
{
//Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet",
//AccessKey = "LTAINsp1qKfO61c5",
//SecretKey = "BvX6DpQffUz8xKIQ0u13EMxBW6YJmp",
Topic = "ntest",
Group = "test",
NameServerAddress = "127.0.0.1:9876",
FromLastOffset = false,
BatchSize = 20,
Log = XTrace.Log,
};

在上面的示例代码中,

FromLastOffset 字段设置为true的情况下,消费端仅拉取程序启动后新投递到消息队列中的信息,

FromLastOffset 字段设置为false的情况下,消费端每次启动都将拉取消息队列中所有的信息,

可否新增一个配置项,让消费端启动时仅拉取消息队列中所有未被消费的消息。

consumer.Start();报错

System.AggregateException
HResult=0x80131500
Message=One or more errors occurred. (A task was canceled.)
Source=System.Private.CoreLib
StackTrace:
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
at System.Threading.Tasks.Task1.GetResultCore(Boolean waitCompletionNotification) at System.Threading.Tasks.Task1.get_Result()
at NewLife.RocketMQ.ClusterClient.Invoke(RequestCode request, Object body, Object extFields, Boolean ignoreError)
at NewLife.RocketMQ.NameClient.GetRouteInfo(String topic)
at NewLife.RocketMQ.Client.MqBase.Start()
at NewLife.RocketMQ.Consumer.Start()
at MQConsumer.Program.Main(String[] args) in C:\Users\lenovo\Downloads\RocketMQTest-main\MQConsumer\Program.cs:line 39

内部异常 1:
TaskCanceledException: A task was canceled.

@hipeace86 @annuo1111 你好,麻烦请问一下会什么会出现这种问题,十分感谢。

支持 RocketMQ 5.0 POP 消费模式

RocketMQ 5.0 中引入了一种新的消费模式:Pop 消费模式。

我们知道 RocketMQ 原来有两种消费模式:Pull 模式消费和 Push 模式消费,其中 Push 模式指的是 Broker 将消息主动“推送”给消费者,它的背后其实是消费者在不断地 Pull 消息来实现类似于 Broker “推”消息给消费者的效果。

新引入的 Pop 消费模式主要是用于 Push 消费时将拉消息的动作替换成 Pop 。Pop 消费的行为和 Pull 消费很像,区别在于 Pop 消费的重平衡是在 Broker 端做的,而之前的 Pull 和 Push 消费都是由客户端完成重平衡。

优点:

  • 负载均衡放在服务端完成,可支持超过 Queue 数量的消费者
  • 解决某个消费者 hang (挂机、僵尸)导致对应的 Queue 队列任务堆积没有被消费

参考:

MQ 解包失败

特别感谢大佬的这个库

在运行的时候发现有解包失败,但是缺少PDB符号文件,不知道具体是哪里炸了

调用堆栈

>	NewLife.Core.dll!System.IOHelper.ToUInt32(byte[] data, int offset, bool isLittleEndian)	C#
 	NewLife.Core.dll!NewLife.Net.Handlers.MessageCodec<System.__Canon>.GetLength(NewLife.Data.Packet pk, int offset, int size)	C#
 	NewLife.Core.dll!NewLife.Messaging.PacketCodec.Parse(NewLife.Data.Packet pk)	C#
 	NewLife.RocketMQ.dll!NewLife.RocketMQ.Protocol.MqCodec.Decode(NewLife.Model.IHandlerContext context, NewLife.Data.Packet pk)	C#
 	NewLife.Core.dll!NewLife.Net.Handlers.MessageCodec<NewLife.RocketMQ.Protocol.Command>.Read(NewLife.Model.IHandlerContext context, object message)	C#
 	NewLife.Core.dll!NewLife.Net.SessionBase.ProcessReceive(NewLife.Data.Packet pk, System.Net.IPEndPoint remote)	C#
 	NewLife.Core.dll!NewLife.Net.SessionBase.ProcessEvent(System.Net.Sockets.SocketAsyncEventArgs se)	C#
 	System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state)	C#
 	System.Private.CoreLib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch()	C#
 	[本机到托管的转换]	

信息:

名称 类型
$exception {System.IndexOutOfRangeException: Index was outside the bounds of the array. at System.IOHelper.ToUInt32(Byte[] data, Int32 offset, Boolean isLittleEndian)} System.IndexOutOfRangeException

上面代码的 Offset 被优化掉了,现在能找到的是包的信息

按照 MqCodec 的 Decode 的代码,可以猜这是 0 的值

        protected override IList<Command> Decode(IHandlerContext context, Packet pk)
        {
            var ss = context.Owner as IExtend;
            var pc = ss["Codec"] as PacketCodec;
            if (pc == null) ss["Codec"] = pc = new PacketCodec { GetLength = p => GetLength(p, 0, -4) };
            // 忽略代码
        }

包的模拟数据如下:

            var buffer = new byte[]
{
                    0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101, 120, 116, 70,
                    105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104, 105, 99, 104,
                    66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103,
                    105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115,
                    101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48,
                    34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97, 110, 103, 117, 97, 103, 101, 34,
                    58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 56, 53, 55, 44, 34,
                    114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81,
                    85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101, 67, 117,
                    114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114, 115,
                    105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101, 34,
                    58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103, 103,
                    101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34, 44,
                    34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44,
                    34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79, 102,
                    102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97,
                    110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113, 117, 101,
                    34, 58, 50, 56, 52, 57, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83,
                    65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108, 105, 122,
                    101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83, 79, 78,
                    34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225,
                    123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100, 115, 34,
                    58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114,
                    73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115,
                    101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34,
                    44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97,
                    103, 34, 58, 49, 44, 34, 108, 97, 110, 103, 117, 97, 103,
                    101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 56, 52, 55,
                    44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78,
                    95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101,
                    67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114,
                    115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101,
                    34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103,
                    103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34,
                    44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34,
                    44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79,
                    102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34,
                    108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113,
                    117, 101, 34, 58, 50, 56, 53, 51, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77,
                    69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108,
                    105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83,
                    79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0,
                    0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100,
                    115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107,
                    101, 114, 73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102,
                    102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58,
                    34, 48, 34, 44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34,
                    102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86,
                    65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 56, 52, 56, 44, 34, 114, 101, 109, 97, 114,
                    107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44,
                    34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116,
                    82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50,
                    53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101,
                    120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34,
                    48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34,
                    48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105,
                    110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49,
                    44, 34, 108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97,
                    113, 117, 101, 34, 58, 50, 56, 53, 48, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95,
                    77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97,
                    108, 105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74,
                    83, 79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0,
                    0, 0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108,
                    100, 115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111,
                    107, 101, 114, 73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79,
                    102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34,
                    58, 34, 48, 34, 44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34,
                    102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86,
                    65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 56, 53, 50, 44, 34, 114, 101, 109, 97, 114,
                    107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44,
                    34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116,
                    82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50,
                    53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101,
                    120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104,
                    105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116,
                    66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79,
                    102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34,
                    58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97, 110, 103, 117, 97, 103,
                    101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 56, 53, 52,
                    44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78,
                    95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101,
                    67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114,
                    115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101,
                    34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103,
                    103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34,
                    44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34,
                    44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79,
                    102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34,
                    108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113,
                    117, 101, 34, 58, 50, 56, 53, 54, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77,
                    69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108,
                    105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83,
                    79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0,
                    0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100,
                    115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107,
                    101, 114, 73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102,
                    102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58,
                    34, 48, 34, 44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34,
                    102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86,
                    65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 56, 53, 49, 44, 34, 114, 101, 109, 97, 114,
                    107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44,
                    34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116,
                    82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50,
                    53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101,
                    120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104,
                    105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116,
                    66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79, 102, 102,
                    115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34,
                    48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97, 110, 103, 117, 97, 103, 101,
                    34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 56, 53, 53, 44, 34,
                    114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81,
                    85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101, 67, 117,
                    114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114, 115,
                    105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101, 34,
                    58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103, 103,
                    101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34, 44,
                    34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44,
                    34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79, 102,
                    102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97,
                    110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 55, 55, 51, 44,
                    34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95,
                    81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101, 67,
                    117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114,
                    115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101,
                    34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103,
                    103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34,
                    44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34,
                    44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79,
                    102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34,
                    108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113,
                    117, 101, 34, 58, 50, 55, 55, 50, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77,
                    69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108,
                    105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114,
                    115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101,
                    34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103,
                    103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34,
                    44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34,
                    44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79,
                    102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34,
                    108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113,
                    117, 101, 34, 58, 50, 55, 55, 48, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77,
                    69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108,
                    105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83,
                    79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0,
                    0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100,
                    115, 34, 58, 123, 34, 115, 117,
                    103, 103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34,
                    48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34,
                    48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105,
                    110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49,
                    44, 34, 108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97,
                    113, 117, 101, 34, 58, 50, 55, 50, 51, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95,
                    77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97,
                    108, 105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74,
                    83, 79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0,
                    0, 0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108,
                    100, 115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111,
                    107, 101, 114, 73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79,
                    102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34,
                    58, 34, 48, 34, 44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58,
                    49, 44, 34, 108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112,
                    97, 113, 117, 101, 34, 58, 50, 55, 50, 50, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79,
                    95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105,
                    97, 108, 105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34,
                    74, 83, 79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0
};

            var data = new byte[65535];
            Array.Copy(buffer, data, buffer.Length);
            var packet = new Packet(data, 0, 234);

集群所有地址连接失败!

是我的端口没打开吗?
可是我已经打开了啊
-A INPUT -p tcp -m tcp --dport 10911 -j ACCEPT
-A INPUT -p tcp -m tcp --dport 9876 -j ACCEPT
-A FORWARD -j DOCKER-USER
-A FORWARD -j DOCKER-ISOLATION-STAGE-1
-A FORWARD -o docker0 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A FORWARD -o docker0 -j DOCKER
-A FORWARD -i docker0 ! -o docker0 -j ACCEPT
-A FORWARD -i docker0 -o docker0 -j ACCEPT
-A DOCKER-ISOLATION-STAGE-1 -i docker0 ! -o docker0 -j DOCKER-ISOLATION-STAGE-2
-A DOCKER-ISOLATION-STAGE-1 -j RETURN
-A DOCKER-ISOLATION-STAGE-2 -o docker0 -j DROP
-A DOCKER-ISOLATION-STAGE-2 -j RETURN

10911 9876 这两个端口我都打开了

rocketmq origin msgid is diffent with sdk parse

原始消息属性,其中UNIQ_KEY为:AC17043B001E1E9804B955C1DE160B6C
`

{"REAL_TOPIC":"zyq_mq_contract_tby_topic","sw8-x":"0-1684309168662","sw8":"1-NzgwOWIwNmE1ZDBmNDQ3ZTlhZDFmNGFmNTA5YWIzYWEuNDAwLjE2ODQzMDkxNjg2NjIwMjU1-NzgwOWIwNmE1ZDBmNDQ3ZTlhZDFmNGFmNTA5YWIzYWEuNDAwLjE2ODQzMDkxNjg2NjIwMjU0-0-cWEtZS1zaWduYXR1cmUtc2VydmVy-MTcyLjIzLjQuNTlAOTE5NTQ1NGQwNjFiNGE3Zjk2NDRjMGMxZDc3ZTVlZDg=-Um9ja2V0TVEvenlxX21xX2NvbnRyYWN0X3RieV90b3BpYy9Qcm9kdWNlcg==-cm9ja2V0bXExLXFhLnl6dy5jbjo5ODc2O3JvY2tldG1xMi1xYS55encuY246OTg3Njtyb2NrZXRtcTMtcWEueXp3LmNuOjk4NzY=","id":"5cdfe77e-e743-3156-ded2-010abbb7a31e","UNIQ_KEY":"AC17043B001E1E9804B955C1DE160B6C","WAIT":"false","contentType":"application/json","DELAY":"1","TAGS":"contract_info_tag","timestamp":"1684309168662","REAL_QID":"1"}

`

版本为【2.2.2023.401】的SDK解析解析出来之后,msgid:AC1001740000BCFD000000573DA3275F
能找时间看看是因为协议问题导致解析逻辑变了吗

消费者始终不触发OnConsume回调,不知道原因

  • 环境
  1. rocket部署在win10-wsl2的docker中,用的foxiswho/rocketmq,见 https://github.com/foxiswho/docker-rocketmq
  2. 并修改了其中要求的foxiswho/rocketmq版本为4.8.0, broker.conf文件中的brokerIP1=172.26.64.99(ip为wsl2的ifconfig)
  3. 用docker-compose成功启动,能打开console控制台
  • 新建2个工程test1,test2 分别为生产者和消费者
var mq111 = new Producer()
{
    NameServerAddress = "127.0.0.1:9876",

    Log = XTrace.Log,
};
mq111.Start();
mq111.CreateTopic("nx_test33", 2);
mq111.Stop();
mq111.Dispose();

var mq = new Producer
{
    Topic = "nx_test33",
    NameServerAddress = "127.0.0.1:9876",

    Log = XTrace.Log,
};

mq.Start();

for (var i = 0; i < 11; i++)
{
    var str = "学无先后达者为师" + i;
    //var str = Rand.NextString(1337);

    var sr = mq.Publish(str, "TagA");
}


Console.WriteLine("正在处理消息s");
Console.ReadLine();
var consumer = new Consumer
{
    Topic = "nx_test33",
    Group = "testg2",
    NameServerAddress = "127.0.0.1:9876",

    FromLastOffset = true,
    SkipOverStoredMsgCount = 0,
    BatchSize = 20,

    Log = XTrace.Log,
};

consumer.OnConsume = (q, ms) =>
{
    XTrace.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);

    foreach (var item in ms.ToList())
    {
        XTrace.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}");
    }

    return true;
};

consumer.Start();


Console.WriteLine("任意键可以按,然后退出");
Console.ReadLine();
  • 执行上面2个后,发现消息的状态是PULL, 但是消费者的OnConsume回调始终不触发
    image
    image

请问有实现 OneWay 发送的计划吗?

目前发送方确实不关心消息是否在 Broker 落地,测试发现没有 OneWay 的情况下每次 publish 需要花费 30ms 以上,很多情况下需要 200ms 才能收到 ACK,导致发送方消息堆积。这边没法改变 broker,希望能有 OneWay 或其它曲线救国的实现方式减少请求的耗时。感谢!

rmq版本4.7.1,运行一段时间后会提示: FATAL | The service threw an unhandled exceptio

2023-02-09 04:14:52,033 | FATAL | The service threw an unhandled exception | 44
System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'System.Net.Sockets.SocketAsyncEventArgs'.
at System.Net.Sockets.SocketAsyncEventArgs.ThrowForNonFreeStatus(Int32 status)
at System.Net.Sockets.SocketAsyncEventArgs.SetBuffer(Byte[] buffer, Int32 offset, Int32 count)
at NewLife.Net.SessionBase.ReleaseRecv(SocketAsyncEventArgs se, String reason)
at NewLife.Net.SessionBase.ProcessEvent(SocketAsyncEventArgs se, Int32 bytes, Boolean ioThread)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location where exception was thrown ---
at System.Threading.ThreadPoolWorkQueue.Dispatch()

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.