shamblett / mqtt_client Goto Github PK
View Code? Open in Web Editor NEWA server and browser based MQTT client for dart
License: Other
A server and browser based MQTT client for dart
License: Other
Hello
I'm trying example for Flutter, but after press disconnect, it's not close connection and still get messages:
log:
3 21:20:16.919794 -- MqttConnection::_onData
I/flutter ( 4612): 2018-12-03 21:20:16.921229 -- MqttConnection::_onData - message received MQTTMessage of type MqttMessageType.publish
I/flutter ( 4612): Header: MessageType = MqttMessageType.publish, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 105
I/flutter ( 4612): Publish Variable Header: TopicName={/act}, MessageIdentifier={0}, VH Length={102}
I/flutter ( 4612): Payload: {3 bytes={<49><48><48>
I/flutter ( 4612): 2018-12-03 21:20:16.921281 -- MqttConnection::_onData - message not processed, disconnecting
I/flutter ( 4612): 2018-12-03 21:20:16.923504 -- MqttConnection::_onData
I/flutter ( 4612): 2018-12-03 21:20:16.923615 -- MqttConnection::_onData - message received MQTTMessage of type MqttMessageType.publish
I/flutter ( 4612): Header: MessageType = MqttMessageType.publish, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 106
I/flutter ( 4612): Publish Variable Header: TopicName={/act/on}, MessageIdentifier={0}, VH Length={105}
I/flutter ( 4612): Payload: {1 bytes={<49>
I/flutter ( 4612): 2018-12-03 21:20:16.923634 -- MqttConnection::_onData - message not processed, disconnecting
I/flutter ( 4612): 2018-12-03 21:20:16.924967 -- MqttConnection::_onData
I/flutter ( 4612): 2018-12-03 21:20:16.925069 -- MqttConnection::_onData - message received MQTTMessage of type MqttMessageType.publish
I/flutter ( 4612): Header: MessageType = MqttMessageType.publish, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 105
Wireshark confirm that traffic still loads
Connecting to a local mosquitto broker takes 5s. I wonder what takes so long..
final MqttClient client = new MqttClient("localhost", "");
I just replace the line of example, but connection refused. mqtt server is EMQX
I/flutter ( 1605): EXAMPLE::client exception - SocketException: OS Error: Connection refused, errno = 111, address = localhost, port = 53217
I/flutter ( 1605): EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, state is ConnectionState.disconnected
I/flutter ( 1605): EXAMPLE::client exception - SocketException: OS Error: Connection refused, errno = 111, address = localhost, port = 53218
I/flutter ( 1605): EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, state is ConnectionState.disconnected
Currently the client allows subscriptions to wildcard topics(my/topic/#) and receives publications for these , e.g we would get my/topic/fred, mytopic/bill etc however on reception these are not propagated through the client as /my/topic/fred doesn't match my/topic/# exactly. The client has code to ascertain whether an incoming publish topic does in fact match a subscription topic taking into account wildcards and should use this, not an exact match.
For compatibility with existing behaviour this should be selectable on the client when implemented.
When I connect to the server with authenticating as the wrong password.
I want to know the real reason about the disconnected.
The source as below from SynchronousMqttConnectionHandler.dart that hidden the connect ack message from the server.
Could you provide a new connect method to return a model include all the information from the server, not the connectionState only?
bool _connectAckProcessor(MqttMessage msg) {
MqttLogger.log("SynchronousMqttConnectionHandler::_connectAckProcessor");
try {
final MqttConnectAckMessage ackMsg = msg as MqttConnectAckMessage;
// Drop the connection if our connect request has been rejected.
if (ackMsg.variableHeader.returnCode ==
MqttConnectReturnCode.brokerUnavailable ||
ackMsg.variableHeader.returnCode ==
MqttConnectReturnCode.identifierRejected ||
ackMsg.variableHeader.returnCode ==
MqttConnectReturnCode.unacceptedProtocolVersion ||
**ackMsg.variableHeader.returnCode ==
MqttConnectReturnCode.notAuthorized ||**
ackMsg.variableHeader.returnCode ==
MqttConnectReturnCode.badUsernameOrPassword) {
MqttLogger.log(
"SynchronousMqttConnectionHandler::_connectAckProcessor connection rejected");
_performConnectionDisconnect();
} else {
// Initialize the keepalive to start the ping based keepalive process.
MqttLogger.log(
"SynchronousMqttConnectionHandler::_connectAckProcessor - state = connected");
connectionState = ConnectionState.connected;
}
} catch (InvalidMessageException) {
_performConnectionDisconnect();
}
// Cancel the connect timer;
MqttLogger.log(
"SynchronousMqttConnectionHandler:: cancelling connect timer");
_connectTimer.cancel();
return true;
onConnect callback feature is required when the client gets disconnected when there is network loss. This callback can be used to renew subscriptions.
Hi,
It should be possible (and probably even the default, or the only way ;-)) to have one observable for all subscriptions, so that all received messages are processed (once) via that single observable. Now overlapping subscriptions (due to wildcards, for example) may cause the message to notify multiple observables. Furthermore, due to the topic passed to the observable, there is no need to have more than one observable and it is for most applications a waste of resources (although I admit there might be use cases for it).
None of the MQTT implementations I know do "copy" received messages matching multiple subscription topics. This is against the concept of how MQTT subscription topics work and how a broker deals with them (it combines all subscriptions), so IMHO it's also bad client application behavior to rely on such a feature.
In case you want to dynamically (un)subscribe and potentially have overlapping topics, you have to filter out the message duplicates now. A workaround could be to exit the loop in publishMessageReceived()
after the first match (set by an option, like notifyOnlyOnce
), assuming all observables share the same listen-function, but this is not very elegant.
Here is my code:
void connect() async {
final String signedV4Query = 'X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAJHBZ22PHAWZWW6MA%2F20180907%2Fap-southeast-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20180907T042308Z&X-Amz-Expires=86400&X-Amz-SignedHeaders=host&X-Amz-Signature=69b39751dd11dee1102d7f89997f0a747396bb7989c398cea774961454ee5eb8';
final MqttClient client = new MqttClient("wss://azg8fnwzpxzp0.iot.ap-southeast-1.amazonaws.com/mqtt?$signedV4Query", "");
client.useWebSocket = true;
client.port = 443;
client.keepAlivePeriod = 30;
client.secure = true;
client.logging(true);
//client.setProtocolV311(); //Tried to uncomment same result
client.onDisconnected = () {
debugPrint("DISCONNECTED!");
};
try {
debugPrint('CONNECTING...');
await client.connect();
debugPrint('CONNECTED!');
} catch(err) {
debugPrint("ERROR! $err");
client.disconnect();
}
}
this what i see in the log
I/flutter (27959): CONNECTING...
I/flutter (27959): 2018-09-07 16:45:53.100177 -- SynchronousMqttConnectionHandler::internalConnect entered
I/flutter (27959): 2018-09-07 16:45:53.100433 -- SynchronousMqttConnectionHandler::internalConnect - initiating connection try 0
I/flutter (27959): 2018-09-07 16:45:53.100563 -- SynchronousMqttConnectionHandler::internalConnect - secure selected
I/flutter (27959): 2018-09-07 16:45:53.100703 -- MqttSecureConnection::connect
And then nothing happen is just freeze forever.
I use latest version 3.1.0 and flutter 2.0.0-dev.68.0
Hello there,
I just download the repo to test the flutter example, it just dose not seem to work, many error my ide detected, Undefined class mqtt.ConnectionState, Too many positional arguments every where, can not even get it start to debug
.
Off course enabled dart '2', and got all dependencies.
Thank you.
The client should allow the status of the retained flag to be specified when a message is published. Other flags should also be checked here for applicability.
The API should have a callback on disconnection of the broker to allow user code to implement whatever clean up it needs to do.
Reconnecting 4 times to a broker results in messages being received 4 times.
import 'dart:async';
import 'dart:io';
import 'package:mqtt_client/mqtt_client.dart';
Future<int> main() async {
MqttClient client;
void _connect() async {
client = MqttClient("test.mosquitto.org", "");
client.logging(false);
client.onDisconnected = onDisconnected;
final MqttConnectMessage connMess = MqttConnectMessage()
.withClientIdentifier("Mqtt_MyClientUniqueId")
.startClean();
print("EXAMPLE::Mosquitto client connecting....");
client.connectionMessage = connMess;
try {
await client.connect();
} catch (e) {
print("EXAMPLE::client exception - $e");
client.disconnect();
}
if (client.connectionState == ConnectionState.connected) {
print("EXAMPLE::Mosquitto client connected");
} else {
print(
"EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, state is ${client.connectionState}");
client.disconnect();
exit(-1);
}
}
_connect();
await MqttUtilities.asyncSleep(1);
print("Disconnecting");
client.disconnect();
print("Connection state: ${client.connectionState}");
// Probably not needed but explicitly allow garbage collection
client = null;
await MqttUtilities.asyncSleep(1);
_connect();
await MqttUtilities.asyncSleep(1);
print("Disconnecting");
client.disconnect();
print("Connection state: ${client.connectionState}");
client = null;
await MqttUtilities.asyncSleep(1);
_connect();
await MqttUtilities.asyncSleep(1);
print("Disconnecting");
client.disconnect();
print("Connection state: ${client.connectionState}");
client = null;
await MqttUtilities.asyncSleep(1);
_connect();
await MqttUtilities.asyncSleep(1);
print("Connection state: ${client.connectionState}");
client.updates.listen((List<MqttReceivedMessage> c) {
final MqttPublishMessage recMess = c[0].payload as MqttPublishMessage;
final String pt =
MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
print(
"EXAMPLE::Change notification:: topic is <${c[0].topic}>, payload is <-- ${pt} -->");
print("");
});
print("EXAMPLE::Publishing our topic");
final String pubTopic = "Dart/Mqtt_client/testtopic";
final MqttClientPayloadBuilder builder = MqttClientPayloadBuilder();
builder.addString("Hello from mqtt_client");
client.subscribe(pubTopic, MqttQos.exactlyOnce);
client.publishMessage(pubTopic, MqttQos.exactlyOnce, builder.payload);
print("EXAMPLE::Sleeping....");
await MqttUtilities.asyncSleep(120);
client.disconnect();
return 0;
}
void onDisconnected() {
print("EXAMPLE::OnDisconnected client callback - Client disconnection");
exit(-1);
}
EXAMPLE::Mosquitto client connecting....
EXAMPLE::Mosquitto client connected
Disconnecting
Connection state: ConnectionState.disconnected
EXAMPLE::Mosquitto client connecting....
EXAMPLE::Mosquitto client connected
Disconnecting
Connection state: ConnectionState.disconnected
EXAMPLE::Mosquitto client connecting....
EXAMPLE::Mosquitto client connected
Disconnecting
Connection state: ConnectionState.disconnected
EXAMPLE::Mosquitto client connecting....
EXAMPLE::Mosquitto client connected
Connection state: ConnectionState.connected
EXAMPLE::Publishing our topic
EXAMPLE::Sleeping....
StreamSink is closed and adding to it is an error.
See http://dartbug.com/29554.
#0 _StreamSinkImpl._reportClosedSink (dart:io/io_sink.dart:160:31)
#1 _StreamSinkImpl.add (dart:io/io_sink.dart:165:7)
#2 _Socket.add (dart:io/runtime/binsocket_patch.dart:1607:38)
#3 MqttConnection.send (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection.dart:125:12)
#4 MqttConnectionHandler.sendMessage (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:72:18)
#5 PublishingManager.handlePublish (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/mqtt_client_publishing_manager.dart:139:27)
#6 MqttConnectionHandler.messageAvailable (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:117:13)
#7 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#8 CastStreamSubscription._onData (dart:_internal/async_cast.dart:81:11)
#9 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#10 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
#11 _BufferingStreamSubscription._add (dart:async/stream_impl.dart:263:7)
#12 _ForwardingStreamSubscription._add (dart:async/stream_pipe.dart:132:11)
#13 _WhereStream._handleData (dart:async/stream_pipe.dart:207:12)
#14 _ForwardingStreamSubscription._handleData (dart:async/stream_pipe.dart:164:13)
#15 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#16 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
#17 _DelayedData.perform (dart:async/stream_impl.dart:584:14)
#18 _StreamImplEvents.handleNext (dart:async/stream_impl.dart:700:11)
#19 _PendingEvents.schedule.<anonymous closure> (dart:async/stream_impl.dart:660:7)
#20 _microtaskLoop (dart:async/schedule_microtask.dart:41:21)
#21 _startMicrotaskLoop (dart:async/schedule_microtask.dart:50:5)
#22 _runPendingImmediateCallback (dart:isolate/runtime/libisolate_patch.dart:113:13)
#23 _RawReceivePortImpl._handleMessage (dart:isolate/runtime/libisolate_patch.dart:166:5)
StreamSink is closed and adding to it is an error.
See http://dartbug.com/29554.
#0 _StreamSinkImpl._reportClosedSink (dart:io/io_sink.dart:160:31)
#1 _StreamSinkImpl.add (dart:io/io_sink.dart:165:7)
#2 _Socket.add (dart:io/runtime/binsocket_patch.dart:1607:38)
#3 MqttConnection.send (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection.dart:125:12)
#4 MqttConnectionHandler.sendMessage (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:72:18)
#5 PublishingManager.handlePublish (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/mqtt_client_publishing_manager.dart:139:27)
#6 MqttConnectionHandler.messageAvailable (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:117:13)
#7 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#8 CastStreamSubscription._onData (dart:_internal/async_cast.dart:81:11)
#9 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#10 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
#11 _BufferingStreamSubscription._add (dart:async/stream_impl.dart:263:7)
#12 _ForwardingStreamSubscription._add (dart:async/stream_pipe.dart:132:11)
#13 _WhereStream._handleData (dart:async/stream_pipe.dart:207:12)
#14 _ForwardingStreamSubscription._handleData (dart:async/stream_pipe.dart:164:13)
#15 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#16 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
#17 _DelayedData.perform (dart:async/stream_impl.dart:584:14)
#18 _StreamImplEvents.handleNext (dart:async/stream_impl.dart:700:11)
#19 _PendingEvents.schedule.<anonymous closure> (dart:async/stream_impl.dart:660:7)
#20 _microtaskLoop (dart:async/schedule_microtask.dart:41:21)
#21 _startMicrotaskLoop (dart:async/schedule_microtask.dart:50:5)
#22 _runPendingImmediateCallback (dart:isolate/runtime/libisolate_patch.dart:113:13)
#23 _RawReceivePortImpl._handleMessage (dart:isolate/runtime/libisolate_patch.dart:166:5)
StreamSink is closed and adding to it is an error.
See http://dartbug.com/29554.
#0 _StreamSinkImpl._reportClosedSink (dart:io/io_sink.dart:160:31)
#1 _StreamSinkImpl.add (dart:io/io_sink.dart:165:7)
#2 _Socket.add (dart:io/runtime/binsocket_patch.dart:1607:38)
#3 MqttConnection.send (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection.dart:125:12)
#4 MqttConnectionHandler.sendMessage (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:72:18)
#5 PublishingManager.handlePublish (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/mqtt_client_publishing_manager.dart:139:27)
#6 MqttConnectionHandler.messageAvailable (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:117:13)
#7 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#8 CastStreamSubscription._onData (dart:_internal/async_cast.dart:81:11)
#9 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#10 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
#11 _BufferingStreamSubscription._add (dart:async/stream_impl.dart:263:7)
#12 _ForwardingStreamSubscription._add (dart:async/stream_pipe.dart:132:11)
#13 _WhereStream._handleData (dart:async/stream_pipe.dart:207:12)
#14 _ForwardingStreamSubscription._handleData (dart:async/stream_pipe.dart:164:13)
#15 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#16 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
#17 _DelayedData.perform (dart:async/stream_impl.dart:584:14)
#18 _StreamImplEvents.handleNext (dart:async/stream_impl.dart:700:11)
#19 _PendingEvents.schedule.<anonymous closure> (dart:async/stream_impl.dart:660:7)
#20 _microtaskLoop (dart:async/schedule_microtask.dart:41:21)
#21 _startMicrotaskLoop (dart:async/schedule_microtask.dart:50:5)
#22 _runPendingImmediateCallback (dart:isolate/runtime/libisolate_patch.dart:113:13)
#23 _RawReceivePortImpl._handleMessage (dart:isolate/runtime/libisolate_patch.dart:166:5)
StreamSink is closed and adding to it is an error.
See http://dartbug.com/29554.
#0 _StreamSinkImpl._reportClosedSink (dart:io/io_sink.dart:160:31)
#1 _StreamSinkImpl.add (dart:io/io_sink.dart:165:7)
#2 _Socket.add (dart:io/runtime/binsocket_patch.dart:1607:38)
#3 MqttConnection.send (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection.dart:125:12)
#4 MqttConnectionHandler.sendMessage (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:72:18)
#5 PublishingManager.handlePublishRelease (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/mqtt_client_publishing_manager.dart:162:27)
#6 MqttConnectionHandler.messageAvailable (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:117:13)
#7 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#8 CastStreamSubscription._onData (dart:_internal/async_cast.dart:81:11)
#9 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#10 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
#11 _BufferingStreamSubscription._add (dart:async/stream_impl.dart:263:7)
#12 _ForwardingStreamSubscription._add (dart:async/stream_pipe.dart:132:11)
#13 _WhereStream._handleData (dart:async/stream_pipe.dart:207:12)
#14 _ForwardingStreamSubscription._handleData (dart:async/stream_pipe.dart:164:13)
#15 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#16 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
#17 _DelayedData.perform (dart:async/stream_impl.dart:584:14)
#18 _StreamImplEvents.handleNext (dart:async/stream_impl.dart:700:11)
#19 _PendingEvents.schedule.<anonymous closure> (dart:async/stream_impl.dart:660:7)
#20 _microtaskLoop (dart:async/schedule_microtask.dart:41:21)
#21 _startMicrotaskLoop (dart:async/schedule_microtask.dart:50:5)
#22 _runPendingImmediateCallback (dart:isolate/runtime/libisolate_patch.dart:113:13)
#23 _RawReceivePortImpl._handleMessage (dart:isolate/runtime/libisolate_patch.dart:166:5)
StreamSink is closed and adding to it is an error.
See http://dartbug.com/29554.
#0 _StreamSinkImpl._reportClosedSink (dart:io/io_sink.dart:160:31)
#1 _StreamSinkImpl.add (dart:io/io_sink.dart:165:7)
#2 _Socket.add (dart:io/runtime/binsocket_patch.dart:1607:38)
#3 MqttConnection.send (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection.dart:125:12)
#4 MqttConnectionHandler.sendMessage (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:72:18)
#5 PublishingManager.handlePublishRelease (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/mqtt_client_publishing_manager.dart:162:27)
#6 MqttConnectionHandler.messageAvailable (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:117:13)
#7 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#8 CastStreamSubscription._onData (dart:_internal/async_cast.dart:81:11)
#9 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#10 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
#11 _BufferingStreamSubscription._add (dart:async/stream_impl.dart:263:7)
#12 _ForwardingStreamSubscription._add (dart:async/stream_pipe.dart:132:11)
#13 _WhereStream._handleData (dart:async/stream_pipe.dart:207:12)
#14 _ForwardingStreamSubscription._handleData (dart:async/stream_pipe.dart:164:13)
#15 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#16 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
#17 _DelayedData.perform (dart:async/stream_impl.dart:584:14)
#18 _StreamImplEvents.handleNext (dart:async/stream_impl.dart:700:11)
#19 _PendingEvents.schedule.<anonymous closure> (dart:async/stream_impl.dart:660:7)
#20 _microtaskLoop (dart:async/schedule_microtask.dart:41:21)
#21 _startMicrotaskLoop (dart:async/schedule_microtask.dart:50:5)
#22 _runPendingImmediateCallback (dart:isolate/runtime/libisolate_patch.dart:113:13)
#23 _RawReceivePortImpl._handleMessage (dart:isolate/runtime/libisolate_patch.dart:166:5)
StreamSink is closed and adding to it is an error.
See http://dartbug.com/29554.
#0 _StreamSinkImpl._reportClosedSink (dart:io/io_sink.dart:160:31)
#1 _StreamSinkImpl.add (dart:io/io_sink.dart:165:7)
#2 _Socket.add (dart:io/runtime/binsocket_patch.dart:1607:38)
#3 MqttConnection.send (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection.dart:125:12)
#4 MqttConnectionHandler.sendMessage (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:72:18)
#5 PublishingManager.handlePublishRelease (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/mqtt_client_publishing_manager.dart:162:27)
#6 MqttConnectionHandler.messageAvailable (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.3.2/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:117:13)
#7 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#8 CastStreamSubscription._onData (dart:_internal/async_cast.dart:81:11)
#9 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#10 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
#11 _BufferingStreamSubscription._add (dart:async/stream_impl.dart:263:7)
#12 _ForwardingStreamSubscription._add (dart:async/stream_pipe.dart:132:11)
#13 _WhereStream._handleData (dart:async/stream_pipe.dart:207:12)
#14 _ForwardingStreamSubscription._handleData (dart:async/stream_pipe.dart:164:13)
#15 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
#16 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
#17 _DelayedData.perform (dart:async/stream_impl.dart:584:14)
#18 _StreamImplEvents.handleNext (dart:async/stream_impl.dart:700:11)
#19 _PendingEvents.schedule.<anonymous closure> (dart:async/stream_impl.dart:660:7)
#20 _microtaskLoop (dart:async/schedule_microtask.dart:41:21)
#21 _startMicrotaskLoop (dart:async/schedule_microtask.dart:50:5)
#22 _runPendingImmediateCallback (dart:isolate/runtime/libisolate_patch.dart:113:13)
#23 _RawReceivePortImpl._handleMessage (dart:isolate/runtime/libisolate_patch.dart:166:5)
EXAMPLE::Change notification:: topic is <Dart/Mqtt_client/testtopic>, payload is <-- Hello from mqtt_client -->
EXAMPLE::Change notification:: topic is <Dart/Mqtt_client/testtopic>, payload is <-- Hello from mqtt_client -->
EXAMPLE::Change notification:: topic is <Dart/Mqtt_client/testtopic>, payload is <-- Hello from mqtt_client -->
EXAMPLE::Change notification:: topic is <Dart/Mqtt_client/testtopic>, payload is <-- Hello from mqtt_client -->
There is something wrong in the socket code because I can see two variants of "pauses". Here is a log from a little test program:
INFO: 2018-06-26 11:01:27.275563: Connecting to localhost
INFO: 2018-06-26 11:01:32.316929: Connected
INFO: 2018-06-26 11:01:32.317333: Connected
INFO: 2018-06-26 11:01:32.317969: Subscribing to #
INFO: 2018-06-26 11:01:32.354016: idioter: hey2
INFO: 2018-06-26 11:01:52.343208: mulor: hey3
We can see that it takes exactly 5 seconds to connect. We can also see it takes exactly 20 seconds between "hey2" and "hey3" - both are retained messages on two different topics. Neither of these pauses should be here, using mosquitto_sun for example, all happens instantly.
Exactly the same behavior for Mosquitto or VerneMQ, I tested both to be sure.
Is it possible to get triggered when a ckient.subscribe()
has been acknowledged by the broker? If yes, how? If not, we need to implement it ;-). In the code I see the pendingSubscriptions being updated and I see I can explicitly ask for the subscription status, but that's not sufficient.
I have some hours/days trying to implement the library in flutter, but I have not a success.
Always I Stuck at the same point.
In the log, these are the uniques printed lines that I can get with the logging flag activated:
MQTT client connecting....
2018-10-09 13:20:01.053384 -- SynchronousMqttConnectionHandler::internalConnect entered
2018-10-09 13:20:01.060346 -- SynchronousMqttConnectionHandler::internalConnect - initiating connection try 0
2018-10-09 13:20:01.060677 -- SynchronousMqttConnectionHandler::internalConnect - insecure TCP selected
2018-10-09 13:20:02.525680 -- MqttConnection::_startListening
2018-10-09 13:21:32.490306 -- MqttConnection::_onDone - calling disconnected callback
Even if I copy/paste the code of the flutter example, does not work the implementation.
I'm just can run the flutter example, and it works fine even with my broker.
is there another specific thing that I need to implement and maybe I'm ignoring?
BTW, I'm using the same code lines of the _connect() method of the main.dart class of the Flutter Example.
I was looking for Flutter example for this package, but couldn't find one.
I don't know if this repo supports Flutter or not. At pub.dartlang.org it used to show flutter support but now it is unrecognised.
This isn't an issue with the package itself, just something I noticed in the example:
buff[0] = 'h'.codeUnitAt(0);
buff[1] = 'e'.codeUnitAt(0);
buff[2] = 'l'.codeUnitAt(0);
buff[3] = 'l'.codeUnitAt(0);
buff[4] = 'o'.codeUnitAt(0);
The Dart team actually has a package out called charcode
that contains character constants. So instead of having to call codeUnitAt
...
buff[0] = $h;
buff[1] = $e;
// Or...
buff.addAll([$h, $e, $l, $l, $o]);
Hopefully this was helpful...
https://pub.dartlang.org/packages/charcode
import 'package:mqtt_client/mqtt_client.dart';
Future<int> main() async {
final MqttClient client = MqttClient('test.mosquitto.org', '');
// final MqttClient client = MqttClient('127.0.0.1', '');
client.logging(on: false);
client.keepAlivePeriod = 60;
client.onDisconnected = onDisconnected;
client.onSubscribed = onSubscribed;
final MqttConnectMessage connMess = MqttConnectMessage()
.withClientIdentifier('Mqtt_MyClientUniqueId')
.keepAliveFor(60) // Must agree with the keep alive set above or not set
.withWillTopic('willtopic') // If you set this you must set a will message
.withWillMessage('My Will message')
.startClean() // Non persistent session for testing
.withWillQos(MqttQos.atLeastOnce);
print('EXAMPLE::Mosquitto client connecting....');
client.connectionMessage = connMess;
try {
await client.connect();
} on Exception catch (e) {
print('EXAMPLE::client exception - $e');
client.disconnect();
}
/// Check we are connected
if (client.connectionStatus.state == ConnectionState.connected) {
print('EXAMPLE::Mosquitto client connected');
} else {
/// Use status here rather than state if you also want the broker return code.
print(
'EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, status is ${client.connectionStatus}');
client.disconnect();
}
const String topic = 'com/spl/mqtt/test';
client.subscribe(topic, MqttQos.atLeastOnce);
client.updates.listen((List<MqttReceivedMessage<MqttMessage>> c) {
final MqttPublishMessage recMess = c[0].payload;
final String pt =
MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
print(
'EXAMPLE::Change notification:: topic is <${c[0].topic}>, payload is <-- $pt -->');
print('');
});
final MqttClientPayloadBuilder builder = MqttClientPayloadBuilder();
builder.addString('Hello from spl');
client.subscribe(topic, MqttQos.atLeastOnce);
client.publishMessage(topic, MqttQos.exactlyOnce, builder.payload);
print('EXAMPLE::Sleeping....');
await MqttUtilities.asyncSleep(20);
print('EXAMPLE::Unsubscribing');
client.unsubscribe(topic);
await MqttUtilities.asyncSleep(20);
/// subscribe again
print('EXAMPLE::subscribe again....');
client.subscribe(topic, MqttQos.atMostOnce);
final MqttClientPayloadBuilder builderM = MqttClientPayloadBuilder();
builderM.addString('Hello from spl again');
client.subscribe(topic, MqttQos.exactlyOnce);
client.publishMessage(topic, MqttQos.exactlyOnce, builder.payload);
print('EXAMPLE::Sleeping....');
await MqttUtilities.asyncSleep(10);
print('EXAMPLE::Disconnecting');
client.disconnect();
return 0;
}
/// The subscribed callback
void onSubscribed(String topic) {
print('EXAMPLE::Subscription confirmed for topic $topic');
}
/// The unsolicited disconnect callback
void onDisconnected() {
print('EXAMPLE::OnDisconnected client callback - Client disconnection');
}
EXAMPLE::Mosquitto client connecting....
EXAMPLE::Mosquitto client connected
EXAMPLE::Sleeping....
EXAMPLE::Subscription confirmed for topic com/spl/mqtt/test
EXAMPLE::Change notification:: topic is <com/spl/mqtt/test>, payload is <-- Hello from spl -->
EXAMPLE::Unsubscribing
EXAMPLE::subscribe again....
EXAMPLE::Sleeping....
EXAMPLE::Disconnecting
From mobile applications and in general, using MQTT over WSS is ideal. It works well through firewalls and so on. In Canoe (getcanoe.io) which is a crypto wallet we use this to communicate effortlessly from the app, and it even works great from inside China. Is support for WSS hard to add? Has it been considered?
So I get this error when trying to run the example as a flutter (0.5.1) app. Any ideas on what could be wrong?
I/flutter (18829): EXAMPLE::Mosquitto client connecting....
E/flutter (18829): [ERROR:topaz/lib/tonic/logging/dart_error.cc(16)] Unhandled exception:
E/flutter (18829): NoSuchMethodError: The setter 'disconnectRequested=' was called on null.
E/flutter (18829): Receiver: null
E/flutter (18829): Tried calling: disconnectRequested=true
E/flutter (18829): #0 Object.noSuchMethod (dart:core/runtime/libobject_patch.dart:46:5)
E/flutter (18829): #1 SynchronousMqttConnectionHandler._performConnectionDisconnect (file:///C:/Program%20Files%20(other)/flutter/.pub-cache/hosted/pub.dartlang.org/mqtt_client-1.9.0/lib/src/connectionhandling/mqtt_client_synchronous_mqtt_connection_handler.dart:83:16)
E/flutter (18829): #2 SynchronousMqttConnectionHandler.disconnect (file:///C:/Program%20Files%20(other)/flutter/.pub-cache/hosted/pub.dartlang.org/mqtt_client-1.9.0/lib/src/connectionhandling/mqtt_client_synchronous_mqtt_connection_handler.dart:76:5)
E/flutter (18829): #3 MqttClient.disconnect (file:///C:/Program%20Files%20(other)/flutter/.pub-cache/hosted/pub.dartlang.org/mqtt_client-1.9.0/lib/src/mqtt_client.dart:182:25)
E/flutter (18829): #4 main (file:///C:/Users/vince/AndroidStudioProjects/mqtt_test/lib/main.dart:63:12)
E/flutter (18829):
E/flutter (18829): #5 _startIsolate. (dart:isolate/runtime/libisolate_patch.dart:279:19)
E/flutter (18829): #6 _RawReceivePortImpl._handleMessage (dart:isolate/runtime/libisolate_patch.dart:165:12)
Sending 10000 bytes to mqtt_client running on flutter/android (With some rethrow
put in the code to catch the real exception):
I/flutter ( 1319): Exception: mqtt_client::ByteBuffer: The buffer did not have enough bytes for the read operation length 7200, count 10000, position 14
I/flutter ( 1319): Caught error: Exception: mqtt_client::ByteBuffer: The buffer did not have enough bytes for the read operation length 7200, count 10000, position 14
I/flutter ( 1319): #0 MqttByteBuffer.read (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.1.0/lib/src/utility/mqtt_client_byte_buffer.dart:69:7)
I/flutter ( 1319): #1 MqttPublishPayload.readFrom (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.1.0/lib/src/messages/publish/mqtt_client_mqtt_publish_payload.dart:37:29)
I/flutter ( 1319): #2 new MqttPublishPayload.fromByteBuffer (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.1.0/lib/src/messages/publish/mqtt_client_mqtt_publish_payload.dart:28:5)
I/flutter ( 1319): #3 MqttPublishMessage.readFrom (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.1.0/lib/src/messages/publish/mqtt_client_mqtt_publish_message.dart:37:39)
I/flutter ( 1319): #4 new MqttPublishMessage.fromByteBuffer (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.1.0/lib/src/messages/publish/mqtt_client_mqtt_publish_message.dart:29:5)
I/flutter ( 1319): #5 MqttMessageFactory.getMessage (file:///Users/ben/.pub-cache/hosted/pub.dartlang.org/mqtt_client-3.1.0/lib/src/messages/mqtt_client_mqt
_data
is 7200 bytes long:
On macOS this works without issues.
The number of secure working parameters has grown since first envisaged, it may now be less cumbersome to collect these into a separate class and pass this to the client for secure working.
According to the MQTT v3.1 spec (http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/MQTT_V3.1_Protocol_Specific.pdf, page 27) the QoS field in the fixed header of the SUBSCRIBE command should be set to b01.
Version 3.1.1 (http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.pdf, page 40) seems to be even stricter
about the SUBSCRIBE header:
Bits 3,2,1 and 0 of the fixed header
of the SUBSCRIBE Control Packet
are reserved and MUST be set to
0,0,1 and 0 respectively.
The Server MUST treat any other value as malformed and close the Network Connection
Stopping mosquitto server causes the following, I guess we should do better (I am still Dart noob).
StreamSink is closed and adding to it is an error.
See http://dartbug.com/29554.
#0 _StreamSinkImpl._reportClosedSink (dart:io/io_sink.dart:160)
#1 _StreamSinkImpl.add (dart:io/io_sink.dart:165)
#2 _Socket.add (dart:io-patch/socket_patch.dart:1565)
#3 MqttConnection.send (package:mqtt_client/src/connectionhandling/mqtt_client_mqtt_connection.dart:109:12)
#4 MqttConnectionHandler.sendMessage (package:mqtt_client/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:73:18)
#5 MqttConnectionKeepAlive.pingRequired (package:mqtt_client/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart:52:26)
#6 Timer._createTimer.<anonymous closure> (dart:async-patch/dart:async/timer_patch.dart:21)
#7 _Timer._runTimers (dart:isolate-patch/dart:isolate/timer_impl.dart:382)
#8 _Timer._handleMessage (dart:isolate-patch/dart:isolate/timer_impl.dart:416)
#9 _RawReceivePortImpl._handleMessage (dart:isolate-patch/dart:isolate/isolate_patch.dart:165)
I'm wondering if there is a good reason to use the Observable package, instead of providing a "plain" Dart Stream.
In case of using mqtt_client in complex programs (e.g. Flutter with in my case also rxdart, having its own Observable class), it's less easy to integrate (mixing up objects from several external packages).
when i'm use mqtt_client in flutter 0.4.4 beta version,and when call connect method, and there some errors:
flutter: type '(MessageReceived) => void' is not a subtype of type '(dynamic) => void'
Evaluating the mqtt_client, getting an exception in connect():
Authenticating with username '...' and password '...'
Username length (14) exceeds the max recommended in the MQTT spec.
Password length (18) exceeds the max recommended in the MQTT spec.
2017-10-31 04:05:17.525056 -- MqttConnectPayload::Client id exceeds spec value of 23
2017-10-31 04:05:17.571400 -- SynchronousMqttConnectionHandler::internalConnect entered
2017-10-31 04:05:17.572771 -- SynchronousMqttConnectionHandler::internalConnect - initiating connection try 0
2017-10-31 04:05:17.574705 -- SynchronousMqttConnectionHandler::internalConnect - insecure TCP selected
2017-10-31 04:05:17.739883 -- MqttConnection::_startListening
2017-10-31 04:05:17.756835 -- SynchronousMqttConnectionHandler::internalConnect sending connect message
2017-10-31 04:05:17.776349 -- MqttConnectionHandler::sendMessage - MQTTMessage of type MqttMessageType.connect
Header: MessageType = MqttMessageType.connect, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 0
Connect Variable Header: ProtocolName=MQTT, ProtocolVersion=4, ConnectFlags=Connect Flags: Reserved1=false, CleanStart=true, WillFlag=false, WillQos=MqttQos.atLeastOnce, WillRetain=false, PasswordFlag=true, UserNameFlag=true, KeepAlive=60
Instance of 'MqttConnectPayload'
2017-10-31 04:05:17.876106 -- SynchronousMqttConnectionHandler::internalConnect - pre sleep, state = ConnectionState.connecting
Unhandled exception:
SocketException: MqttConnection::On Done called by broker, disconnecting.
#0 MqttConnection._onDone (package:mqtt_client/src/connectionhandling/mqtt_client_mqtt_connection.dart:88:7)
#1 _RootZone.runGuarded (dart:async/zone.dart:1296)
#2 _BufferingStreamSubscription._sendDone.sendDone (dart:async/stream_impl.dart:384)
#3 _BufferingStreamSubscription._sendDone (dart:async/stream_impl.dart:394)
#4 _BufferingStreamSubscription._close (dart:async/stream_impl.dart:277)
#5 _StreamController&&_SyncStreamControllerDispatch._sendDone (dart:async/stream_controller.dart:804)
#6 _StreamController._closeUnchecked (dart:async/stream_controller.dart:656)
#7 _StreamController.close (dart:async/stream_controller.dart:649)
#8 _Socket._onData (dart:io-patch/socket_patch.dart:1624)
#9 _RootZone.runUnaryGuarded (dart:async/zone.dart:1307)
#10 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:330)
#11 _BufferingStreamSubscription._add (dart:async/stream_impl.dart:257)
#12 _StreamController&&_SyncStreamControllerDispatch._sendData (dart:async/stream_controller.dart:796)
#13 _StreamController._add (dart:async/stream_controller.dart:667)
#14 _StreamController.add (dart:async/stream_controller.dart:613)
#15 new _RawSocket. (dart:io-patch/socket_patch.dart:1210)
#16 _NativeSocket.issueReadEvent.issue (dart:io-patch/socket_patch.dart:753)
#17 _microtaskLoop (dart:async/schedule_microtask.dart:41)
#18 _startMicrotaskLoop (dart:async/schedule_microtask.dart:50)
#19 _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:99)
#20 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:152)
../../runtime/vm/dart_api_impl.cc: 1878: error: Dart_GetMainPortId expects there to be a current isolate. Did you forget to call Dart_CreateIsolate or Dart_EnterIsolate?
Aborted
Unfortunately, I get the following error running the example code:
E/flutter (16017): [ERROR:topaz/lib/tonic/logging/dart_error.cc(16)] Unhandled exception:
E/flutter (16017): type 'List<ChangeRecord>' is not a subtype of type 'List<MqttReceivedMessage<dynamic>>'
E/flutter (16017): #0 ChangeNotifier.deliverChanges (package:observable/src/change_notifier.dart:63:20)
E/flutter (16017): #1 _microtaskLoop (dart:async/schedule_microtask.dart:41:21)
E/flutter (16017): #2 _startMicrotaskLoop (dart:async/schedule_microtask.dart:50:5)
It seems, there is a problem regarding publish messages.
I'm using flutter v0.5.1.
The folliowing 3 (similar) warnings appear (Flutter dev 0.7.0, Dart 2.1.0-dev.1.0.flutter-69fce633b7, mqtt_client bd326b5):
[...]/lib/src/connectionhandling/mqtt_client_mqtt_normal_connection.dart:29:26: Warning: Can't return a value from a void function.
compiler message: return completer.complete();
compiler message: ^
[...]/lib/src/connectionhandling/mqtt_client_mqtt_ws_connection.dart:48:26: Warning: Can't return a value from a void function.
compiler message: return completer.complete();
compiler message: ^
[...]/lib/src/connectionhandling/mqtt_client_mqtt_secure_connection.dart:68:26: Warning: Can't return a value from a void function.
compiler message: return completer.complete();
compiler message: ^
The unhandled socket exception generated on broker disconnect should really be handled better, it maybe possible to do this using zones.
Hello! I am trying to make a flutter app that connects to the MQTT server of meeo.xyz. I verified that I am able to connect to it by setting the credentials using MQTT.fx
When I try with mqtt_client, I get this error:
flutter: 2018-11-27 10:29:52.135926 -- Authenticating with username '{md-hi75gqj}' and password '{user_K8SzwBbLqBEwfIqM}'
flutter: 2018-11-27 10:29:52.136213 -- Password length (21) exceeds the max recommended in the MQTT spec.
...
flutter: 2018-11-27 10:29:53.880479 -- SynchronousMqttConnectionHandler::_connectAckProcessor
flutter: 2018-11-27 10:29:53.881361 -- SynchronousMqttConnectionHandler::_connectAckProcessor connection rejected
flutter: 2018-11-27 10:29:53.882681 -- SynchronousMqttConnectionHandler:: cancelling connect timer
flutter: 2018-11-27 10:29:53.884647 -- SynchronousMqttConnectionHandler::internalConnect - post sleep, state = Connection status is disconnected with return code badUsernameOrPassword
flutter: 2018-11-27 10:29:53.884821 -- SynchronousMqttConnectionHandler::internalConnect failed
flutter: ERROR: mqtt-client::NoConnectionException: The maximum allowed connection attempts ({3}) were exceeded. The broker is not responding to the connection request message (Missing Connection Acknowledgement
flutter: ERROR: MQTT client connection failed - disconnecting, state is MqttConnectionState.faulted
Here's my code:
String broker = "mq.meeo.xyz";
String username = "md-hi75gqj";
String password = "some-password";
void _connect() async {
client = mqtt.MqttClient(broker, '');
client.logging(on: true);
client.onDisconnected = _onDisconnected;
....
final mqtt.MqttConnectMessage connMess = mqtt.MqttConnectMessage()
.withClientIdentifier('Mqtt_MyClientUniqueId')
.keepAliveFor(20) // Must agree with the keep alive set above or not set
.withWillTopic('willtopic') // If you set this you must set a will message
.withWillMessage('My Will message')
.startClean() // Non persistent session for testing
.withWillQos(mqtt.MqttQos.atLeastOnce);
print('EXAMPLE::Mosquitto client connecting....');
client.connectionMessage = connMess;
...
try {
await client.connect(username, password);
} catch (e) {
print("ERROR: " + e.toString());
}
}
I don't get it as to why I am getting a badUsernameOrPassword error but it is fine in MQTT.fx. Am I missing something for the setup? I've tried setting the useWebsocket
as well but I am still having a problem connecting to the MQTT server.
Any help would be deeply appreciated
Hi, I'm trying to implement the AWS IOT connection with this plugin but the aws iot authorizes via private certificate and key. There is an option to provide them in normal dart usage but not working with flutter, im trying to add the files in assets where pubspec.yaml process the asset files , so how to give the path for the trusted certificate, certificate toolchain and private key in flutter context.
I know there is a way to avoid the certificate path and give the URL using the AWS pre-signed concept, but I need to know if it is possible without presign concept.
I have an app that is running in the background, but when you close the phone's screen by pressing the power button, it no longer sends the PINGREQ to the broker and disconnects.
How would I be able to keep the connection alive even when the phone's screen is turned off when my app is running in the background?
How to send a message contains utf8 characters?
Here is my connection code in flutter (mqtt_client: ^3.3.4):
Code:
...
_client = MqttClient(
"wss://${mqttServer}/mqtt",
clientId,
);
_client.port = 443;
_client.useWebSocket = true;
...
_client.connectionMessage = MqttConnectMessage()
.withWillQos(MqttQos.exactlyOnce)
.authenticateAs(id.toString(), token)
.withClientIdentifier(clientId);
...
await _client.connect();
The problem is, Sometimes without any reason, i get disconnected from server also when trying to publish a message.
MqttClientPayloadBuilder builder = MqttClientPayloadBuilder();
builder.addString(
json.encode(
{
"type": "msgText",
"data": msg.data,
"identifier": identifier,
},
),
);
_client.publishMessage(topic, MqttQos.exactlyOnce, builder.payload)
I'm sure this is a library issue cause i have no problem with paho.mqtt.
Edit:
It's disconnect exactly every minute.
I/flutter (23865): 2018-11-19 11:39:30.386998 -- MqttConnection::_onDone - calling disconnected callback
Hi,
I'm trying to make a secure connection to my broker, but I'm not being able to establish a connection to my server. Usually I need to set it up through my client certificate, my client key and the CA cert. Can you give me a direction on how can achieve it? Thank you!
Description:
I've got instantly disconnect when publishing any message.
Here is how i make a connection made:
client = MqttClient("wss://mylocalserver:443/mqtt", clientId);
client.setProtocolV311();
client.keepAlivePeriod = 6000;
client.port = 443;
client.useWebSocket = true;
client.logging(on: true);
client.onDisconnected = () {
print("==> Disconnected | Time: ${DateTime.now().toUtc()}");
client.disconnect();
};
client.connectionMessage = MqttConnectMessage()
.authenticateAs(id, token)
.withClientIdentifier(clientId);
client.connectionMessage.startClean();
await client.connect();
Message:
MqttClientPayloadBuilder builder = MqttClientPayloadBuilder();
builder.addString(
json.encode(
{
"type": "msgText",
"data": "message data",
"identifier": Random().nextInt(1000000),
},
),
);
client.publishMessage("u/$id", MqttQos.exactlyOnce, builder.payload);
Call stack to _onDone:
E/flutter (22195): #0 MqttConnection._onDone (file:///home/tje3d/.pub-cache/hosted/pub.dartlang.org/mqtt_client-4.0.0/lib/src/connectionhandling/mqtt_client_mqtt_connection.dart:112:5)
E/flutter (22195): #1 _RootZone.runGuarded (dart:async/zone.dart:1302:10)
E/flutter (22195): #2 _BufferingStreamSubscription._sendDone.sendDone (dart:async/stream_impl.dart:389:13)
E/flutter (22195): #3 _BufferingStreamSubscription._sendDone (dart:async/stream_impl.dart:399:15)
E/flutter (22195): #4 _BufferingStreamSubscription._close (dart:async/stream_impl.dart:283:7)
E/flutter (22195): #5 _SyncStreamController._sendDone (dart:async/stream_controller.dart:771:19)
E/flutter (22195): #6 _StreamController._closeUnchecked (dart:async/stream_controller.dart:628:7)
E/flutter (22195): #7 _StreamController.close (dart:async/stream_controller.dart:621:5)
E/flutter (22195): #8 new _WebSocketImpl._fromSocket.<anonymous closure> (dart:_http/websocket_impl.dart:1155:19)
E/flutter (22195): #9 _RootZone.runGuarded (dart:async/zone.dart:1302:10)
E/flutter (22195): #10 _BufferingStreamSubscription._sendDone.sendDone (dart:async/stream_impl.dart:389:13)
E/flutter (22195): #11 _BufferingStreamSubscription._sendDone (dart:async/stream_impl.dart:399:15)
E/flutter (22195): #12 _BufferingStreamSubscription._close (dart:async/stream_impl.dart:283:7)
E/flutter (22195): #13 _SinkTransformerStreamSubscription._close (dart:async/stream_transformers.dart:96:11)
E/flutter (22195): #14 _EventSinkWrapper.close (dart:async/stream_transformers.dart:23:11)
E/flutter (22195): #15 _WebSocketProtocolTransformer.close (dart:_http/websocket_impl.dart:120:16)
E/flutter (22195): #16 _SinkTransformerStreamSubscription._handleDone (dart:async/stream_transformers.dart:141:24)
E/flutter (22195): #17 _RootZone.runGuarded (dart:async/zone.dart:1302:10)
E/flutter (22195): #18 _BufferingStreamSubscription._sendDone.sendDone (dart:async/stream_impl.dart:389:13)
E/flutter (22195): #19 _BufferingStreamSubscription._sendDone (dart:async/stream_impl.dart:399:15)
E/flutter (22195): #20 _BufferingStreamSubscription._close (dart:async/stream_impl.dart:283:7)
E/flutter (22195): #21 _SyncStreamController._sendDone (dart:async/stream_controller.dart:771:19)
E/flutter (22195): #22 _StreamController._closeUnchecked (dart:async/stream_controller.dart:628:7)
E/flutter (22195): #23 _StreamController.close (dart:async/stream_controller.dart:621:5)
E/flutter (22195): #24 _Socket._onData (dart:io/runtime/binsocket_patch.dart:1728:21)
E/flutter (22195): #25 _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
E/flutter (22195): #26 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
E/flutter (22195): #27 _BufferingStreamSubscription._add (dart:async/stream_impl.dart:263:7)
E/flutter (22195): #28 _SyncStreamController._sendData (dart:async/stream_controller.dart:763:19)
E/flutter (22195): #29 _StreamController._add (dart:async/stream_controller.dart:639:7)
E/flutter (22195): #30 _StreamController.add (dart:async/stream_controller.dart:585:5)
E/flutter (22195): #31 _RawSecureSocket._closeHandler (dart:io/secure_socket.dart:798:21)
E/flutter (22195): #32 _RawSecureSocket._tryFilter.<anonymous closure> (dart:io/secure_socket.dart:920:11)
E/flutter (22195): #33 _RootZone.runUnary (dart:async/zone.dart:1379:54)
E/flutter (22195): #34 _FutureListener.handleValue (dart:async/future_impl.dart:129:18)
E/flutter (22195): #35 Future._propagateToListeners.handleValueCallback (dart:async/future_impl.dart:642:45)
E/flutter (22195): #36 Future._propagateToListeners (dart:async/future_impl.dart:671:32)
E/flutter (22195): #37 Future._completeWithValue (dart:async/future_impl.dart:486:5)
E/flutter (22195): #38 Future._asyncComplete.<anonymous closure> (dart:async/future_impl.dart:516:7)
E/flutter (22195): #39 _microtaskLoop (dart:async/schedule_microtask.dart:41:21)
E/flutter (22195): #40 _startMicrotaskLoop (dart:async/schedule_microtask.dart:50:5)
when using ConnectionState.connected enum , got this error:
The name 'ConnectionState' is defined in libraries 'package:flutter/src/widgets/async.dart' and 'package:mqtt_client/mqtt_client.dart'
Is there something wrong ?
import 'package:flutter/material.dart';
import 'dart:async';
import 'package:mqtt_client/mqtt_client.dart';
A passhprase can be set for a private key on a secure socket, the mqtt_client api should expose this for users to set if needed.
import 'dart:async';
import 'dart:io';
import 'package:mqtt_client/mqtt_client.dart';
Future<int> main() async {
final MqttClient client = MqttClient('test.mosquitto.org', '');
client.logging(on: false);
client.keepAlivePeriod = 60;
/// Add the unsolicited disconnection callback
client.onDisconnected = onDisconnected;
client.onSubscribed = onSubscribed;
final MqttConnectMessage connMess = MqttConnectMessage()
.withClientIdentifier('Mqtt_spl_id')
.keepAliveFor(60) // Must agree with the keep alive set above or not set
.withWillTopic('willtopic') // If you set this you must set a will message
.withWillMessage('My Will message')
.startClean() // Non persistent session for testing
.withWillQos(MqttQos.atLeastOnce);
print('EXAMPLE::Mosquitto client connecting....');
client.connectionMessage = connMess;
try {
await client.connect();
} on Exception catch (e) {
print('EXAMPLE::client exception - $e');
client.disconnect();
}
if (client.connectionStatus.state == ConnectionState.connected) {
print('EXAMPLE::Mosquitto client connected');
} else {
print(
'EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, status is ${client.connectionStatus}');
client.disconnect();
exit(-1);
}
const String topic = 'com/spl/mqtt/connect'; // Not a wildcard topic
client.subscribe(topic, MqttQos.atLeastOnce);
client.updates.listen((List<MqttReceivedMessage<MqttMessage>> c) {
final MqttPublishMessage recMess = c[0].payload;
final String pt =
MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
print(
'EXAMPLE::Change notification:: topic is <${c[0].topic}>, payload is <-- $pt -->');
print('');
});
print('EXAMPLE::Publishing our topic');
final MqttClientPayloadBuilder builder = MqttClientPayloadBuilder();
builder.addString('Hello from sql connnect');
/// Subscribe to it
client.subscribe(topic, MqttQos.exactlyOnce);
/// Publish it
client.publishMessage(topic, MqttQos.exactlyOnce, builder.payload);
await MqttUtilities.asyncSleep(20);
print('EXAMPLE::Sleeping....');
await MqttUtilities.asyncSleep(20);
print('EXAMPLE::Disconnecting');
client.disconnect();
await MqttUtilities.asyncSleep(10);
print("EXAMPLE::Connecting again");
await client.connect();
print('EXAMPLE::Publishing our topic again');
await MqttUtilities.asyncSleep(2);
client.publishMessage(topic, MqttQos.exactlyOnce, builder.payload);
print('EXAMPLE::Sleeping....');
await MqttUtilities.asyncSleep(20);
print("end exc");
return 0;
}
/// The subscribed callback
void onSubscribed(String topic) {
print('EXAMPLE::Subscription confirmed for topic $topic');
}
/// The unsolicited disconnect callback
void onDisconnected() {
print('EXAMPLE::OnDisconnected client callback - Client disconnection');
}
EXAMPLE::Mosquitto client connecting....
EXAMPLE::Mosquitto client connected
EXAMPLE::Publishing our topic
EXAMPLE::Subscription confirmed for topic com/spl/mqtt/connect
EXAMPLE::Change notification:: topic is <com/spl/mqtt/connect>, payload is <-- Hello from sql connnect -->
EXAMPLE::Sleeping....
EXAMPLE::Disconnecting
EXAMPLE::Connecting again
EXAMPLE::OnDisconnected client callback - Client disconnection
EXAMPLE::OnDisconnected client callback - Client disconnection
Unhandled exception:
mqtt-client::NoConnectionException: The maximum allowed connection attempts ({3}) were exceeded. The broker is not responding to the connection request message (Missing Connection Acknowledgement
#0 SynchronousMqttConnectionHandler.internalConnect (file:///Users/lengxr/.pub-cache/hosted/pub.flutter-io.cn/mqtt_client-4.0.0/lib/src/connectionhandling/mqtt_client_synchronous_mqtt_connection_handler.dart:72:7)
<asynchronous suspension>
#1 MqttConnectionHandler.connect (file:///Users/lengxr/.pub-cache/hosted/pub.flutter-io.cn/mqtt_client-4.0.0/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:56:13)
<asynchronous suspension>
#2 MqttClient.connect (file:///Users/lengxr/.pub-cache/hosted/pub.flutter-io.cn/mqtt_client-4.0.0/lib/src/mqtt_client.dart:163:37)
<asynchronous suspension>
#3 main (file:///Users/lengxr/Documents/FlutterProject/mqtt_demo/lib/connect_main.dart:71:16)
<asynchronous suspension>
#4 _startIsolate.<anonymous closure> (dart:isolate/runtime/libisolate_patch.dart:289:19)
#5 _RawReceivePortImpl._handleMessage (dart:isolate/runtime/libisolate_patch.dart:171:12)
Dart 2.0 is now a stable release. Please support it (and with that Flutter).
Whenever I loose my connection weather by disconnecting the broker, turning my wifi/data off or through calling disconnect() and I try to reconnect using client.connect I get an error due an existing certficate:
E/flutter ( 5332): [ERROR:flutter/shell/common/shell.cc(184)] Dart Error: Unhandled exception:
E/flutter ( 5332): TlsException: Failure trusting builtin roots (OS Error:
E/flutter ( 5332): CERT_ALREADY_IN_HASH_TABLE(x509_lu.c:356), errno = 0)
E/flutter ( 5332): #0 _SecurityContext.setTrustedCertificatesBytes (dart:io/runtime/binsecure_socket_patch.dart:171:59)
E/flutter ( 5332): #1 MqttSecureConnection.connect (file:///Users/mt/Projects/flutter/mqtt_client/lib/src/connectionhandling/mqtt_client_mqtt_secure_connection.dart:56:19)
E/flutter ( 5332): #2 SynchronousMqttConnectionHandler.internalConnect (file:///Users/mt/Projects/flutter/mqtt_client/lib/src/connectionhandling/mqtt_client_synchronous_mqtt_connection_handler.dart:53:24)
E/flutter ( 5332):
E/flutter ( 5332): #3 MqttConnectionHandler.connect (file:///Users/mt/Projects/flutter/mqtt_client/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler.dart:56:13)
E/flutter ( 5332):
E/flutter ( 5332): #4 MqttClient.connect (file:///Users/mt/Projects/flutter/mqtt_client/lib/src/mqtt_client.dart:165:37)
E/flutter ( 5332):
E/flutter ( 5332): #5 Mqtt.connect (package:mqtt/mqtt.dart:39:18)
E/flutter ( 5332):
I don't know if you are able to reproduce it without flutter...
When the Dart client publishes 2 messages quickly after each other with QoS 2, in many cases (related to timing) the second is never fully processed and gets lost. It happens when the broker has sent the PUBREC of the 2nd message before the PUBCOMP of the 1st message: then the client never sends a PUBREL for the 2nd message. This is fully reproducable.
Mosquitto broker traces (the XXXXXXXX-XXXX client is the Dart client, mosqqub/* is a mosquitto_sub
command line client subscribed with QoS 0, that is irrelevant for the problem):
Sequence working OK (first id=48, second id=49):
1537369517: Received PUBLISH from 2382a99f-fe9c (d0, q2, r0, m48, 'test/topic/a', ... (504 bytes))
1537369517: Sending PUBREC to 2382a99f-fe9c (Mid: 48)
1537369517: Received PUBLISH from 2382a99f-fe9c (d0, q2, r0, m49, 'test/topic/a', ... (360 bytes))
1537369517: Received PUBREL from 2382a99f-fe9c (Mid: 48)
1537369517: Sending PUBCOMP to 2382a99f-fe9c (Mid: 48)
1537369517: Sending PUBLISH to mosqsub/12643-xos069.xo (d0, q0, r0, m0, 'test/topic/a', ... (504 bytes))
1537369517: Sending PUBREC to 2382a99f-fe9c (Mid: 49)
1537369517: Received PUBREL from 2382a99f-fe9c (Mid: 49)
1537369517: Sending PUBCOMP to 2382a99f-fe9c (Mid: 49)
1537369517: Sending PUBLISH to mosqsub/12643-xos069.xo (d0, q0, r0, m0, 'test/topic/a', ... (360 bytes))
Sequence working not OK (first id=4, second id=5):
1537369818: Received PUBLISH from 22c2105a-6926 (d0, q2, r0, m4, 'test/topic/a', ... (1175 bytes))
1537369818: Sending PUBREC to 22c2105a-6926 (Mid: 4)
1537369818: Received PUBLISH from 22c2105a-6926 (d0, q2, r0, m5, 'test/topic/a', ... (360 bytes))
1537369818: Sending PUBREC to 22c2105a-6926 (Mid: 5)
1537369818: Received PUBREL from 22c2105a-6926 (Mid: 4)
1537369818: Sending PUBCOMP to 22c2105a-6926 (Mid: 4)
1537369818: Sending PUBLISH to mosqsub/12643-xos069.xo (d0, q0, r0, m0, 'test/topic/a', ... (1175 bytes))
This might be related to issue #33 but I'm not sure...
certificate_chain.patch.txt
The client should allow the user to specify a certificate chain path for secure communications. Patch attached.
Running the example script results in the following output:
EXAMPLE::Mosquitto client connecting....
EXAMPLE::Mosquitto client connected
EXAMPLE::Publishing our topic
EXAMPLE::Sleeping....
Changing line 104 to
client.publishMessage(pubTopic, MqttQos.atMostOnce, builder.payload);
EXAMPLE::Mosquitto client connecting....
EXAMPLE::Mosquitto client connected
EXAMPLE::Publishing our topic
EXAMPLE::Sleeping....
EXAMPLE::Change notification:: topic is <Dart/Mqtt_client/testtopic>, payload is <-- Hello from mqtt_client -->
Also receiving messages sent with
mosquitto_pub -t "Dart/Mqtt_client/testtopic" -m "hello world!" -q 0
works. But not with -q 1
or -q 2
mqtt_client versions: 3.1.0 and 3.0.0
Dart VM version: 2.0.0 (Fri Aug 3 10:53:23 2018 +0200) on "macos_x64"
Mosquitto 1.5.1
mqtt_client is support dart2 now, why doesn't support flutter 0.4.4 beta?
Using mqtt_client 3.1.0 with
Flutter 0.7.5 • channel dev • https://github.com/flutter/flutter.git
Framework • revision eab5cd9853 (7 days ago) • 2018-08-30 14:47:04 -0700
Engine • revision dc7b5eb89d
Tools • Dart 2.1.0-dev.3.0.flutter-760a9690c2
The mqtt example code doesn't work as expected in a new flutter project, since I receive change notifications from other topics as well:
I/flutter (15268): EXAMPLE::Change notification:: topic is <test/lol>, payload is <-- test 3 -->
I/flutter (15268): EXAMPLE::Change notification:: topic is <ebcon/cinder/TicToc/JSON>, payload is <-- {"Node": "cinder", "Title": "TicToc", "Data": "1536237360 2018/09/06 05:36:00 -0700"} -->
Notice that the mqtt example code doesn't subscribe to these topics.
Version: 4.0.0 (MqttClient _client) -> It could not await the MqttClient.connect return
MqttClientConnectionStatus cs = await _client.connect();
if(cs != null && cs.state == ConnectionState.connected){
Version: 3.3.3 -> It's working pretty well
ConnectionState cs = await _client.connect();
if(cs != null && cs == ConnectionState.connected){
With the incorporation of the Observable package code into the client the authors and license files need to be checked for compliance with the code headers.
Hi, I tried to install the package but it gives me the following error:
Running "flutter packages get" in myapp...
Package mqtt_client has no versions that match >=1.5.0 <2.0.0 derived from:
- myapp depends on version ^1.5.0
pub get failed (1)
I tested it with a fresh new app:
Running "flutter packages get" in myapp... 18,1s
[✓] Flutter is fully installed. (Channel beta, v0.1.5, on Mac OS X 10.13.3 17D102, locale de-DE)
[✗] Android toolchain - develop for Android devices is not installed.
[✓] iOS toolchain - develop for iOS devices is fully installed. (Xcode 9.2)
[✗] Android Studio is not installed. (not installed)
[✓] VS Code is fully installed. (version 1.21.1)
[✓] Connected devices is fully installed. (1 available)
I also tried to install version 1.4 or 1.3, but I always get the same message.
Contents of pubspec.yaml:
name: myapp
description: A new Flutter project.
dependencies:
flutter:
sdk: flutter
# The following adds the Cupertino Icons font to your application.
# Use with the CupertinoIcons class for iOS style icons.
cupertino_icons: ^0.1.0
mqtt_client: ^1.5.0
dev_dependencies:
flutter_test:
sdk: flutter
Any help or hint is greatly appreciated!
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.