Code Monkey home page Code Monkey logo

qunarcorp / qmq Goto Github PK

View Code? Open in Web Editor NEW
2.8K 152.0 693.0 7.09 MB

QMQ是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景; 也包括报价搜索等高吞吐量场景。

License: Apache License 2.0

Makefile 0.08% Java 82.89% Shell 0.37% Batchfile 0.04% C# 11.39% HTML 0.06% Erlang 2.20% C++ 2.98%
mq message event event-driven kafka message-driven message-queue message-bus event-bus xa

qmq's Introduction

QMQ

Maven Central License

QMQ是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景; 也包括报价搜索等高吞吐量场景。目前在公司内部日常消息qps在60W左右,生产上承载将近4W+消息topic,消息的端到端延迟可以控制在10ms以内。

主要提供以下特性:

  • 异步实时消息
  • 延迟/定时消息(支持任意秒级)
  • 广播消息(每个Consumer都收到相同消息,比如本地cache更新)
  • 基于Tag的服务端过滤
  • Consumer端幂等处理支持
  • Consumer端filter
  • 消费端支持按条ack消息
  • 死信消息
  • 结合Spring annotation使用的简单API
  • 提供丰富的监控指标
  • 接入OpenTracing
  • 事务消息
  • Consumer的处理能力也可以方便扩容缩容
  • Server可以随心所欲扩容缩容
  • Java Client, .NET Client
  • 读写分离
  • 消息投递轨迹
  • 历史消息的自动备份
  • 有序消息(即将开源)

JDK最低版本要求

  • Client: 1.7及其以上版本
  • Server: 1.8及其以上版本

Maven

qmq的客户端已经发布到maven**仓库,可以通过下面的方式获取

<dependency>
    <groupId>com.qunar.qmq</groupId>
    <artifactId>qmq</artifactId>
    <version>{see maven}</version>
</dependency>

快速开始

你可以通过设计背景了解设计QMQ的初衷和它与其他消息队列的不同。 阅读架构概览了解QMQ的存储模型

文档

技术支持

欢迎关注QMQ官方公众号

公众号

QQ群

QMQ技术交流群(2) QMQ技术交流群(1)
群号:1018190609 群号:915826408(已满)
QQ群2 QQ群1(已满)

开源协议

Apache 2 license

用户(已经在生产使用)

欢迎在这里,以方便我们提供更好的技术支持

去哪儿 携程 IYMedia 便利蜂 金汇金融 必贝证券 易宝支付 三节课 红松学堂 跨越速运

Stars History

Stargazers over time

qmq's People

Contributors

cngddflzw avatar decylus avatar dennis8274 avatar erichetti avatar fantasywxx avatar gibbon2000pro avatar haola334 avatar keliwang avatar paascloud avatar weizhifancheng avatar withlin avatar yuyijq avatar z-star avatar zhipeng-cai avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

qmq's Issues

能再详细介绍下如何通过dispatch log避免重复投递吗?

dispatch log 延时/定时消息投递成功后写入,主要用于在应用重启后能够确定哪些消息已经投递,dispatch log里写入的是消息的offset,不包含消息内容。当延时server中途重启时,我们需要判断出当前这个刻度(比如一个小时)里的消息有哪些已经投递了则不重复投递。

这段不是太明白。比如下一小时的msg已经load到内存了,此时宕机,怎么知道哪些消息已经投递了?

ubuntu环境,启动脚本不兼容

问题描述
ubuntu环境下,

sh +x metaserver.sh start

报错:

root@ubuntu:/data/qmq_dist/qmq-dist-1.1.3.3-bin/bin# sh +x metaserver.sh start
metaserver.sh: 2: set: Illegal option -o pipefail

环境配置
ubuntu

复现步骤
1.
2.
3.

实际输出结果

期望输出结果

原因
ubuntu的 shell 默认安装的是 dash,而不是 bash。

解决方案

$sudo dpkg-reconfigure dash

然后选择 no 或者 否 ,并确认。
这样做将重新配置 dash,并使其不作为默认的 shell 工具。

参考:https://blog.csdn.net/sahusoft/article/details/9115367

重复的maven artifact name

问题描述
maven列表中出现了2个"qmq client",是因为qmq-deploy的pom.xml将name写为"qmq client"导致,应该为"qmq deploy"。
另外有些artifact name带"-",有些不带,建议统一。

有一个小问题? 关于mmap内存和message log

mmap 映射文件什么时候释放,是一直保存到文件删除吗?还是咋样的 ?
message log 按照主题区分,还是一个大文件 ?
如果是像rocketmq 一样是一个大文件,不用主题之间是否会互相影响

消息重发可能会导致消息内容不一致

问题描述
如果业务方在listener中修改了message的属性,然后抛了异常,那么在ack时是发了一条新的消息,并且以此修改过的message对象为消息内容,此时,第二次收到此消息时,消息内容会不一致

消息不用直接添加到 时间轮

问题描述
现在延迟消息到达 delya-server 的时候, 除了添加到 指定目标时间的schedule log, 还会直接投递到 多层时间轮的, 但是 时间轮实现 基于 内存的, 无限制添加到时间轮 会导致 内存吃紧

消息添加到时间轮之前, 应该判断下延迟消息的延迟时间, 如果太大(可配置), 比如大于 1天, 就不直接添加到 时间轮里面

client_log_switch.properties 文件缺失

java.io.FileNotFoundException: qmq-dist\conf\client_log_switch.properties (系统找不到指定的文件。)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.(FileInputStream.java:138)
at java.io.FileReader.(FileReader.java:72)
at qunar.tc.qmq.configuration.local.LocalDynamicConfig.loadConfig(LocalDynamicConfig.java:98)
at qunar.tc.qmq.configuration.local.LocalDynamicConfig.onConfigModified(LocalDynamicConfig.java:90)
at qunar.tc.qmq.configuration.local.LocalDynamicConfigFactory.doCreate(LocalDynamicConfigFactory.java:47)
at qunar.tc.qmq.configuration.local.LocalDynamicConfigFactory.create(LocalDynamicConfigFactory.java:39)
at qunar.tc.qmq.configuration.DynamicConfigLoader.load(DynamicConfigLoader.java:48)
at qunar.tc.qmq.meta.utils.ClientLogUtils.(ClientLogUtils.java:41)
at qunar.tc.qmq.meta.processor.ClientRegisterWorker.handleClientRegister(ClientRegisterWorker.java:94)
at qunar.tc.qmq.meta.processor.ClientRegisterWorker.process(ClientRegisterWorker.java:75)
at qunar.tc.qmq.meta.processor.ClientRegisterWorker.process(ClientRegisterWorker.java:50)
at qunar.tc.qmq.concurrent.ActorSystem$Actor.processMessages(ActorSystem.java:173)
at qunar.tc.qmq.concurrent.ActorSystem$Actor.run(ActorSystem.java:155)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at qunar.tc.qmq.concurrent.NamedThreadFactory$1.run(NamedThreadFactory.java:52)
at java.lang.Thread.run(Thread.java:748)

期望提供benchmark和test代码

  • test代码可以给使用者引入生产环境提供极大的信心
  • benchmark可以方便使用者测试系统的性能.
    期待提供.

分库分表的事务消息怎么处理?

假设我的业务表是分库分表的.使用了qmq发送事务消息.qmq消息怎么知道保存在哪一个数据源的qmq_msg_queue表呢?这里的分库并不是在同一个db实例中.

有跟 RabbbitMQ 的详细对比吗?

看设计背景里跟 kafka 的对比比较多,RabbbitMQ 也多介绍下吧。最近正在做选型考虑,希望能够有更多的素材。
感谢你们的工作!

单机(本机)启动 metaserver,server,delay三个服务,然后会看到 delay 和server 注册有问题

问题描述
如题。 本机启动三个服务,然后测试发送消息以及消费消息,在启动server之后 再去启动delay,发现注册组会有问题,查看是区分时 按照hostname 和端口 确定为唯一的,这种有完整的单机(或idea)启动全部服务的文档么。

有没有微信交流群什么的呢? QQ交流群 加群都没加上。

环境配置

复现步骤
1.
2.
3.

实际输出结果

期望输出结果
希望 能给到完整的本地(单机)部署文档

谢谢。

想请教下PullLog和ActionLog的存储模型的理解

在存储模型中,文档中说明qmq有三种log,messageLog、ConsumeLog、PullLog。在我的理解,messageLog类似RocketMQ的commitLog,ConsumeLog类似于partition的概念,或者说是rocketmq中的逻辑队列的索引文件,那ActionLog是什么概念呢?是存储消费进度的log吗?PullLog怎么理解呢?可以理解为是动态队列吗?那这种情况下的消息分配是类似于“推模式”吗?我比较好奇可伸缩partition的这种情况下顺序消费如何实现呢?谢谢!

父 pom.xml 重复的依赖配置

问题描述

父 pom.xml 重复的依赖配置:

        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-server</artifactId>
            <version>9.4.14.v20181114</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-servlet</artifactId>
            <version>9.4.14.v20181114</version>
        </dependency>

能再详细介绍下如何通过dispatch log避免重复投递吗?

dispatch log 延时/定时消息投递成功后写入,主要用于在应用重启后能够确定哪些消息已经投递,dispatch log里写入的是消息的offset,不包含消息内容。当延时server中途重启时,我们需要判断出当前这个刻度(比如一个小时)里的消息有哪些已经投递了则不重复投递。

支持延迟取消

问题描述

环境配置

复现步骤
1.
2.
3.

实际输出结果

期望输出结果

RetryTask好像逻辑有点问题,当前commit 7b7b456

问题描述

当一个consumer进入离线(offline)状态之后,会触发一个RetryTask, 其逻辑应当是:

  1. 比较该consumerId的firstNotAckedSequence和lastPulledSequence,如果lastPulledSequence < firstNotAckedSequence, 说明该consumer所有pull的消息都已经处理完毕,无须retry
  2. 否则,需要把所有未处理完成的消息放入retry queue中,并手动ack原来的消息

这部分逻辑是在RetryTask.run()函数中执行,其中第一步应该是在processSkipRetry()中执行。
if (processSkipRetry(consumerSequence)) return;

但是看代码,发现两个问题

  1. 在processSkipRetry()这个函数中所有的路径都会返回true,导致第2步里的逻辑不会得到执行。
    private boolean processSkipRetry(ConsumerSequence consumerSequence) {
        if (!consumerSequence.tryLock()) return true;
        try {
            final long firstNotAckedSequence = consumerSequence.getAckSequence() + 1;
            final long lastPulledSequence = consumerSequence.getPullSequence();
            if (lastPulledSequence < firstNotAckedSequence) return true;

            // put ack action
           ...
        } finally {
            consumerSequence.unlock();
        }
        return true;
    }
  1. 在这个函数内外都手动执行了对于未处理消息的ack,感觉是重复了

环境配置

复现步骤

  1. 只需要一个consumer消费的比较慢,(比如在这个commit里我改了一下qmq-demo),手动kill consumer就可以容易的复现

实际输出结果
consumer崩溃之后,当前未ack的消息不会被处理

期望输出结果
consumer崩溃之后,当前未ack的消息会被加入retry queue并重新处理

consumer group是如何分配由哪一台机器拉取消息

看到文档说,消费端都是一个拉取循环,服务端维护一个request队列,哪是如何决定group里面只有一台机器消费了消息呢。是消费端的一个group的所有机器都会请求服务端,由服务端通过负载均衡决定向哪个消费机器响应吗

init.sql是不是有问题?

CREATE TABLE datasource_config (
id int(10) unsigned not null auto_increment comment '主键id',
url varchar(128) not null default '' comment 'jdbc url',
user_name varchar(100) NOT NULL default '' comment 'db username',
password varchar(100) NOT NULL DEFAULT '' comment 'db password',
status TINYINT NOT NULL DEFAULT 0 COMMENT 'db状态',
room VARCHAR(20) NOT NULL DEFAULT '' COMMENT '机房',
create_time TIMESTAMP NOT NULL DEFAULT '2018-01-01 01:01:01' COMMENT '创建时间',
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (id),
unique key uniq_idx_name(name)
) ENGINE=InnoDB default charset=utf8mb4 comment '客户端db配置表'
MySQL 返回: 文档

#1072 - Key column 'name' doesn't exist in table

delay-server单测依赖缺失

问题描述
单元测试依赖缺失

环境配置
win10 jdk1.8 master分支

复现步骤
1.git clone 代码库
2.查看该模块单元测试代码

实际输出结果
依赖缺失
期望输出结果
无依赖缺失

QMQ 适合于像金融类的实时行情推送吗?

问题描述
看到用户里有几个金融证券类的企业,想请问下QMQ 适合于像金融类的实时行情推送的场景吗?

环境配置

复现步骤
1.
2.
3.

实际输出结果

期望输出结果

发送消息时,不发送对象的设计和折衷是什么?

我想传递订单信息,需要一个一个写order的属性,如果传递整个对象,需要通过序列化来变通,请问为什么不提供直接发送对象消息的设计呢?
如下所示.

    public void placeOrder(Order order){
        //bussiness work
        log.info("order:{}",order.toString());
        Message message = producer.generateMessage("order.changed");
        message.setProperty("orderNo", order.getOrderNo());
        message.setProperty("orderTime", order.getOrderTime());
        message.setProperty("uid", order.getUid());
  // 发送对象,需要自已序列化变通一下
        message.setProperty("order",new Gson().toJson(order));
        producer.sendMessage(message);
    }

一个关于延时消息的load小问题,以及可能的多线程问题

public boolean canAdd(long scheduleTime, long offset) {
WheelLoadCursor.Cursor currentCursor = loadingCursor.cursor();
long currentBaseOffset = currentCursor.getBaseOffset();
long currentOffset = currentCursor.getOffset();
long baseOffset = resolveSegment(scheduleTime, segmentScale);
if (baseOffset < currentBaseOffset) return true;
if (baseOffset == currentBaseOffset) {
return currentOffset <= offset;
}
return false;

我的理解是:当前的时间段的消息会实时加入时间轮,load任务offset 之后的下一个时间片的消息,会实时加入时间轮
延时消息当前时间片段要到期的时候,是怎么load下一个时间段的。如果是定时load,比如每分钟跑一次,加入17:59:00 这个load定时任务运行过了,在17:59:00之后,写入了18:00-19:00的消息,这些消息怎么load上来。
load任务怎么保证消息索引不漏和不重复地加入时间轮

部分代码日志打印格式化问题

问题描述
部分代码日志打印问题

复现步骤
LOGGER.debug("NettyConnectManageHandler", "NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
LOGGER.debug("NettyConnectManageHandler", "NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);

实际输出结果
NettyConnectManageHandler
NettyConnectManageHandler

期望输出结果
NETTY CLIENT PIPELINE: DISCONNECT 192.168.11.29
NETTY CLIENT PIPELINE: CLOSE 192.168.11.29

实时broker启动报错,172.17.0.1:20880

WARN qunar.tc.qmq.netty.client.NettyConnectManageHandler] connect remote host fail: 172.17.0.1:20880. DefaultChannelPromise@5bfbf16f(failure: java.net.ConnectException: Connection refused: /172.17.0.1:20880)
172.17.0.1是我本机docker0的地址

NeedRetryException异常本地重试处理?

根据HandleTask的执行逻辑,应该是根据相应的异常分支进行处理的。但是,GeneratedListener的onMessage方法的try catch把异常吐了,此处的业务方法抛出的异常均未处理而转为InvocationTargetException,最后在HandleTask走到了catch (Throwable e) 分支

复现步骤

  1. 开启qmq-client
  2. 配置一个@QmqConsumer消息处理方法
  3. 在消息处理方法内部throw new NeedRetryException("重试一下")进行重试;
    HandleTask部分代码如下:
public void run() {
    message.setProcessThread(Thread.currentThread());
    final long start = System.currentTimeMillis();
    final Map<String, Object> filterContext = new HashMap<>();
    message.localRetries(localRetries);
    message.filterContext(filterContext);
    Throwable exception = null;
    boolean reQueued = false;
    try {
        handler.qtraceFilter.preOnMessage(message, filterContext);
        if (!handler.triggerBeforeOnMessage(message, filterContext)) return;
        handler.listener.onMessage(message);
    } catch (NeedRetryException e) {
       // FIXME: 目前使用GeneratedListener的消息处理器不会来到这里,根据方法名称,此处应是处理部分业务能立即本地重试的代码
        exception = e;
        try {
            reQueued = localRetry(e);
        } catch (Throwable ex) {
            exception = ex;
        }
    } catch (Throwable e) {
        exception = e;
    } finally {
        triggerAfterCompletion(reQueued, start, exception, filterContext);
        handler.qtraceFilter.postOnMessage(message, exception, filterContext);
    }
}

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.