gmqtt's Issues

Unhandled ConnectionRefusedError

If you run properties example with host that is not running broker (eg localhost), program will crash

Traceback (most recent call last):
  File "", line 98, in <module>
    loop.run_until_complete(main('localhost', port, None))
  File "/usr/lib64/python3.7/asyncio/", line 579, in run_until_complete
    return future.result()
  File "", line 51, in main
    await sub_client.connect(broker_host, broker_port)
  File "/home/d21d3q/workspace/gmqtt/venv/lib/python3.7/site-packages/gmqtt-0.4.0-py3.7.egg/gmqtt/", line 144, in connect
  File "/home/d21d3q/workspace/gmqtt/venv/lib/python3.7/site-packages/gmqtt-0.4.0-py3.7.egg/gmqtt/", line 160, in _create_connection
  File "/home/d21d3q/workspace/gmqtt/venv/lib/python3.7/site-packages/gmqtt-0.4.0-py3.7.egg/gmqtt/mqtt/", line 26, in create_connection
  File "/usr/lib64/python3.7/asyncio/", line 954, in create_connection
    raise exceptions[0]
  File "/usr/lib64/python3.7/asyncio/", line 941, in create_connection
    await self.sock_connect(sock, address)
  File "/usr/lib64/python3.7/asyncio/", line 464, in sock_connect
    return await fut
  File "/usr/lib64/python3.7/asyncio/", line 494, in _sock_connect_cb
    raise OSError(err, f'Connect call failed {address}')
ConnectionRefusedError: [Errno 111] Connect call failed ('', 1883)
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending coro=<Client._resend_qos_messages() running at /home/d21d3q/workspace/gmqtt/venv/lib/python3.7/site-packages/gmqtt-0.4.0-py3.7.egg/gmqtt/> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f238afa0f10>()]>>

Expected behavior would be to try to reconnect normally.


        await pub_client.connect(broker_host, broker_port)
    except ConnectionRefusedError:

message process task


What is the proper way to use asyncio.create_task for running a message process loop? Something like:

while True:
   msg = await client.new_message()



How do I know if receive topic comes from the same subscribe?

I subscribe a topic with plus symbol (LIGHT/+/TURNON)
So I will receive these topics:


I hope to receive these topic can do the same thing.
How do I know that LIGHT/ID_1/TURNON and LIGHT/ID_2/TURNON are from the same subscribe?

following is my sample code:


def turn_on_tv(client, topic, message, qos, properties):
    # do something

def turn_on_light(client, topic, message, qos, properties):
    # do something

def init_subscribe(client):
    global TOPIC_LIST
    TOPIC_LIST = [
        {'topic': 'TV/TURNON'., 'qos': 1, 'method': turn_on_tv},
        {'topic': 'LIGHT/+/TURNON', 'qos': 1, 'method': turn_on_light},

    for t in TOPIC_LIST:
        client.subscribe(t['topic'], t['qos'])

def on_message(client, topic, payload, qos, properties):
    for t in TOPIC_LIST:
        if topic == t['topic']:    # How do I know?
            t['method'](client, topic, message, qos, properties)

Examples are placed in the root of the site-packages dir

Examples are placed in the root of the site-packages directory which will make it hard to identify them (is also a packaging issue).

    Installed (but unpackaged) file(s) found:

From my point of view they should go under or not be included at all:


server keep live bug

when there is keeplive in connack, the exception:
result = self._handle_packet(cmd, packet)
File "E:\workspace\mqtt-simulator\env\lib\site-packages\gmqtt\mqtt\", line 214, in _handle_packet
handler(cmd, packet)
File "E:\workspace\mqtt-simulator\env\lib\site-packages\gmqtt\mqtt\", line 286, in _handle_connack_packet
File "E:\workspace\mqtt-simulator\env\lib\site-packages\gmqtt\mqtt\", line 257, in _update_keepalive_if_needed
self._connection.keepalive = self._keepalive
File "E:\workspace\mqtt-simulator\env\lib\site-packages\gmqtt\mqtt\", line 109, in keepalive
self._keep_connection_callback = asyncio.get_event_loop().call_later(self._keepalive / 2, self._keep_connection)
TypeError: unsupported operand type(s) for /: 'list' and 'int'

    def _parse_properties(self, packet):
        if self.protocol_version < MQTTv50:
            # If protocol is version is less than 5.0, there is no properties in packet
            return {}, packet
        properties_len, left_packet = unpack_variable_byte_integer(packet)
        packet = left_packet[:properties_len]
        left_packet = left_packet[properties_len:]
        properties_dict = defaultdict(list)
        while packet:
            property_identifier, = struct.unpack("!B", packet[:1])
            property_obj = Property.factory(id_=property_identifier)
            if property_obj is None:
                logger.critical('[PROPERTIES] received invalid property id {}, disconnecting'.format(property_identifier))
                return None, None
            result, packet = property_obj.loads(packet[1:])
            for k, v in result.items():
                properties_dict[k].append(v)   # this is list ,but need int
        properties_dict = dict(properties_dict)
        return properties_dict, left_packet missing 1 required positional argument: 'properties'

Following the example on the README i have a connect function defined like this (I'm not using

import asyncio
import os
import signal
import time

from gmqtt import Client as MQTTClient
from gmqtt.mqtt.constants import MQTTv311

host = 'localhost'
port = '1993'

STOP = asyncio.Event()

def on_connect(client, flags, rc, properties):
    client.subscribe('#', qos=0)

def on_message(client, topic, payload, qos, properties):
    print('RECV MSG:', payload)

def on_disconnect(client, packet, exc=None):

def on_subscribe(client, mid, qos):

def ask_exit(*args):

async def main(broker_host, broker_port):
    client = MQTTClient("client-id")

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe

    await client.connect(broker_host, broker_port, version=MQTTv311)
    await STOP.wait()
    await client.disconnect()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    loop.add_signal_handler(signal.SIGINT, ask_exit)
    loop.add_signal_handler(signal.SIGTERM, ask_exit)

    loop.run_until_complete(main(host, port))

Traceback (most recent call last):
  File "/usr/lib/python3.7/site-packages/gmqtt/mqtt/", line 227, in __call__
    result = self._handle_packet(cmd, packet)
  File "/usr/lib/python3.7/site-packages/gmqtt/mqtt/", line 124, in _handle_packet
    handler(cmd, packet)
  File "/usr/lib/python3.7/site-packages/gmqtt/mqtt/", line 179, in _handle_connack_packet
    self.on_connect(self, flags, result)
TypeError: on_connect() missing 1 required positional argument: 'properties'

This will go away if I define the function as

def on_connect(client, flags, rc, properties=None):
    client.subscribe('#', qos=0)

Feature request: client config

I would like to provide a config to Client that contains FAILED_CONNECTIONS_STOP_RECONNECT, RECONNECTION_SLEEP, ...

An example from hbmqtt project:

config = {
    'keep_alive': 10,
    'ping_delay': 1,
    'default_qos': 0,
    'default_retain': False,
    'auto_reconnect': True,
    'reconnect_max_interval': 5,
    'reconnect_retries': 10,
    'topics': {
        '/test': { 'qos': 1 },
        '/some_topic': { 'qos': 2, 'retain': True }

Question:document for how to do pytest

Env:CentOS8/ mosquitto 1.6.9

[root@localhost gmqtt-0.6.4]#  /opt/python38/bin/pytest
===================================================================================== test session starts =====================================================================================
platform linux -- Python 3.8.1, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
rootdir: /root/libs/gmqtt-0.6.4, inifile: pytest.ini
plugins: hypothesis-5.6.0, cov-2.8.1, forked-1.1.3, xdist-1.31.0, asyncio-0.11.0
collected 16 items

tests/ EEEEEEFEEEEEEEEE                                                                                                                                                    [100%]

=========================================================================================== ERRORS ============================================================================================
________________________________________________________________________________ ERROR at setup of test_basic _________________________________________________________________________________

    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/ in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/ in connect
    self._connection = await self._create_connection(
gmqtt/ in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/ in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/ in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/ in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/ in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/ in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():

            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('', 1883)

/opt/python38/lib/python3.8/asyncio/ ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_________________________________________________________________________ ERROR at setup of test_basic_subscriptions __________________________________________________________________________

    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/ in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/ in connect
    self._connection = await self._create_connection(
gmqtt/ in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/ in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/ in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/ in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/ in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/ in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():

            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('', 1883)

/opt/python38/lib/python3.8/asyncio/ ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
___________________________________________________________________________ ERROR at setup of test_retained_message ___________________________________________________________________________

    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/ in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/ in connect
    self._connection = await self._create_connection(
gmqtt/ in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/ in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/ in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/ in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/ in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/ in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():

            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('', 1883)

/opt/python38/lib/python3.8/asyncio/ ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_____________________________________________________________________________ ERROR at setup of test_will_message _____________________________________________________________________________

    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/ in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/ in connect
    self._connection = await self._create_connection(
gmqtt/ in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/ in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/ in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/ in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/ in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/ in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():

            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('', 1883)

/opt/python38/lib/python3.8/asyncio/ ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_________________________________________________________________ ERROR at setup of test_no_will_message_on_gentle_disconnect _________________________________________________________________

    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/ in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/ in connect
    self._connection = await self._create_connection(
gmqtt/ in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/ in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/ in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/ in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/ in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/ in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():

            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('', 1883)

/opt/python38/lib/python3.8/asyncio/ ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_________________________________________________________________________ ERROR at setup of test_shared_subscriptions _________________________________________________________________________

    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/ in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/ in connect
    self._connection = await self._create_connection(
gmqtt/ in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/ in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/ in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/ in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/ in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/ in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():

            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('', 1883)

/opt/python38/lib/python3.8/asyncio/ ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_____________________________________________________________________________ ERROR at setup of test_unsubscribe ______________________________________________________________________________

    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/ in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/ in connect
    self._connection = await self._create_connection(
gmqtt/ in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/ in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/ in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/ in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/ in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/ in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():

            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('', 1883)

/opt/python38/lib/python3.8/asyncio/ ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
______________________________________________________________________ ERROR at setup of test_overlapping_subscriptions _______________________________________________________________________

    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/ in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/ in connect
    self._connection = await self._create_connection(
gmqtt/ in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/ in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/ in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/ in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/ in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/ in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():

            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('', 1883)

/opt/python38/lib/python3.8/asyncio/ ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_______________________________________________________________________ ERROR at setup of test_redelivery_on_reconnect ________________________________________________________________________

    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/ in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/ in connect
    self._connection = await self._create_connection(
gmqtt/ in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/ in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/ in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/ in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/ in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/ in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():

            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('', 1883)

/opt/python38/lib/python3.8/asyncio/ ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
___________________________________________________________________________ ERROR at setup of test_async_on_message ___________________________________________________________________________

    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/ in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/ in connect
    self._connection = await self._create_connection(
gmqtt/ in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/ in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/ in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/ in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/ in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/ in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():

            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('', 1883)

/opt/python38/lib/python3.8/asyncio/ ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
___________________________________________________________________________ ERROR at setup of test_request_response ___________________________________________________________________________

    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/ in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/ in connect
    self._connection = await self._create_connection(
gmqtt/ in _create_connection

Use message callback on subscriptions rather than globally

Right now, message handlers are attached globally on the mqtt client: mqtt_client.on_message = on_message

Would it be possible to attach a message handler to a Subscription? I am thinking of something along the lines of:

hello_sub = mqtt_client.subscribe('hello', qos=0)
hello_sub.on_message = on_message

on_message callback is triggered when sending messages.

Thank you for this great project. It's very nice. I noticed an issue. Please let me know if this is intentional or if there is bug somewhere.

Consider this code:

import asyncio
import os
import signal
import time
import ssl

from gmqtt import Client as MQTTClient

STOP = asyncio.Event()

def on_connect(client, flags, rc, properties):
    client.subscribe('TEST/#', qos=0)

def on_message(client, topic, payload, qos, properties):
    print('[RECV MSG {}] TOPIC: {} PAYLOAD: {} QOS: {} PROPERTIES: {}'.format(client._client_id, topic, payload, qos, properties))

def on_disconnect(client, packet, exc=None):

def on_subscribe(client, mid, qos, properties):

async def main(broker_host):
    client = MQTTClient("code_client-id")

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe

    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    ssl_context.verify_mode = ssl.CERT_NONE

    client.set_auth_credentials("usename", "password")
    await client.connect(broker_host,8883,ssl_context)

    client.publish('TEST/TIME', "message from python code", qos=1)

    await STOP.wait()
    await client.disconnect()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    host = ''

the above code provides the following output:

[RECV MSG code_client-id] TOPIC: TEST/TIME PAYLOAD: b'message from python code' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
[RECV MSG code_client-id] TOPIC: TEST/TIME PAYLOAD: b'message from other client' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}

As you can see the on_message callback was triggered both when sending and receiving a message.
Thus making it impossible to distinguish who sent the message (server or client?).

Thank you for your time.

How to detect that reconnect_retries has been exceeded?

When I set reconnect_retries to some value as documented in the README and I want my application to react to the fact that gmqtt has exceeded that number of retries and given up on reconnecting (e.g., to exit the application or to reconnect at a much later stage), how would I do that?

async handle task.cancel()

in all of the example this library is used with an asyncio event() to handle the termination of the library.
How can I handle instead an asyncio coroutine?

I have multiple tasks that are running in parallel with asyncio.gather()

this is another one

async def mqtt_consumer(mqtt_queue):'Starting consumer')

        while True:
  'Waiting for message')
            msg = await mqtt_queue.get()
            print(f"Got {msg}")
    except asyncio.CancelledError:
    finally:'Exiting consumer')
except KeyboardInterrupt as e:
     loop.close()"Successfully shutdown the services.")

How can I run gmqtt in a while True await?

Python 3.8 deprecation warning

Just putting this warning here

  c:\python38-32\lib\site-packages\gmqtt\mqtt\ DeprecationWarning: The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.
    self._connected = asyncio.Event(loop=loop)

-- Docs:

Keepalive has 2 bugs that prevent it from working properly

At, the first time it triggers, the computed values is always equal to, not greater than self._keepalive on my system. It therefore skips the keepalive, eventually failing to a disconnect/reconnect.

if time.monotonic() - self._last_data_in > self._keepalive:

Changing the > to >= fixes it for me, making the first keepalive work.

I originally encountered this on a MQTT broker that wanted keepalives shorter than 60 seconds, and in the process of troubleshooting also discovered that you reset keepalive to a hardcoded 60 on a reconnect:
self._connection = await self._create_connection(self._host, self._port, clean_session=True, keepalive=60)

Maybe set a self._keepalive in the Client class with a default of 60 and override it from the connect procedure if a new value is provided?

Connection lost on Linux server


I am getting connect/disconnect events all the time on my Linux (debian) server. The same script works ok on my local machine (mac). The pattern is always the same, like:

CONNECTED->SUBSCRIBED->CONN CLOSE NORMALLY/DISONNECT after exactly 4 minutes->CONNECTED->CONN CLOSE NORMALLY/DISONNECT after exactly 2 minutes. Then it repeats every 2 minutes many times, then again 4 minutes/2 minutes.

I don't find anything in my server logs and there is only "503: mqtt session connection was closed (Unexpected disconnect)" message logged on flespi platform.

Any clue on what this could be? Probably some server/network related stuff but don't know how to identify it
I am attaching the screenshot from my terminal.

Thx on any help that could help me resolve this.

regards, dejan
Screenshot 2020-03-18 at 15 39 41

Connection error: [CONNACK] 0x1 Task exception was never retrieved


I cannot get the examples to work.

I've stripped a publish example down to:

import asyncio

from gmqtt import Client as MQTTClient

TOPIC = 'testtopic/TOPIC'

def on_connect(client, flags, rc, properties):

async def main(broker_host):
    client = MQTTClient(None)
    client.on_connect = on_connect
    await client.connect(broker_host, keepalive=60)
    client.publish(TOPIC, 'Message payload')
    await client.disconnect()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()


A broker is running on localhost.

Python version 3.5.2.

Malformed packet when subscribe non-ASCII topic like 'testtopic/时间'

Enviroment: Win10/python3.8/gmqtt 0.6.7

import asyncio
import os
import signal
import time

from gmqtt import Client as MQTTClient

STOP = asyncio.Event()

def on_connect(client, flags, rc, properties):
    client.subscribe('testtopic/时间', qos=0) #'testtopic/#' Works

def on_message(client, topic, payload, qos, properties):
    print('RECV MSG:', payload)

def on_disconnect(client, packet, exc=None):

def on_subscribe(client, mid, qos, properties):

def ask_exit(*args):

async def main(broker_host, port , token):
    client = MQTTClient("client-id")

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe

    client.set_auth_credentials(token, None)
    await client.connect(broker_host, port )

    client.publish('testtopic/时间',"**" +str(time.time()), qos=1)

    await STOP.wait()
    await client.disconnect()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    host = ""#''
    token = os.environ.get('FLESPI_TOKEN')
    import uuid 
    loop.run_until_complete(main(host,port = 1883, token=str(uuid.uuid4()) ))

Client remains active after disconnect()


My usecase is to use gmqtt with an MQTT broker over an unreliable link, so it can be unavailable for quite a bit or the MQTT TCP connection can be in weird and wonderful states. Furthermore, I need to completely shutdown and discard a gmqtt client and all its resources and tasks if it is unable to connect to such an MQTT broker.

One particular slightly strange connection state can be simulated with SSH port forwarding. On Linux and friends, the command ssh -L 2000:localhost:1 localhost opens local port 2000 and connects it to the closed port 1, simulating a case where you can connect to port 2000, but can't actually get data through.

I've been using code roughly similar to the simplified attachment to make this work with v0.4.5. The attached code produces the following output with v0.4.5:

INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:wait for connect() returned, done: 1, pending: 1
DEBUG:root:Client.connect() completed: 0
DEBUG:root:_DISC.wait() completed: 1
DEBUG:root:_DISC.is_set(): 1
DEBUG:root:Cancelling pending tasks
DEBUG:root:Cancelled pending tasks
DEBUG:root:Wait for disconnect()
DEBUG:root:disconnect() returned
DEBUG:root:waiting for 20 seconds
DEBUG:root:waited for 20 seconds; exiting _main()

So while client.connect() doesn't return, that's ok because I can see from the call to on_disconnect() that there is on connection. After calling client.disconnect() the client is no longer active, which is what I want.

With v0.5, the output is as follows:

INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
DEBUG:gmqtt.mqtt.protocol:[RECV EMPTY] Connection will be reset automatically.
INFO:gmqtt.mqtt.protocol:[CONN CLOSE NORMALLY]
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:wait for connect() returned, done: 1, pending: 1
DEBUG:root:Client.connect() completed: 0
DEBUG:root:_DISC.wait() completed: 1
DEBUG:root:_DISC.is_set(): 1
DEBUG:root:Cancelling pending tasks
DEBUG:root:Cancelled pending tasks
DEBUG:root:Wait for disconnect()
DEBUG:root:disconnect() returned
DEBUG:root:waiting for 20 seconds
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:waited for 20 seconds; exiting _main()
ERROR:asyncio:Exception in callback MqttPackageHandler._handle_exception_in_future(<Task>>)
handle: <Handle MqttPackageHandler._handle_exception_in_future(<Task>>)>
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/", line 88, in _run, *self._args)
  File "/home/veefil/.local/lib/python3.7/site-packages/gmqtt/mqtt/", line 208, in _handle_exception_in_future
    if not future.exception():

This indicates that after having called client.disconnect() the gmqtt client still has some async call-backs going on and at least with the regular API, I couldn't figure out how to make those stop.

I'm not sure whether that behavior is intentional or whether I missed something, so I just wanted to let you know that it seems that v0.5.0 doesn't quite seem to serve the particular use case I'm after. Are there any plans to change this behavior again in the future?

Client does not reconnect


I'm using version 0.5.6 and got the following code:

#!/usr/bin/env python3

import asyncio
from gmqtt import Client as MQTTClient
from gmqtt.mqtt.constants import UNLIMITED_RECONNECTS
import logging


async def main():
    mqtt = MQTTClient('tester')

        'reconnect_retries': UNLIMITED_RECONNECTS,
        'reconnect_delay': 1

    await mqtt.connect('localhost')

    while True:
        #if mqtt.is_connected:
        mqtt.publish('ohlc_1m', 'check')
        await asyncio.sleep(1)

def handle_exception(loop, context):
    # context["message"] will always be there; but context["exception"] may not
    msg = context.get("exception", context["message"])
    logging.error(f"Caught exception: {str(msg)}")

loop = asyncio.get_event_loop()

To test the reconnection I'm restarting mosquitto service and get the following output:

DEBUG:asyncio:Using selector: EpollSelector
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
DEBUG:gmqtt.mqtt.handler:[CMD 0x20] b'\x00\x00\x03"\x00\n'
DEBUG:gmqtt.mqtt.handler:[CONNACK] flags: 0x0, result: 0x0
DEBUG:gmqtt.client:[QoS query IS EMPTY]
DEBUG:gmqtt.mqtt.package:Sending PUBLISH (q0), 'ohlc_1m', ... (5 bytes)
DEBUG:gmqtt.mqtt.package:Sending PUBLISH (q0), 'ohlc_1m', ... (5 bytes)
DEBUG:gmqtt.mqtt.protocol:[RECV EMPTY] Connection will be reset automatically.
INFO:gmqtt.mqtt.protocol:[CONN CLOSE NORMALLY]
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:gmqtt.mqtt.package:Sending PUBLISH (q0), 'ohlc_1m', ... (5 bytes)
Traceback (most recent call last):
  File "", line 32, in <module>
  File "/usr/lib/python3.8/asyncio/", line 612, in run_until_complete
    return future.result()
  File "", line 22, in main
    mqtt.publish('ohlc_1m', 'check')
  File "/usr/lib/python3.8/site-packages/gmqtt/", line 228, in publish
    mid, package = self._connection.publish(message)
  File "/usr/lib/python3.8/site-packages/gmqtt/mqtt/", line 54, in publish
    return self._protocol.send_publish(message)
  File "/usr/lib/python3.8/site-packages/gmqtt/mqtt/", line 111, in send_publish
  File "/usr/lib/python3.8/site-packages/gmqtt/mqtt/", line 45, in write_data
    if not self._transport.is_closing():
AttributeError: 'NoneType' object has no attribute 'is_closing'

I've seen a similar error in #74 so reporting this one for further investigation.

Topic Alias support?

First of all, congrats to the developers on this great project, gmqtt is a joy a to use. Thanks!

I had a quick look and it seems that Topic Alias are not supported by gmqtt. Is that the case? I will happily send a PR improving the docs if they are already supported, just couldn't see how.

MQTT 5.0 support

We need MQTT 5.0 specification support, at least part of it. Let's make this library v5.0 compliant, here is the list of features we need in order of importance:

  • MQTT v5.0 CONNECT packet and custom properties per each packet

  • PUBLISH => set message expiry interval

Next are less important:

  • subscription identifier => client provides own index upon subscribe, broker specify subscription identifier that was the reason of message delivery

  • receive maximum => client specifies limit on quantity of inflight messages sent by broker

  • will delay interval

restore subscriptions after connection lost

I'm facing an issue when restoring subscriptions after a lost connection. I'm testing durability of connections and whenever I shut the connection down (turning the access point off) gmqtt reconnects and delivers messages but the subscriptions remain dead.

I see two of the following log messages right after each other:

INFO:mqtt.MQTTClient:MQTT connected
INFO:gmqtt.mqtt.protocol:[CONN CLOSE NORMALLY]
WARNING:gmqtt.mqtt.handler:[EXC OCCURED] in reconnect future None

A little later:

INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.handler:[EXC OCCURED] in reconnect future None

Any idea on how to either verify subscriptions and resubscribe in case a subscription is lost or how to make subscriptions more robust?

CI is not running for PRs

The CI is not running on PRs. It seems that the credentials are not valid which are used by travis.

        if raise_exc and self._error:
>           raise self._error
E           gmqtt.mqtt.handler.MQTTConnectError: code 134 (Connection Refused: Bad User Name or Password)

Unhandled [WinError 10054]

Елена, извините, опишу на русском, ибо писать много, а мой английский совсем не вери-велл :(

Пришлось протестировать на богопротивной Windows и сразу столкнулся с проблемой.
Вкратце: обычное поведение клиента состоит в том, что при неудачном подключении к брокеру, выполняются попытки переподключения.

Я категорически несогласен с этим поведением и не совсем понимаю, как это поведение обойти.

Пытаюсь обойти следующим образом:

            await self._client.connect(self._host, self._port, self._ssl, version=MQTTv311)
            await self._client.disconnect()

И это даже работает - при неудачной попытке первичного подключения, клиент бросает дальнейшие попытки.

Но не на Windows...
В начале теста я забыл изменить значение параметра ssl метода connect - брокер требует безопасное подключение, но я передал значение False. В итоге наблюдаю следующую картину:

Traceback (most recent call last):
  File "...Python\lib\asyncio\", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [WinError 10054] Удаленный хост принудительно разорвал существующее подключение
[CMD 0xe0] b''

И так по кругу. Приведенный выше код try...except, который призван остановить попытки после первой же неудачной, в данном случае не срабатывает.
Посоветуйте, что можно сделать?

Вопрос, на самом деле, даже более широкий: каким образом я могу не дожидаться подключения к брокеру, если уже после первой попытки понятно, что продолжать бесполезно - ошибочные host и порт, некорректные логин и пароль и т.п.?
В то же время, если возникла необходимость переподключения в процессе работы (например, на какое-то время пропало соединение), то продолжать попытки нужно практически до победы.

Async-with API

Hi there,
Thanks for sharing this project, I'm very excited to see MQTT and asyncio coming together.

I was surprised by the callback-style API in the basic example. I expected the example to use the async-with pattern like the aiohttp or websockets clients. My recreation:

async with gmqtt.connect('') as client:
    await client.publish('TEST/TIME', str(time.time()), qos=1)
    async for message in client.subscribe('TEST/#', qos=0):
        print('RECV MSG:', message)
        break  # print one message only

Do you have any ideas how to implement this? (I'll be looking into it myself)

How to connect to username/password protected broker

I'm new to mqtt. After cloning the repo I managed to make the "Getting started" code work. My question is, how can I connect to protected brokers with a simple username and password? I have no idea what to edit and where.


Use gmqtt without uvlo

I just installed gmqtt but uvloop does not support Windows at the moment.

Can I use gmqtt without uvloop?

In the example, there is this comment: 'gmqtt also compatibility with uvloop'
What does it mean? Is it a dependency or a choice?

best regards

AttributeError: 'NoneType' object has no attribute 'encode'

Does anybody know what is this error!

/home/mohsen/PycharmProjects/gmqtt/venv/bin/python /home/mohsen/PycharmProjects/gmqtt/
Traceback (most recent call last):
File "/home/mohsen/PycharmProjects/gmqtt/", line 59, in
loop.run_until_complete(main(host, token))
File "uvloop/loop.pyx", line 1451, in uvloop.loop.Loop.run_until_complete
File "/home/mohsen/PycharmProjects/gmqtt/", line 41, in main
client.set_auth_credentials(token, None)
File "/home/mohsen/PycharmProjects/gmqtt/gmqtt/", line 116, in set_auth_credentials
self._username = username.encode()
AttributeError: 'NoneType' object has no attribute 'encode'
Task was destroyed but it is pending!
task: <Task pending coro=<Client._resend_qos_messages() running at /home/mohsen/PycharmProjects/gmqtt/gmqtt/> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f41ed8fe918>()]>>

Reference to asyncio in synchronous method gmqtt.client.Client.publish

There is a wrong line in the publish() method of the client:

loop = asyncio.get_event_loop()

which causes failures in my code:

2019-06-06 10:26:48,687 [ERROR] connectors.printer_input: Traceback (most recent call last):
  File "/home/rkrell/work/project/site-packages/", line 120, in thread_handler
    return self.func(*args, **kwargs)
  File "/home/rkrell/work/project/site-packages/gmqtt/", line 195, in publish
    loop = asyncio.get_event_loop()
  File "/home/rkrell/work/project/pypy3.5-7.0.0-linux_x86_64-portable/lib-python/3/asyncio/", line 671, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "/home/rkrell/work/project/pypy3.5-7.0.0-linux_x86_64-portable/lib-python/3/asyncio/", line 583, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'Thread-1'.

Since this line has no effect it should be removed from this synchronous function.

Issues falling back to v311

I've not been able to track this down to anything concrete so I'm hoping someone with more experience of gmqtt can fill in the blanks.

My test environment is gmqtt with mosquitto 1.6.10

In circumstamces I haven't yet been able to reproduce in simple test code, the v5 connection appears to work fine, but a subsequent publish may fail, with Mosquitto logging "Unsupported property type: 44", causing Mosquitto to drop the connection. Other times an identical publish will succeed, and when I starting hacking the gmqtt to find out where the difference was, it was in where the failed publish had protocol.proto_ver == MQTTv50, where the successful publish had somewhere silently downgraded to MQTTv311.

I wasn't able to work out where the downgrade occurred; with logger set to DEBUG I didn't see anything to indicate gmqtt had failed to connect at v5 and reconnected at v3.1.1, and in any case the publish was being triggered indirectly by the on_connect event so any reconnect would have happened ahead of that anyway.

To the best I can tell, the publish worked when it was in an event handler such as on_connect, but failed when it was in a separate async task (running via asyncio.create_task which is added to the event loop after the connection succeeds).

For my purposes, now that I have got this far I can get my application working by setting version to MQTTv311 on connect (I don't need v5 functionality at this point). But if anyone can shed some light on where this downgrade is happening and why I'd be interested.

Client does not fallback to MQTT3 on rabbitmq


From the readme I guessed that gmqtt should reconnect using protocol v3 if mqtt v5 is not supported on the broker side. I'm using rabbitmq which does not support v5. gmqtt got disconnected right after establishing the connected but never reconnected. When explicitly setting v3 it works smoothly. I don't think that's a bug but maybe the Readme should be modified to point out that automatic protocol version adaption is not always working.

Best regards

Asynchronous handlers

Is there any plan to switch to asynchronous handlers?

I'd like to use an asynchronous function to store messages to a database using asyncio:

async def on_message(client, topic, payload, qos, properties):

mqtt_client.on_message = on_message

Indeed it is still possible to add a new task to the event loop; I just thought it would be nicer.

Asynchronous _on_message handler error

@Lenka42, we have a problem :(

/usr/local/lib/python3.7/site-packages/gmqtt/mqtt/ RuntimeWarning: coroutine 'blablabla._on_message' was never awaited
  self.on_message(self, print_topic, packet, 1, properties)

GMQTT with TLS: ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed

TLS option is not working as expected. I can only set ssl to True but I cannot provide a path to the cert file.
Mosquitto configuration is working ok with TLS both publisher and subscriber:

mosquitto_sub --cafile /etc/mosquitto/ca_certificates/ca.crt -h 37c16a79d00a -t 'test' -p 8883 -u report -P 'report'

mosquitto_pub --cafile /etc/mosquitto/ca_certificates/ca.crt -h 37c16a79d00a -t 'test' -m 'amessage' -p 8883 -u report -P 'report'

mosquitto -v -c /etc/mosquitto/mosquitto.conf

1575295867: New client connected from as mosq-W7nvl4LtsfAVItCtHT (p2, c1, k60, u'report').
1575295867: Client mosq-W7nvl4LtsfAVItCtHT disconnected.
1575295870: New connection from on port 8883.

If I try to apply same configuration for gmqtt I get the error on the title

import asyncio

from gmqtt import Client

async def main():
    cli = Client(client_id='test',
    cli.set_auth_credentials('report', password='report')
    await cli.connect(host='37c16a79d00a',

if __name__ == '__main__':
    loop = asyncio.get_event_loop()




transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)


ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain (_ssl.c:1076)

I would like to know how to implement TLS over gmqtt. Could you provide a quick example?

Garbage collection issue using python 3.8


I've tried the getting started example.

Unfortunately it doesn't seem to work due to a garbage collection issue using python 3.8. It works fine on python 3.7 (3.7.5 to be exact).

Here is the traceback:

An open stream was garbage collected prior to establishing network connection; call "stream.close()" explicitly.
Exception in callback _SelectorSocketTransport._call_connection_lost(None)
handle: <Handle _SelectorSocketTransport._call_connection_lost(None)>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/", line 81, in _run, *self._args)
  File "/usr/local/lib/python3.8/asyncio/", line 958, in _call_connection_lost
  File "/usr/local/lib/python3.8/asyncio/", line 716, in _call_connection_lost
  File "/usr/local/lib/python3.8/site-packages/gmqtt/mqtt/", line 194, in connection_lost
    self._connection.put_package((MQTTCommands.DISCONNECT, b''))
AttributeError: 'NoneType' object has no attribute 'put_package'
Traceback (most recent call last):
  File "./", line 62, in <module>
  File "/usr/local/lib/python3.8/asyncio/", line 608, in run_until_complete
    return future.result()
  File "./", line 45, in main
    await client.connect(broker_host)
  File "/usr/local/lib/python3.8/site-packages/gmqtt/", line 146, in connect
    await self._connection.auth(self._client_id, self._username, self._password, will_message=self._will_message,
  File "/usr/local/lib/python3.8/site-packages/gmqtt/mqtt/", line 50, in auth
    await self._protocol.send_auth_package(client_id, username, password, self._clean_session,
  File "/usr/local/lib/python3.8/site-packages/gmqtt/mqtt/", line 87, in send_auth_package
  File "/usr/local/lib/python3.8/site-packages/gmqtt/mqtt/", line 42, in write_data
    if not self._transport.is_closing():
AttributeError: 'NoneType' object has no attribute 'is_closing'

You can reproduce this by using the python 3.8-buster docker image:

host> docker run -it -v $SCRIPT_PATH:/ python:3.8-buster /bin/bash
container> pip install gmqtt
container> /

Does anyone have an idea on how to fix this?


error occurs when QoS is set to 2

I have two mqtt clients.
When the QoS of publish and subscribe is set to 2 at the same time, I get the following error.

Traceback (most recent call last):
  File "/Users/jing/virtualenv/mqtt_test/lib/python3.8/site-packages/gmqtt/mqtt/", line 351, in __call__
    result = self._handle_packet(cmd, packet)
  File "/Users/jing/virtualenv/mqtt_test/lib/python3.8/site-packages/gmqtt/mqtt/", line 204, in _handle_packet
    handler(cmd, packet)
  File "/Users/jing/virtualenv/mqtt_test/lib/python3.8/site-packages/gmqtt/mqtt/", line 403, in _handle_pubrel_packet
    mid, = struct.unpack("!H", packet)
struct.error: unpack requires a buffer of 2 bytes

Why does this error occur?

I tried to do this using multi threading so that we can connect multiple clients in a single code

I tried to use multithread for the code so that we can have multiple clients connect but its showing
RuntimeError: this event loop is already running.Task exception was never retrieved

This is my updated code for the sample code you have given

async def main(broker_host,client_id):
    client = MQTTClient(client_id)

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe

    await client.connect(broker_host)
    client.publish('TEST/TIME', time.ctime(time.time())+str(client_id), qos=1)
    await **STOP.wait()**
    await client.disconnect()

def gg(host,clients):

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    host = ''


        y=Thread(target=gg, args = (host,"client1",))
        x=Thread(target=gg, args = (host,"client2",))
        print("no party")

Reconnect Fail

My client disconnected from broker, but an error occurred while it try to reconnect . The client can't reconnect again but the event_loop still run. Maybe there should be an on_error() callback function set on the client.

I check the source code in `gmqtt.mqtt.handler'

    def _handle_disconnect_packet(self, cmd, packet):
        # reset server topics on disconnect

        future = asyncio.ensure_future(self.reconnect(delay=True))
        self.on_disconnect(self, packet)

    def _handle_exception_in_future(self, future):
        if future.exception():
            logger.warning('[EXC OCCURED] in reconnect future %s', future.exception())

If the reconnect future run fail, and the callback _handle_exception_in_future Just print a log info ......, Why not do a reconnection after some seconds?

About the on_disconnect() callback (In ‘’)

def on_disconnect(client, packet, exc=None):

I define the exe param, but the source code in gmqtt.mqtt.handler

    def _handle_disconnect_packet(self, cmd, packet):
        # reset server topics on disconnect

        future = asyncio.ensure_future(self.reconnect(delay=True))
        self.on_disconnect(self, packet)

In Fact , self.on_disconnect(self, packet) did not pass an exe param, I want get more exception infomation in the callback() , andI really need it !!!

Last Will (graceful disconnect) bug


import asyncio

from gmqtt import Client, Message
from gmqtt.mqtt.constants import MQTTv311

import logging

STOP = asyncio.Event()

def on_connect(client, flags, rc, properties):
    client.subscribe('test/#', qos=0)

def on_message(client, topic, payload, qos, properties):
    print('RECV MSG:', payload)

def on_disconnect(client, packet, exc=None):

def on_subscribe(client, mid, qos):

async def main():
    client = Client("client-id", will_message=Message('test', "LOST", qos=1, retain=True))

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe

    client.set_auth_credentials('username', 'password')
    await client.connect('', 8883, keepalive=60, version=MQTTv311)

    client.publish('test', 'ONLINE', qos=1, retain=True)
    client.publish('test', 'OFFLINE', qos=1, retain=True)
    await client.disconnect() 

if __name__ == '__main__':

Broker log:

test LOST

Why LWT was sent?

Possibility of not acknowledging a published message

Based on #113 I suggest to have the ability to not acknowledge a message at all.
So it should be possible to "return None" in the on_message callback (without raising an exception).

Why? If I want to process a message, but some backend system that I need for processing is unavailable right now, I'd either have to store the message locally somewhere (and process it later) or let the broker know that I have not processed it at all.

Another partial solution would be to disconnect from the broker as long as the backend is unavailable, but still there is a time window where some messages might slip through.

[Feature request] Wait for all pending messages to be published

Please add a way to await for all pending messages to be delivered.

Calling await client.disconnect() is throwing a bunch of errors:

Task was destroyed but it is pending!
task: <Task cancelling name='Task-3' coro=<MQTTProtocol._read_loop() running at /usr/lib/python3.8/site-packages/gmqtt/mqtt/> wait_for=<Future finished result=None>>
Task was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<Client.reconnect() running at /usr/lib/python3.8/site-packages/gmqtt/> cb=[MqttPackageHandler._handle_exception_in_future()]>
/usr/lib/python3.8/asyncio/ RuntimeWarning: coroutine 'Client.reconnect' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Periodically disconnects with mosquitto 1.4.15


I'm using your clean example from README and I have two problems:

  • Client periodically disconnects (around every 30 seconds)
  • Clean session does not work after reconnecting and I receive messages for old subscription which I made before with the same client ID.

mosquitto: 1.4.15
gmqtt: 0.6.5

With mosquitto 1.6.9 all works fine.

