Code Monkey home page Code Monkey logo

Comments (10)

shamblett avatar shamblett commented on August 30, 2024

Socket exceptions are raised by the Dart runtime, not the client. Dart also controls the network stack, not the client.

from mqtt_client.

luohao123 avatar luohao123 commented on August 30, 2024

@shamblett Hi, sir.

I have a client wrapper built on this wonderful lib. For some reason it connects and then gets disconnected right away, and this happens every time. Do you have any idea why?

Here is what my client looks like:

/// [ClientHandler] implementation that talks to an MQTT broker through
/// `MqttServerClient`.
///
/// Incoming publish messages are decoded as UTF-8 and re-emitted on the
/// stream returned by [allMessagesStream]. Connection-state changes are pushed
/// to both `_cnxBehavior` and `connectState`.
class MqttClient extends ClientHandler {
  /// Port used by [connect] when the caller does not pass `port`.
  static const int _defaultPort = 1883;

  /// Underlying library client; `null` while no session exists.
  MqttServerClient? _client;

  /// Client identifier of the current session; `null` while no session exists.
  String? _clientId;

  /// Broadcast sink for every decoded incoming message.
  StreamController<PayloadWithTopic>? _messagesController =
      StreamController.broadcast();

  /// Receives every connection-state change reported by this class.
  BehaviorSubject<ConnectionState> _cnxBehavior =
      BehaviorSubject<ConnectionState>();

  int maxReconnect = 10;
  int _crtReconnect = 0;

  /// Topics subscribed through [subscribeTopicOnlyOnce] in the current
  /// session. Cleared whenever the session ends so that a new session
  /// subscribes again.
  List<String> _subscribedTopics = [];

  /// Connects to the broker at [host] and starts forwarding incoming messages.
  ///
  /// [clientId] falls back to `getClientId()` and [port] falls back to 1883.
  /// Returns `true` when the connect call completes without throwing and
  /// `false` otherwise; on failure the client is disconnected.
  ///
  /// `host` is deliberately left untyped, exactly as in the original
  /// signature, so the override stays compatible with the base declaration.
  @override
  Future<bool> connect(
      {required host,
      required String username,
      required String password,
      String? clientId,
      int? port}) async {
    final String cid = clientId ?? getClientId()!;
    // Honor the caller-supplied port instead of always using the default.
    final int effectivePort = port ?? _defaultPort;
    _clientId = cid;

    final client = MqttServerClient.withPort(host, cid, effectivePort,
        maxConnectionAttempts: 3);
    _client = client;
    // The password is intentionally not written to the log.
    debugPrint(
        '!!!! ------------> [conn] connect with $host, $effectivePort, $username, <redacted>, $cid');

    client
      ..logging(on: false)
      ..onConnected = onConnected
      ..onDisconnected = onDisconnected
      ..onUnsubscribed = onUnsubscribed
      ..onSubscribed = onSubscribed
      ..onSubscribeFail = onSubscribeFail
      ..onAutoReconnect = onAutoReconnect
      ..pongCallback = pong
      ..keepAlivePeriod = 60
      ..autoReconnect = false
      ..connectionMessage = MqttConnectMessage()
          .authenticateAs(username, password)
          .withWillTopic('lastwills')
          .withWillMessage('Will message')
          .withClientIdentifier(cid)
          .withProtocolName("MQTT")
          .withProtocolVersion(4)
          .withWillQos(MqttQos.atLeastOnce);

    try {
      _broadcastConnectionState();
      await client.connect();

      _broadcastConnectionState();
      _listenAndFilter();
      return true;
    } catch (e) {
      kLog('Exception: $e');
      // `_client` may already have been reset by [onDisconnected].
      _client?.disconnect();
      return false;
    }
  }

  /// Subscribes to the three archive topics derived from `getClientId()`.
  void _subscribeToArchivesTopics() {
    _client!
      ..subscribe(getClientId()!.toArchivesRoomsTopic, MqttQos.atLeastOnce)
      ..subscribe(getClientId()!.toArchivesMessagesTopic, MqttQos.atLeastOnce)
      ..subscribe(getClientId()!.toArchivesMyId, MqttQos.atLeastOnce);
    debugPrint("[client subscribe archives] just after connect");
  }

  /// Forwards every received publish message to `_messagesController`.
  ///
  /// The update stream delivers messages in batches, so each entry of a batch
  /// is forwarded rather than only the first one.
  void _listenAndFilter() {
    _client!.updates!.listen((List<MqttReceivedMessage<MqttMessage>> batch) {
      for (final received in batch) {
        final message = received.payload;
        // Only publish messages carry an application payload.
        if (message is! MqttPublishMessage) continue;

        final topic = received.topic;
        final payload = Utf8Decoder().convert(message.payload.message);
        debugPrint('[_listenAndFilter] Got msg: $topic, $payload');
        _messagesController!
            .add(PayloadWithTopic(payload: payload, topic: topic));
        debugPrint(
            '[_listenAndFilter] Received message:$payload from topic: $topic');
      }
    });
  }

  /// Pushes [state] to both `_cnxBehavior` and `connectState`.
  void _publishConnectionState(ConnectionState state) {
    _cnxBehavior.add(state);
    connectState.value = state;
  }

  /// Reports the current state of `_client`.
  ///
  /// A missing client or a missing connection status is reported as
  /// [ConnectionState.disconnected].
  void _broadcastConnectionState() {
    final client = _client;
    if (client == null) {
      debugPrint('[broadcast status] client is null, disconnected...');
      _publishConnectionState(ConnectionState.disconnected);
      return;
    }
    final status = client.connectionStatus;
    if (status == null) {
      debugPrint(
          '[broadcast status] connectionStatus is null, disconnected...');
      _publishConnectionState(ConnectionState.disconnected);
      return;
    }
    switch (status.state) {
      case MqttConnectionState.disconnecting:
        _publishConnectionState(ConnectionState.disconnecting);
        break;
      case MqttConnectionState.disconnected:
        _publishConnectionState(ConnectionState.disconnected);
        break;
      case MqttConnectionState.connecting:
        _publishConnectionState(ConnectionState.connecting);
        break;
      case MqttConnectionState.connected:
        _publishConnectionState(ConnectionState.connected);
        break;
      case MqttConnectionState.faulted:
        _publishConnectionState(ConnectionState.faulted);
        break;
    }
  }

  /// Whether a client exists and its status is `connected`.
  @override
  bool isConnected() {
    final client = _client;
    if (client == null) {
      debugPrint("[isConnected] _client is null!! not connected");
      return false;
    }
    final status = client.connectionStatus;
    if (status == null) {
      debugPrint("[isConnected] _client connectionStatus null: not connected");
      return false;
    }
    return status.state == MqttConnectionState.connected;
  }

  /// Whether a client exists and its status is `connecting`.
  @override
  bool isConnecting() {
    return _client?.connectionStatus?.state == MqttConnectionState.connecting;
  }

  /// Subscribes to the chat and room-event topics of [bareRoom].
  @override
  void joinRoom(String bareRoom) {
    subscribeTopicOnlyOnce(bareRoom.toChattingTopic);
    subscribeTopicOnlyOnce(bareRoom.toRoomEventsTopic);
  }

  /// Unsubscribes from the chat and room-event topics of [bareRoom].
  @override
  void leaveRoom(String bareRoom) {
    _unsubscribeTopic(bareRoom.toChattingTopic);
    _unsubscribeTopic(bareRoom.toRoomEventsTopic);
  }

  /// Subscribes to the presence topic of [contactId].
  @override
  void joinContactEvents(String contactId) {
    subscribeTopicOnlyOnce(contactId.toPresenceTopic);
  }

  /// Subscribes to [topic] unless it is already subscribed in this session.
  ///
  /// Does nothing, apart from logging, when there is no client.
  void subscribeTopicOnlyOnce(String topic) {
    if (_subscribedTopics.contains(topic)) return;
    final client = _client;
    if (client == null) {
      debugPrint("client not connected, pass subscribe: $topic");
      return;
    }
    client.subscribe(topic, MqttQos.atLeastOnce);
    _subscribedTopics.add(topic);
    debugPrint("[JION ROOM] !!!! ******** join roomId: $topic");
  }

  /// Unsubscribes from [topic] and forgets it, so that a later
  /// [subscribeTopicOnlyOnce] call for the same topic subscribes again.
  void _unsubscribeTopic(String topic) {
    _client?.unsubscribe(topic);
    _subscribedTopics.remove(topic);
  }

  /// Subscribes to the personal-event and archive-message topics of [myId]
  /// when connected; otherwise only logs.
  @override
  void joinMyEvents(String myId) {
    if (!isConnected()) {
      debugPrint("client not connected, pass joinMyEvents");
      return;
    }
    subscribeTopicOnlyOnce(myId.toPersonalEventTopic);
    subscribeTopicOnlyOnce(myId.toArchivesMessagesTopic);
  }

  /// Unsubscribes from the presence topic of [contactId].
  @override
  void leaveContactEvents(String contactId) {
    _unsubscribeTopic(contactId.toPresenceTopic);
  }

  /// Publishes [payload] as a UTF-8 string on [channel].
  ///
  /// The payload is dropped when there is no client.
  @override
  void sendPayload(String payload, String channel) {
    final builder = MqttClientPayloadBuilder();
    builder.addUTF8String(payload);
    debugPrint("sending payload client? $_client");
    final client = _client;
    if (client == null) {
      // do with disconnected
      return;
    }
    client.publishMessage(channel, MqttQos.atLeastOnce, builder.payload!);
  }

  /// Publishes [file] on [channel] as
  /// `data:<mime>;base64,<content>,<file name>`.
  ///
  /// The MIME type falls back to `text/plain` when it cannot be looked up.
  @override
  void sendFilePayload(File file, String channel) {
    final bytes = file.readAsBytesSync();
    final mime = lookupMimeType(file.path) ?? "text/plain";
    final base64Image =
        "data:$mime;base64,${base64Encode(bytes)},${basename(file.path)}";
    debugPrint('[SENDING... FILE] $base64Image, $channel');
    sendPayload(base64Image, channel);
  }

  /// Disconnects the client, if any, and resets the session state.
  @override
  void disconnect() {
    try {
      _client?.disconnect();
    } catch (e) {
      kLog("[disconnect error] $e");
    }
    _resetSession();
  }

  /// Stream of every message forwarded by [_listenAndFilter].
  @override
  Stream<PayloadWithTopic> allMessagesStream() => _messagesController!.stream;

  /// Registered as the client's `onConnected` callback: reports the state and
  /// subscribes to the archive topics.
  void onConnected() {
    debugPrint('[onConnected] Connected...');
    _broadcastConnectionState();
    _subscribeToArchivesTopics();
  }

  /// Registered as the client's `onDisconnected` callback: reports the state
  /// and resets the session.
  ///
  /// `_messagesController` is deliberately left open.
  void onDisconnected() {
    debugPrint('[Disconnected] Disconnected...');
    _broadcastConnectionState();
    _resetSession();
  }

  /// Drops the client, its identifier, and the per-session subscription list.
  void _resetSession() {
    _client = null;
    _clientId = null;
    _subscribedTopics.clear();
  }

  /// Registered as the client's `onSubscribed` callback.
  void onSubscribed(String topic) {
    debugPrint('Subscribed topic: $topic');
  }

  /// Registered as the client's `onSubscribeFail` callback.
  void onSubscribeFail(String topic) {
    debugPrint('Failed to subscribe $topic');
  }

  /// Registered as the client's `onUnsubscribed` callback.
  void onUnsubscribed(String? topic) {
    debugPrint('Unsubscribed topic: $topic');
  }
}

Is anything in my code in the wrong order?

from mqtt_client.

shamblett avatar shamblett commented on August 30, 2024

I'll need a log from the client as well.

from mqtt_client.

shamblett avatar shamblett commented on August 30, 2024

It may be your connect message. Get rid of everything to do with wills and remove the protocol name and version, i.e. trim the message down to just the basics: the cid and any auth parameters.

from mqtt_client.

luohao123 avatar luohao123 commented on August 30, 2024

The connection is established and the onConnected function is called, and I can get the first batch of data after onConnected, but after that it suddenly disconnects without any clue. On the server side it looks like there is a login with a duplicated client ID, but a new connection with a duplicated client ID should be able to replace the old one.

So I don't know whether it's a client issue or a server issue, and this situation happens almost every time.

You mean this?

// Connect message with the will topic and will message removed; the protocol
// name/version and the will QoS are still set.
final connMessage = MqttConnectMessage()
        .authenticateAs(username, password)
        .withClientIdentifier(cid)
        .withProtocolName("MQTT")
        .withProtocolVersion(4)
        .withWillQos(MqttQos.atLeastOnce);

Should the QoS be set?

from mqtt_client.

shamblett avatar shamblett commented on August 30, 2024

If you are connecting, this implies your connection message is OK; if it wasn't, the broker should cleanly disconnect you, not just close the socket. The "connection reset by peer" error comes from the server ripping the socket away.

Remove everything but the cid and the auth; the QoS is for the will. When it's working you can add these back in as you wish.

from mqtt_client.

luohao123 avatar luohao123 commented on August 30, 2024

@shamblett Hello, I appreciate the help. I am pretty new to MQTT; can you explain what the will would cause here, and why?

from mqtt_client.

wzgl998877 avatar wzgl998877 commented on August 30, 2024

你好,这个问题解决了吗,我这边也遇到了这个问题,当把autoReconnect设置为true后,出现了clientId重复的问题,这会导致客户端一直不断的重连,但连接不成功,下面是我的代码Future init({
required String server,
required int port,
required String clientIdentifier,
String username = "",
String password = "",
}) async {
// Builds and caches the MQTT client. An already-created `_mqttClient` is
// returned as-is, so a later call with different arguments has no effect.
if (_mqttClient != null) {
return _mqttClient!;
}
// The same `clientIdentifier` is used for the connect message and for the
// client constructor below.
final connMess = MqttConnectMessage()
.withClientIdentifier(clientIdentifier)
.authenticateAs(username, password)
.withProtocolName(MqttClientConstants.mqttV311ProtocolName)
.withProtocolVersion(MqttClientConstants.mqttV311ProtocolVersion);
// NOTE(review): this passes the password to `_log` in plain text.
_log("server:$server\n port:$port\n clientIdentifier:$clientIdentifier\n username:$username \n password:$password", writeLog: false);
_mqttClient = MqttServerClient.withPort(server, clientIdentifier, port)
..logging(on: !BaseUrlManager().isPRO()) // whether logging is enabled
..setProtocolV311() // set the MQTT protocol
..keepAlivePeriod = 60 // keep-alive ping-pong period; disabled when not set (the default)
..onConnected = onConnected // connection-success callback
..onDisconnected = _onDisconnected
..onSubscribed = _onSubscribed
..onSubscribeFail = _onSubscribeFail
..onUnsubscribed = _onUnSubscribed
..autoReconnect = true // auto reconnect
..onAutoReconnect = onAutoReconnect
..onAutoReconnected = onAutoReconnected
..resubscribeOnAutoReconnect = true
..connectTimeoutPeriod = 15000 // connection timeout
..pongCallback = _pong; // heartbeat
_mqttClient?.connectionMessage = connMess;
// Credentials are stored on the instance only when non-empty.
if (username.isNotEmpty) {
this.username = username;
}
if (password.isNotEmpty) {
this.password = password;
}
return _mqttClient!;
}

from mqtt_client.

luohao123 avatar luohao123 commented on August 30, 2024

没有,我这边遇到的问题是,在网络切换的时候必然会出现不断的重连。这个库的作者总说不是库的问题,我感觉它这个库就有问题

from mqtt_client.

peng093 avatar peng093 commented on August 30, 2024

你好,这个问题解决了吗,我这边也遇到了这个问题,当把autoReconnect设置为true后,出现了clientId重复的问题,这会导致客户端一直不断的重连,但连接不成功,下面是我的代码

Future init({
  required String server,
  required int port,
  required String clientIdentifier,
  String username = "",
  String password = "",
}) async {
  if (_mqttClient != null) {
    return _mqttClient!;
  }
  final connMess = MqttConnectMessage()
      .withClientIdentifier(clientIdentifier)
      .authenticateAs(username, password)
      .withProtocolName(MqttClientConstants.mqttV311ProtocolName)
      .withProtocolVersion(MqttClientConstants.mqttV311ProtocolVersion);
  _log("server:$server\n port:$port\n clientIdentifier:$clientIdentifier\n username:$username \n password:$password", writeLog: false);
  _mqttClient = MqttServerClient.withPort(server, clientIdentifier, port)
    ..logging(on: !BaseUrlManager().isPRO()) //是否开启日志
    ..setProtocolV311() //设置mqtt 协议
    ..keepAlivePeriod = 60 //持连接ping-pong周期。默认不设置时关闭。
    ..onConnected = onConnected //连接成功回调
    ..onDisconnected = _onDisconnected
    ..onSubscribed = _onSubscribed
    ..onSubscribeFail = _onSubscribeFail
    ..onUnsubscribed = _onUnSubscribed
    ..autoReconnect = true //自动重连
    ..onAutoReconnect = onAutoReconnect
    ..onAutoReconnected = onAutoReconnected
    ..resubscribeOnAutoReconnect = true
    ..connectTimeoutPeriod = 15000 //连接超时时间
    ..pongCallback = _pong; //心跳
  _mqttClient?.connectionMessage = connMess;
  if (username.isNotEmpty) {
    this.username = username;
  }
  if (password.isNotEmpty) {
    this.password = password;
  }
  return _mqttClient!;
}

withClientIdentifier设置成随机字符串,就不会一直断开了

from mqtt_client.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.