Code Monkey home page Code Monkey logo

rabbitmq-mqtt's Introduction

RabbitMQ MQTT Plugin

This repository has been moved to the main unified RabbitMQ "monorepo", including all open issues. You can find the source under /deps/rabbitmq_mqtt. All issues have been transferred.

Getting Started

This is an MQTT plugin for RabbitMQ.

The plugin is included in the RabbitMQ distribution. To enable it, use rabbitmq-plugins:

rabbitmq-plugins enable rabbitmq_mqtt

Default port used by the plugin is 1883.

Documentation

MQTT plugin documentation is available from rabbitmq.com.

Contributing

See CONTRIBUTING.md.

Running Tests

After cloning RabbitMQ umbrella repository, change into the rabbitmq-mqtt directory and run

make tests

This will bring up a RabbitMQ node with the plugin enabled and run integration tests against it. Note that there must be no other MQTT server running on ports 1883 and 8883.

Copyright and License

(c) 2007-2020 VMware, Inc. or its affiliates.

Released under the Mozilla Public License, the same as RabbitMQ.

rabbitmq-mqtt's People

Contributors

acogoluegnes avatar ash-owl avatar binarin avatar dcorbacho avatar dumbbell avatar essen avatar fenollp avatar gerhard avatar hairyhum avatar kjnilsson avatar lukebakken avatar michaelklishin avatar priviterag avatar rade avatar rsandbach avatar schwarz avatar spring-operator avatar velimir avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rabbitmq-mqtt's Issues

MQTT TLS test fails on OTP 18.3

MQTT ssl test failing with error

Testsuite: com.rabbitmq.mqtt.test.tls.MqttSSLTest
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.342 sec

Testcase: testInvalidUser took 0.308 sec
    FAILED
expected:<4> but was:<32109>
junit.framework.AssertionFailedError: expected:<4> but was:<32109>
    at com.rabbitmq.mqtt.test.tls.MqttSSLTest.testInvalidUser(MqttSSLTest.java:132)

Log showing following error:

19:06:20.795 [error] CRASH REPORT Process <0.2404.0> with 0 neighbours crashed with reason: no match of right hand value {cipher_suite,{rsa,aes_128_gcm,null,sha256}} in amqp_direct_connection:ssl_info/1 line 197

All tests are passing using OTP 17.0
It looks like being related to new ciphersuite format in OTP 18.3

mosquitto bridge fails to connect to RabbitMQ-MQTT

When mosquitto (1.3.4_1) is configured as bridge to a rabbitmq server (3.3.5) connection fails with
the following rabbit log file entry:
=ERROR REPORT==== 13-Sep-2014::11:44:09 ===
MQTT detected framing error '"127.0.0.1:55255 -> 127.0.0.1:1883"' for connection protocol_header_corrupt

Wireshark shows in the CONNECT message a MQTT protocol name "MQIsdp" and version "131".

regards
Thomas

Question re: TLS Session Resumption supported?

Hi,

How, if it all, does RabbitMQ (and/or RabbitMQ MQTT plugin) support TLS Session Resumption?

One of the challenges of keeping bandwidth and battery overhead low on IoT-scale devices (the whole point of MQTT) is that TLS handshakes are high overhead - and thus TLS Session Resumption is the resolution to avoid repeated TLS handshakes in an MQTT session.

HiveMQ claims they offer TLS Session Resumption to avoid repeat-handshake overhead to keep MQTT long-term session overhead low, but others will claim that TLS Session Resumption also opens your client up for attacks unless you're doing correct Perfect Forward Secrecy.

I couldn't find anything on Google about Rabbit's TLS implementation (or rabbitmq-mqtt) with regards to the MQTT 3.1.1 OASIS spec, which does reference TLS Session Resumption:

[RFC5077]
Salowey, J., Zhou, H., Eronen, P., and H. Tschofenig, "Transport Layer Security (TLS) Session Resumption without Server-Side State", RFC 5077, January 2008.
http://www.ietf.org/rfc/rfc5077.txt

Lastly, someone challenged some MQTT-over-TLS benchmarked versus HTTPS (see comment 1173 by Hannes Tschofenig) as invalid without describing what means and methods of TLS were used for the tests. I tend to agree and would like to know more, if possible.

Thanks!
Mike

retain flag

Any ideas when retain flag will be implemented?

Change default queue TTL to 1 day

It is currently too short.

The reason why we have it at all is because without Clean Session set on a connection, its client can go away and never come back at any moment, and we'd have durable queues with persistent messages sitting around forever.

This can be confusing and now is a good time to revisit the default.

Purging a queue while sending messages results in unacknowleged messages (Rabbit v3.6.2).

I've tested the following scenario - I just send a lot of messages (QoS 1) from multiple producers to a persistent/mirrored queue (without any consumers). If I now purge the contents of the queue via management UI some of the producers began to hang. If I record the network traffic via WireShark I can see that PUBACK's are missing - e.g. I send messages with identifiers 1 to 1000, but I receive only acknowledgements for 1 to 920 - so the client library is waiting for further acknowledgements, but rabbitmq won't send them - just leaving the tcp connection open. I think this violates the MQTT specification - http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718041. I know that I didn't give you much information, but maybe you already have a clue where to look at from my rough description of the problem. Also please don't hesitate to contact me if you need additional information or if I could otherwise help you to track down the problem. Thanks in advance for any hint!

some error

I find some error in rabbitmq log.
my environment:

 
    ubuntu 10.04 
    rabbitmq-server 3.1.5-1


=INFO REPORT==== 29-Aug-2013::16:17:32 ===
MQTT detected network error for "117.X.X.X:27471 -> 183.X.X.X:1883": closed

=ERROR REPORT==== 29-Aug-2013::16:17:32 ===
** Generic server <0.8464.0> terminating
** Last message in was {'EXIT',<0.8446.0>,
                           {{badmatch,not_registered},
                            [{rabbit_mqtt_reader,stop,2,[]},
                             {gen_server2,handle_msg,2,[]},
                             {proc_lib,wake_up,3,
                                 [{file,"proc_lib.erl"},{line,249}]}]}}
** When Server state == {state,amqp_direct_connection,
                         {state,'rabbit@ubuntu-002',
                          {user,<<"guest">>,
                           [administrator],
                           rabbit_auth_backend_internal,
                           {internal_user,<<"guest">>,
                            <<142,75,13,22,6,167,52,4,57,111,48,140,61,105,247,
                              169,249,185,164,125>>,
                            [administrator]}},
                          <<"/">>,
                          {amqp_params_direct,<<"guest">>,<<"guest">>,
                           <<"/">>,'rabbit@ubuntu-002',
                           {amqp_adapter_info,
                            {0,0,0,0,0,65535,46977,49667},
                            1883,
                            {0,0,0,0,0,65535,30088,11},
                            27471,
                            <<"117.X.X.11:27471 -> 183.X.X.X:1883">>,
                            {'MQTT',{3,1}},
                            [{ssl,false}]},
                           []},
                          {amqp_adapter_info,
                           {0,0,0,0,0,65535,46977,49667},
                           1883,
                           {0,0,0,0,0,65535,30088,11},
                           27471,
                           <<"117.X.X.11:27471 -> 183.X.X.X:1883">>,
                           {'MQTT',{3,1}},
                           [{ssl,false}]},
                          <0.8484.0>,undefined},
                         <0.8460.0>,<0.8472.0>,
                         {amqp_params_direct,<<"guest">>,<<"guest">>,<<"/">>,
                          'rabbit@ubuntu-002',
                          {amqp_adapter_info,
                           {0,0,0,0,0,65535,46977,49667},
                           1883,
                           {0,0,0,0,0,65535,30088,11},
                           27471,
                           <<"117.X.X.X:27471 -> 183.X.X.X:1883">>,
                           {'MQTT',{3,1}},
                           [{ssl,false}]},
                          []},
                         0,
                         [{<<"capabilities">>,table,
                           [{<<"publisher_confirms">>,bool,true},
                            {<<"exchange_exchange_bindings">>,bool,true},
                            {<<"basic.nack">>,bool,true},
                            {<<"consumer_cancel_notify">>,bool,true}]},
                          {<<"copyright">>,longstr,
                           <<"Copyright (C) 2007-2013 GoPivotal, Inc.">>},
                          {<<"information">>,longstr,
                           <<"Licensed under the MPL.  See http://www.rabbitmq.com/">>},
                          {<<"platform">>,longstr,<<"Erlang/OTP">>},
                          {<<"product">>,longstr,<<"RabbitMQ">>},
                          {<<"version">>,longstr,<<"3.1.5">>}],
                         #Fun,
                         #Fun,false}
** Reason for termination ==
** {unexpected_msg,
       {'EXIT',<0.8446.0>,
           {{badmatch,not_registered},
            [{rabbit_mqtt_reader,stop,2,[]},
             {gen_server2,handle_msg,2,[]},
             {proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,249}]}]}}}

=INFO REPORT==== 29-Aug-2013::16:17:32 ===
accepting MQTT connection (117.X.XX:43208 -> 183.X.X.X:1883)

=ERROR REPORT==== 29-Aug-2013::16:17:32 ===
** Generic server <0.17986.0> terminating
** Last message in was {inet_async,#Port<0.8643>,3635,
                           {ok,<<16,37,0,6,77,81,73,115,100,112,3,2,1,44,0,
                                 23,115,101,101,100,105,116,47,98,56,99,97,
                                 49,102,97,56,97,55,102,48,57,53,101,102>>}}
** When Server state == {state,#Port<0.8643>,
                            "117.X.X.X:43208 -> 183.X.X.3:1883",true,
                            running,false,none,
                            {proc_state,#Port<0.8643>,
                                {dict,0,16,16,8,80,48,
                                    {[],[],[],[],[],[],[],[],[],[],[],[],[],
                                     [],[],[]},
                                    {{[],[],[],[],[],[],[],[],[],[],[],[],[],
                                      [],[],[]}}},
                                {undefined,undefined},
                                {0,nil},
                                {0,nil},
                                undefined,1,undefined,undefined,undefined,
                                {undefined,undefined},
                                undefined,<<"amq.topic">>}}
** Reason for termination ==
** {badarg,[{erlang,list_to_binary,[unknown],[]},
            {amqp_direct_connection,socket_adapter_info,2,[]},
            {rabbit_mqtt_processor,process_login,3,[]},
            {rabbit_mqtt_processor,process_request,3,[]},
            {rabbit_mqtt_reader,process_received_bytes,2,[]},
            {gen_server2,handle_msg,2,[]},
            {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}]}

=INFO REPORT==== 29-Aug-2013::16:17:32 ===
accepting MQTT connection (118.X.X.X:54619 -> 121.X.X.X:1883)

=ERROR REPORT==== 29-Aug-2013::16:17:32 ===
connection <0.17989.0>, channel 1 - soft error:
{amqp_error,not_found,
            "no queue 'mqtt-subscription-seedit/4cec3667f1acc5b1qos1' in vhost '/'",
            'queue.delete'}

=INFO REPORT==== 29-Aug-2013::16:17:34 ===
MQTT detected network error for "183.X.X.6:46153 -> 121.X.X.X:1883": closed

=INFO REPORT==== 29-Aug-2013::16:17:34 ===
accepting MQTT connection (117.X.X.X:33572 -> 183.X.X.X:1883)

=ERROR REPORT==== 29-Aug-2013::16:17:34 ===
** Generic server <0.18008.0> terminating
** Last message in was {inet_async,#Port<0.8646>,3642,
                           {ok,<<16,37,0,6,77,81,73,115,100,112,3,2,1,44,0,
                                 23,115,101,101,100,105,116,47,51,49,100,56,
                                 49,102,97,98,102,102,51,52,99,102,50,55>>}}
** When Server state == {state,#Port<0.8646>,
                            "117.X.X.75:33572 -> 183.X.X.X:1883",true,
                            running,false,none,
                            {proc_state,#Port<0.8646>,
                                {dict,0,16,16,8,80,48,
                                    {[],[],[],[],[],[],[],[],[],[],[],[],[],
                                     [],[],[]},
                                    {{[],[],[],[],[],[],[],[],[],[],[],[],[],
                                      [],[],[]}}},
                                {undefined,undefined},
                                {0,nil},
                                {0,nil},
                                undefined,1,undefined,undefined,undefined,
                                {undefined,undefined},
                                undefined,<<"amq.topic">>}}
** Reason for termination ==
** {badarg,[{erlang,list_to_binary,[unknown],[]},
            {amqp_direct_connection,socket_adapter_info,2,[]},
            {rabbit_mqtt_processor,process_login,3,[]},
            {rabbit_mqtt_processor,process_request,3,[]},
            {rabbit_mqtt_reader,process_received_bytes,2,[]},
            {gen_server2,handle_msg,2,[]},
            {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}]}

Enable MQTT communication via WebSocket

There appears to be no way to communicate with the MQTT plug-in from a JavaScript client over web socket. Unlike STOMP, where there is a Web STOMP plug-in, there is no Web MQTT plug-in, and so while there a web socket server in Rabbit (JockJS plug-in), there is no way to use it to initiate MQTT connect.

CONNACK return code 0x05

Version: RabbitMQ 3.3.0 release
use rabbitmq_mqtt plugin's default configuration.
my mqtt client CONNECT success and SUBSCRIBE success, but then return CONNACK message:

 0x05  Connection Refused:  not authorized

why?
can this version allow anonymous?

Standalone build fails because warnings are treated as errors

compile: warnings being treated as errors
src/rabbit_mqtt_processor.erl:465: this clause cannot match because a previous clause at line 464 always matches
erlang.mk:4961: recipe for target 'ebin/rabbitmq_mqtt.app' failed
make[1]: *** [ebin/rabbitmq_mqtt.app] Error 1

MQTT detected framing error 'function_clause' for connection

topic: code/6ko2kn/2/message
remaining_length:283
โ€จpayload:
โ€จโ€จ{"uuid":"A0282182-A87B-435F-A4BC-B7F7F244DD8D","to":"2","version":"1414010536392","timestamp":"1414073067087","anonymous":"0","from":"6","fromName":"test","type":"1","code":"6ko2kn","push_content":"test: /","push_enable":"1","content":"{"text":"\/"}"}โ€จโ€จ

header & payload:

  1. using my_pub : mid- 0x002a ๏ผˆMQTT detected framing error 'function_clause' for connection xxxxxx๏ผ‰โ€จโ€จbelow is the hex****โ€จโ€จ
    329b020015636f64652f366b6f326b6e2f322f6d657373616765002a7b2275756964223a2241303238323138322d413837422d343335462d413442432d423746374632343444443844222c22746f223a2232222c2276657273696f6e223a2231343134303130353336333932222c2274696d657374616d70223a2231343134303733303637303837222c22616e6f6e796d6f7573223a2230222c2266726f6d223a2236222c2266726f6d4e616d65223a2274657374222c2274797065223a2231222c22636f6465223a22366b6f326b6e222c22707573685f636f6e74656e74223a22746573743a205c2f222c22707573685f656e61626c65223a2231222c22636f6e74656e74223a227b5c22746578745c223a5c225c5c5c2f5c227d227d

โ€จ2. using mosquitto_pub: mid- 0x0001 (no error)โ€จโ€จ329b020015636f64652f366b6f326b6e2f322f6d65737361676500017b2275756964223a2241303238323138322d413837422d343335462d413442432d423746374632343444443844222c22746f223a2232222c2276657273696f6e223a2231343134303130353336333932222c2274696d657374616d70223a2231343134303733303637303837222c22616e6f6e796d6f7573223a2230222c2266726f6d223a2236222c2266726f6d4e616d65223a2274657374222c2274797065223a2231222c22636f6465223a22366b6f326b6e222c22707573685f636f6e74656e74223a22746573743a205c2f222c22707573685f656e61626c65223a2231222c22636f6e74656e74223a227b5c22746578745c223a5c225c5c5c2f5c227d227d

Thanks

When authenticating using a client cert, there is no way to specify a vhost for that client

When connecting with username/password, vhost is specified as part of username (vhost:username), but there is no username when authenticating with a client cert. Is there some way to tie a vhost to a user using client cert authentication?

Some possible solutions are:

  1. Use the x509 certificate's optional fields. This could be problematic for many users.
  2. Provide a mapping of user (via client cert) to vhost via runtime parameters.

Clean sessions last longer than connection

I just spent some time debugging an issue with the newest version of RabbitMQ (3.5.6), turns out that my queues are not automatically deleted anymore. I'm opening MQTT connections with clean session = 1 and no client id. I'm subscribing with QoS 1.

Several reconnects of my application results in a lot of (bound!) queues that are never deleted anymore. And because I'm not using a client id I can never connect to them again (at least not with MQTT).

So what I'm going to do is just use QoS 0, way better in my use case.

But what I learned (and wanted to share):

It states:

If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one. This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be reused in any subsequent Session

So I don't think that the queues should be durable in this case. In my opinion it would be a better fit to have it like it was before issue 30 was implemented.

Any other thoughts on this one? I don't see why the MQTT protocol allows the combination anyway.

Retained messages in RabbitMQ 3.5.4

Hi,

I am trying to publish retained messages using the M2MQtt C# library however it seems that RabbitMQ is not retaining them.

Has this been developed?

Kind Regards,
JP

Standalone build fails with `behaviour ranch_protocol undefined`

It seems there is a dependency to ranch that causes a standard plugin build to fail (in both rabbit-stomp and rabbit-mqtt) during build:

$ git clone https://github.com/rabbitmq/rabbitmq-mqtt.git && cd rabbitmq-mqtt && make
[...]
...error is: compile: warnings being treated as errors21:33:31
src/rabbit_mqtt_connection_sup.erl:20: behaviour ranch_protocol undefined21:33:41
erlang.mk:4971: recipe for target 'ebin/rabbitmq_mqtt.app' failed21:33:59
make[1]: *** [ebin/rabbitmq_mqtt.app] Error 1

Error on close

Hi,

Using an iOS MQTT client, been seeing a lot of these, on disconnect from RMQ, and I'm not sure what they are. Using a .deb version of RMQ 3.1.5, but does the same with the generic one. I'm not sure quite what to make of it. My next step is to compile from scratch.

RabbitMQ 3.1.5, Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:8:8] [rq:8] [async-threads:0] [kernel-poll:false]

The error:

=ERROR REPORT==== 27-Aug-2013::06:38:46 ===
** Generic server <0.1049.0> terminating
** Last message in was {'EXIT',<0.1047.0>,
                               {{badmatch,not_registered},
                                [{rabbit_mqtt_reader,stop,2},
                                 {gen_server2,handle_msg,2},
                                 {proc_lib,wake_up,3}]}}
** When Server state == {state,amqp_direct_connection,
                         {state,'rabbit@rabbitmq2-us-west-1c',
                          {user,<<"guest">>,
                           [administrator],
                           rabbit_auth_backend_internal,
                           {internal_user,<<"guest">>,
                            <<43,45,66,29,111,180,187,183,248,55,93,60,22,110,
                              88,116,65,234,47,118>>,
                            [administrator]}},
                          <<"/">>,
                          {amqp_params_direct,<<"guest">>,<<"guest">>,
                           <<"/">>,'rabbit@rabbitmq2-us-west-1c',
                           {amqp_adapter_info,
                            {0,0,0,0,0,65535,44063,3572},
                            1883,
                            {0,0,0,0,0,65535,44063,249},
                            25138,
                            <<"172.31.0.249:25138 -> 172.31.13.244:1883">>,
                            {'MQTT',{3,1}},
                            [{ssl,false}]},
                           []},
                          {amqp_adapter_info,
                           {0,0,0,0,0,65535,44063,3572},
                           1883,
                           {0,0,0,0,0,65535,44063,249},
                           25138,
                           <<"172.31.0.249:25138 -> 172.31.13.244:1883">>,
                           {'MQTT',{3,1}},
                           [{ssl,false}]},
                          <0.1053.0>,undefined},
                         <0.1048.0>,<0.1051.0>,
                         {amqp_params_direct,<<"guest">>,<<"guest">>,<<"/">>,
                          'rabbit@rabbitmq2-us-west-1c',
                          {amqp_adapter_info,
                           {0,0,0,0,0,65535,44063,3572},
                           1883,
                           {0,0,0,0,0,65535,44063,249},
                           25138,
                           <<"172.31.0.249:25138 -> 172.31.13.244:1883">>,
                           {'MQTT',{3,1}},
                           [{ssl,false}]},
                          []},
                         0,
                         [{<<"capabilities">>,table,
                           [{<<"publisher_confirms">>,bool,true},
                            {<<"exchange_exchange_bindings">>,bool,true},
                            {<<"basic.nack">>,bool,true},
                            {<<"consumer_cancel_notify">>,bool,true}]},
                          {<<"copyright">>,longstr,
                           <<"Copyright (C) 2007-2013 GoPivotal, Inc.">>},
                          {<<"information">>,longstr,
                           <<"Licensed under the MPL.  See http://www.rabbitmq.com/">>},
                          {<<"platform">>,longstr,<<"Erlang/OTP">>},
                          {<<"product">>,longstr,<<"RabbitMQ">>},
                          {<<"version">>,longstr,<<"3.1.5">>}],
                         #Fun<amqp_connection_sup.1.1716714>,
                         #Fun<amqp_connection_sup.2.54430129>,false}
** Reason for termination ==
** {unexpected_msg,{'EXIT',<0.1047.0>,
                           {{badmatch,not_registered},
                            [{rabbit_mqtt_reader,stop,2},
                             {gen_server2,handle_msg,2},
                             {proc_lib,wake_up,3}]}}}

SessionPresent flag not set when reconnected to server

Tested using RabbitMQ 3.6.0. It was recently installed and uses default settings.

When connecting to a server with cleansession=0 and a valid client-id, a persistent session should be created.

I captured the packet response using tcpdump, and found the "CONACK" response. This eliminates the client as a factor.

The packet after the CON returned: "0x20 0x02 0x00 0x00"
0x20 = CONACK type
0x02 = remaining bytes in message == 2
0x00 = Connect Acknowledge flags. Bit 0 should be set.
0x00 = Connect Return Code.

It should have the values:
"0x20 0x02 0x01 0x00"
0x20 = CONACK type
0x02 = remaining bytes in message == 2
0x00 = Connect Acknowledge flags. Bit 0 indicating session present.
0x00 = Connect Return Code.

I have verified with a test program that this works. I have also tested this same program against iot.eclipse.org and test.mosquitto.org and it works as expected.

In all other ways, the MQTT plugin is working as expected. The persistent queue is created using the client-id, and QoS1 messages delivered to an existing subscription are queuing up as shown in the management console. Also, when the client connects it does receive the queued messages. The only issue is the SessionPresent flag isn't being reported properly.

Retain

Future features
...
"Retained messages" is an MQTT feature where the broker retains flagged messages and delivers them to future subscribing clients. E.g. in a topic for sensor readings, a retained message allows a client to receive the last reading without needing to wait for the next reading. By default AMQP 0-9-1 exchanges do not retain any message state. Therefore the MQTT adapter makes no attempt to honour the "Retained" flag, which will be silently ignored.
...

I'd like to make RabbitMQ my MQTT broker so that I can take advantage of its clustering capabilities, but this is a dealbreaker feature for me, as my apps rely heavily on being able to get a dump of relevant retained data when they connect.

I get that exchanges shouldn't be used to store data. Would it be feasible for the protocol adapter to make use of a message queue for this purpose? I'm not totally familiar with the AMQP architecture yet, but it seems like this feature could be handled by AMQP's rich feature-set.

Loopback publish issue with mosquitto after bridged

I have two mosquitto brokers installed on PC1 (mosquitto v1.4.8) and PC2 (RabbitMQ v3.6.2 with MQTT Adapter).
Bridging initiated at PC1 like this: sensor/room1/ <-> office/room1/
But I noticed there is always a duplicate message being published back whenever the bridge is active, means all my application (on PC1) which subscribes to the same topic will receives the same message twice

Please have a look at the question i posted in Stackoverflow. I having no issue if bridged to another mosquitto broker with try_private true in mosquitto.conf

MQTT Last Will not retaining messages

Hello, first of all thanks for maintaining such a great tool to all the people involved in the project. I've been using it for a while and has helped a lot in my previous projects.

I believe I found an issue with the MQTT adapter's ability to retain Last Will messages. What I am trying to achieve is a "last_status" channel for IoT. Devices post an "Online" message whenever they connect including some useful info, with retained=True, and they set an "Offline" Last Will in case they disconnect, also with retained=True.

I haven't been able to make that work. I've went through RabbitMQ config, I've made queues durable, went through MQTT adapter config, and it never happened. Only the published messages with retain=True are retained, but not last wills with retain=True.

I then tried with another MQTT broker and it worked. Here are the steps to reproduce the bug. I am using Python with PAHO MQTT client.

import paho.mqtt.client as mqtt

TOPIC = 'device/status'
BROKER = 'my.own.broker.url'

client = mqtt.Client(
    client_id="TestPublisher",
    clean_session=False,
    protocol=mqtt.MQTTv311,
)
client.will_set(TOPIC, 'Offline', qos=1, retain=True)
client.connect(host=BROKER, port=8883)
client.publish(TOPIC, str("Online"), retain=True)

This works properly, and first sends the "Online" message, and afterwards the "Offline" message. In another listener I see this messages:

Received message on topic device/status: "Online"
Received message on topic device/status: "Offline"

When I restart the listener, I receive the following retained message:

Received message on topic device/status: "Online"

If I change the broker to a sample broker from Eclipse:

import paho.mqtt.client as mqtt

TOPIC = 'device/status'
BROKER = "iot.eclipse.org"

client = mqtt.Client(
    client_id="TestPublisher",
    clean_session=False,
    protocol=mqtt.MQTTv311,
)
client.will_set(TOPIC, 'Offline', qos=1, retain=True)
client.connect(host=BROKER, port=1883)
client.publish(TOPIC, str("Online"), retain=True)

The next time I restart the listener I receive:

Received message on topic device/status: "Offline"

Any ideas of what can be wrong in my setup? Or is it definitely a bug?

Thanks in advance,
Chesco.

ACLs for MQTT topics?

Interested to hear if anyone intends to add ACL support to this module?

At the moment it is built around routing keys which as i understand cannot be controlled by the existing ACLs.

Investigate if/how queue argument mismatch can be avoided

Currently our clean/existing session checking and session recovery logic can sometimes result in queue argument mismatch, discovered as part of #37 and #38, relevant for #30.

We may be able to work around this by introducing an internal API to the Erlang client and/or server to have more context than just what's available via the core protocol.

Note that this seems to not be a major issue in practice for our users, which is somewhat relieving.

Authentication with SSL client certificates fails

Struggling for some days to get rabbitmq-mqtt working with client certificate authentication i hope to find a solution by contributing a test for easy reproduction of the issue. The minimal test is to check the "Authentication with SSL client certificates" feature as described at https://www.rabbitmq.com/mqtt.html and in principle adds only an alternative server test configuration while reusing the existing MqttSSLTest test to conform the docs. If something is missing i am more than happy for your pointers...

mlasak@9c7d04a
(I will raise a pull request for this test but would be glad to change if anything does not conform contribution guidelines)

As you see the changes are kept to a minimum, just needed/documented bits that are required to get the feature running after studying through the docs.

How to run the test and reproduce the issue (assuming use of the above rabbitmq-mqtt changes):

$ cd rabbitmq-mqtt
$ unset RABBITMQ_MQTT_SSLTEST_ONLY
$ make test
# --> Tests are successful. Now switch on client certificate authentication
$ export RABBITMQ_MQTT_SSLTEST_ONLY=1
$ make test
# --> Tests fail with the following error log (Full log here: https://gist.github.com/mlasak/4a8ee598fc340f60ab23)

Error log:

=INFO REPORT==== 21-Apr-2015::12:26:43 ===
accepting MQTT connection <0.444.0> (XXX.XXX.XXX.XXX:57676 -> XXX.XXX.XXX.XXX:8883)

=ERROR REPORT==== 21-Apr-2015::12:26:43 ===
SSL: certify: ssl_connection.erl:398:Fatal error: handshake failure

=ERROR REPORT==== 21-Apr-2015::12:26:48 ===
MQTT detected TLS upgrade error on XXX.XXX.XXX.XXX:57674 -> XXX.XXX.XXX.XXX:8883: handshake failure

=ERROR REPORT==== 21-Apr-2015::12:26:48 ===
** Generic server <0.436.0> terminating
** Last message in was {'$gen_cast',
                           {go,#Port<0.6181>,
                               #Fun<rabbit_networking.3.38010331>,<0.437.0>}}
** When Server state == undefined
** Reason for termination == 
** {network_error,{ssl_upgrade_error,{tls_alert,"handshake failure"}},
                  "XXX.XXX.XXX.XXX:57674 -> XXX.XXX.XXX.XXX:8883"}

I've tested on OSX 10.10.3 with following versions:

  • rabbitmq 3.5.0 and 3.5.1, each from github using umbrella AND from the release archive
  • Erlang/OTP 17
  • OpenSSL 0.9.8zd 8 Jan 2015

Thank you for your attention and any help.

Cannot connect with cleansession=false when previous connection had cleansession=true

I had written some code to connect to the MQTT broker using the default value for cleansession, which was true using the Paho golang package. When I changed this to have cleansession=false, the client failed to connect to the broker. However, when I tried doing the same with test.mosquitto.org, things work fine. I am using RabbitMQ 3.5.6, Erlang 18.1

You can find details of the error and the code here:
http://stackoverflow.com/questions/34038284/mqtt-change-client-from-using-cleanstate-true-to-cleanstate-false

MQTT Client ID is not available in RabbitMQ

I'm no expert on MQTT or RabbitMQ, but it seems that a Client ID set in the client (I'm using the Mosquitto Python client) doesn't end up in RabbitMQ or in the client_properties from the management API. Perhaps my client isn't actually sending it correctly, but I thought I'd file this anyway in case my client is working correctly.

QoS 1 queues should not be auto-deleted

Currently setting Clean Session to true will make the queue auto-delete (with QoS1). This makes little sense as when the client disconnects, the queue is deleted. While working around this should be easy โ€” simply not use the clean session flag โ€” this is pretty poor usability.

Old subscriptions not stored on sticky session

When the mqtt client disconnects and reconnects and clean session is false then all the old subscriptions need to be restored. But here none of the old subscriptions are present. Client needs to subscribe again for the channels

MQTT Client reconnect after unsubscribe is creating queue again

Scenario Steps:

  1. MQTT client subscribe for topic "t" (server side subscription ttl defined 30 secs)
  2. Disconnect MQTT client
  3. Susbcription expire after ttl (queue and binding are getting deleted)
  4. MQTT client reconnects only (no subscription)
  5. no queue & binding should be created but queue is getting created.

I am attaching the MqttTest.java (same java file as present in rabbitmq-mqtt github, added method "testMQTTSubUnSubQueueCreationOnReconnect".
MqttTest.java.zip

Hard-coded use of "make" in the testsuite

The testsuite assumes make is GNU make which may not be the case. For instance, this prevents the usage of the testsuite on FreeBSD.

The value of $MAKE should be passed down to the test.

Bridging to/from Mosquitto.

I've just be trying to set up a mosquitto bridge to the rabbitmq mqtt plugin.
The configuration works fine to another mosquitto, but so far I'm
unable to connect to your plugin. Is this even supposed to be possible?

BTW the motivation for doing this is to work around the lack of websocket
support in the current plugin

mqtt_reader doesn't handle resource alarm messages correctly

See this thread:

  =ERROR REPORT==== 24-Feb-2016::10:41:58 ===
** Generic server <0.32468.1132> terminating

** Last message in was {conserve_resources,
                           {true,true,'rabbit@ali-hz-one-box-pillar-10'}}
** When Server state == {state,#Port<0.5258113>,
                            "36.45.47.33:3442 -> 10.165.36.0:14000",true,
                            running,
                            {none,none},
                            <0.30784.1172>,false,none,
                            {proc_state,#Port<0.5258113>,

                                {dict,0,16,16,8,80,48,
                                    {[],[],[],[],[],[],[],[],[],[],[],[],[],
                                     [],[],[]},
                                    {{[],[],[],[],[],[],[],[],[],[],[],[],[],
                                      [],[],[]}}},
                                {undefined,undefined},
                                {0,nil},
                                {0,nil},
                                undefined,1,undefined,undefined,undefined,
                                {undefined,undefined},
                                undefined,<<"amq.topic">>,none,undefined,
                                undefined}}
** Reason for termination == 
** {{badarg,{true,true,'rabbit@ali-hz-one-box-pillar-10'}},

    [{rabbit_mqtt_reader,control_throttle,1,
                         [{file,"src/rabbit_mqtt_reader.erl"},{line,294}]},
     {rabbit_mqtt_reader,handle_info,2,
                         [{file,"src/rabbit_mqtt_reader.erl"},{line,129}]},
     {gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1049}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]}

Initial support for retain flag

We've seen enough requests for RETAIN to be supported.

A high profile user N wants to store retained messages in Cassandra. It makes sense to make the initial implementation to store messages in a configurable store. Initially a node-local store would be a good enough start.

Client ID tracking is node-local

Description of Issue
It looks like RabbitMQ MQTTT brokers will properly handle multiple mqtt connections with duplicate client id's by disconnecting the older connection, establishing the newer one and logging the following message :

MQTT disconnecting duplicate client id ""

My current RabbitMQ setup involves mosquitto MQTT clients connecting to a HAProxy Loadbalancer which balances connections between the RabbitMQ-MQTT brokers on a rabbitmq cluster.

However, it does not seem to be disconnecting duplicate client ID's across a clustered RabbitMQ setup. I am currently able to connect multiple mqtt clients with the same client ID to a rabbitMQ Cluster as long as they are not connected to the same node.

This is currently a problem in my scenario where my mosquitto clients consistently experience unstable network conditions which will cause them to lose internet connection and reconnect later on with the same client ID. If this reconnection occurs within the keepalive interval and is balanced to a different rabbitmq node I will have two connections with same client ID existing until the keepalive interval expires and kills the initial connection.

Doing some research into this issue I found the following chat from 2014/15 which describes someone else experiencing this exact scenario :
https://groups.google.com/forum/#!topic/rabbitmq-users/ecLWQhHBg1g

However following up on Michael Klishins final response I looked through the changelog and the rabbitmq issue history and was not able to find any documentation referencing this issue. I then recreated the scenario myself with a test environment using the versions listed below.

Was the issue fixed somewhere that I somehow overlooked in the newer versions of rabbitmq?

Versions
HA-Proxy version 1.5.14 2015/07/02
Mosquitto 1.4.9
RabbitMQ 3.5.6
Erlang 17.5

Downgrade QoS 2 subscribers to QoS 1

Currently we fail with an error. MQTT 3.1 allows for downgrades and has a non-existent error notifications story, so a much better idea would be to downgrade QoS 2 requests to QoS 1.

Not able to add Rabbitmq plugin

I was following this http://www.rabbitmq.com/plugin-development.html link to build rabbitmq plugin https://github.com/rabbitmq/rabbitmq-web-mqtt

after building i ONLY copied rabbitmq_mqtt.ez to /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.2/plugins

running sudo rabbitmq-plugins enable rabbitmq_web_mqtt
shows :

The following plugins have been enabled:
  cowlib
  cowboy
  rabbitmq_web_mqtt

Applying plugin configuration to rabbit@manish-Lenovo-G510... failed.
Error: {could_not_start,rabbitmq_web_mqtt,
           {undef,
               [{lager_config,get,
                    [{rabbit_log_lager_event,loglevel},{0,[]}],
                    []},
                {rabbit_web_mqtt_app,mqtt_init,0,
                    [{file,"src/rabbit_web_mqtt_app.erl"},{line,69}]},
                {rabbit_web_mqtt_app,start,2,
                    [{file,"src/rabbit_web_mqtt_app.erl"},{line,31}]},
                {application_master,start_it_old,4,
                    [{file,"application_master.erl"},{line,269}]}]}}

Running sudo rabbitmq-plugins list

 Configured: E = explicitly enabled; e = implicitly enabled
 | Status:   * = running on rabbit@manish-Lenovo-G510
 |/
[e*] amqp_client                       3.6.2
[e ] cowboy                            1.0.3
[e ] cowlib                            1.0.1
[e*] mochiweb                          2.13.1
[  ] rabbitmq_amqp1_0                  3.6.2
[  ] rabbitmq_auth_backend_ldap        3.6.2
[  ] rabbitmq_auth_mechanism_ssl       3.6.2
[  ] rabbitmq_consistent_hash_exchange 3.6.2
[  ] rabbitmq_event_exchange           3.6.2
[  ] rabbitmq_federation               3.6.2
[  ] rabbitmq_federation_management    3.6.2
[E*] rabbitmq_management               3.6.2
[e*] rabbitmq_management_agent         3.6.2
[  ] rabbitmq_management_visualiser    3.6.2
[E*] rabbitmq_mqtt                     3.6.2
[  ] rabbitmq_recent_history_exchange  1.2.1
[  ] rabbitmq_sharding                 0.1.0
[  ] rabbitmq_shovel                   3.6.2
[  ] rabbitmq_shovel_management        3.6.2
[  ] rabbitmq_stomp                    3.6.2
[  ] rabbitmq_tracing                  3.6.2
[e*] rabbitmq_web_dispatch             3.6.2
[E ] rabbitmq_web_mqtt                 
[  ] rabbitmq_web_stomp                3.6.2
[  ] rabbitmq_web_stomp_examples       3.6.2
[  ] sockjs                            0.3.4
[e*] webmachine                        1.10.3

I am not able to connect using paho JS client. So what is causing the issue? I tried port 1888, 8083,8883,15675

Error report when connecting to rabbitmq-server using mqtt protocol

=INFO REPORT==== 16-Jul-2013::10:06:14 ===
accepting MQTT connection (127.0.0.1:59336 -> 127.0.0.1:1883)

=ERROR REPORT==== 16-Jul-2013::10:06:14 ===
connection <0.15181.0>, channel 1 - soft error:
{amqp_error,not_found,
"no queue 'mqtt-subscription-PHP MQTT Clientqos1' in vhost '/'",
'queue.delete'}

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.