
gmqtt's Introduction


gmqtt: Python async MQTT client implementation.

Installation

The latest stable version is available in the Python Package Index (PyPI) and can be installed using:

pip3 install gmqtt

Usage

Getting Started

Here is a very simple example that subscribes to the TEST/# topic filter on the broker and prints the messages it receives:

import asyncio
import os
import signal
import time

from gmqtt import Client as MQTTClient

# gmqtt is also compatible with uvloop (optional)
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


STOP = asyncio.Event()


def on_connect(client, flags, rc, properties):
    print('Connected')
    client.subscribe('TEST/#', qos=0)


def on_message(client, topic, payload, qos, properties):
    print('RECV MSG:', payload)


def on_disconnect(client, packet, exc=None):
    print('Disconnected')

def on_subscribe(client, mid, qos, properties):
    print('SUBSCRIBED')

def ask_exit(*args):
    STOP.set()

async def main(broker_host, token):
    client = MQTTClient("client-id")

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe

    client.set_auth_credentials(token, None)
    await client.connect(broker_host)

    client.publish('TEST/TIME', str(time.time()), qos=1)

    await STOP.wait()
    await client.disconnect()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    host = 'mqtt.flespi.io'
    token = os.environ.get('FLESPI_TOKEN')

    loop.add_signal_handler(signal.SIGINT, ask_exit)
    loop.add_signal_handler(signal.SIGTERM, ask_exit)

    loop.run_until_complete(main(host, token))

MQTT Version 5.0

gmqtt supports the MQTT version 5.0 protocol.

Version setup

Version 5.0 is used by default. If your broker does not support protocol version 5.0 and responds with the proper CONNACK reason code, the client downgrades to 3.1.1 and reconnects automatically. Note that some brokers simply fail to parse the 5.0-format CONNECT packet, so first check manually that your broker handles this properly. You can also force the version in the connect method:

from gmqtt.mqtt.constants import MQTTv311
client = MQTTClient('clientid')
client.set_auth_credentials(token, None)
await client.connect(broker_host, 1883, keepalive=60, version=MQTTv311)

Properties

The MQTT 5.0 protocol allows custom properties to be included in packets. Here is an example of passing the response topic property in a published message:

TOPIC = 'testtopic/TOPIC'

def on_connect(client, flags, rc, properties):
    client.subscribe(TOPIC, qos=1)
    print('Connected')

def on_message(client, topic, payload, qos, properties):
    print('RECV MSG:', topic, payload.decode(), properties)

async def main(broker_host, token):
    client = MQTTClient('asdfghjk')
    client.on_message = on_message
    client.on_connect = on_connect
    client.set_auth_credentials(token, None)
    await client.connect(broker_host, 1883, keepalive=60)
    client.publish(TOPIC, 'Message payload', response_topic='RESPONSE/TOPIC')

    await STOP.wait()
    await client.disconnect()

Connect properties

Connect properties are passed to the Client object as kwargs (they are later stored, together with the properties received from the broker, in the client.properties field). See the example below.

  • session_expiry_interval - int Session expiry interval in seconds. If the Session Expiry Interval is absent the value 0 is used. If it is set to 0, or is absent, the Session ends when the Network Connection is closed. If the Session Expiry Interval is 0xFFFFFFFF (max possible value), the Session does not expire.
  • receive_maximum - int The Client uses this value to limit the number of QoS 1 and QoS 2 publications that it is willing to process concurrently.
  • user_property - tuple(str, str) This property may be used to provide additional diagnostic or other information (key-value pairs).
  • maximum_packet_size - int The Client uses the Maximum Packet Size (in bytes) to inform the Server that it will not process packets exceeding this limit.

Example:

client = gmqtt.Client("lenkaklient", receive_maximum=24000, session_expiry_interval=60, user_property=('myid', '12345'))
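
The session_expiry_interval rules listed above can be sketched as a small helper. This is only an illustration of the three cases (absent or 0, a finite value, and 0xFFFFFFFF); describe_session_expiry and NEVER_EXPIRES are hypothetical names and are not part of gmqtt.

```python
# Hypothetical helper: not part of gmqtt, it only illustrates the rules above.
NEVER_EXPIRES = 0xFFFFFFFF  # maximum possible value of the property


def describe_session_expiry(interval=None):
    """Return a readable description of a Session Expiry Interval value."""
    if interval is None or interval == 0:
        # Absent is treated the same as 0.
        return 'session ends when the network connection is closed'
    if interval == NEVER_EXPIRES:
        return 'session does not expire'
    if not 0 < interval < NEVER_EXPIRES:
        raise ValueError('interval must fit in an unsigned 32-bit integer')
    return 'session expires {} s after the connection is closed'.format(interval)
```

With the value from the example above, describe_session_expiry(60) reports an expiry 60 s after the connection is closed.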

Publish properties

These properties are also sent in the PUBLISH packet from the broker and are passed to the on_message callback.

  • message_expiry_interval - int If present, the value is the lifetime of the Application Message in seconds.
  • content_type - unicode UTF-8 Encoded String describing the content of the Application Message. The value of the Content Type is defined by the sending and receiving application.
  • user_property - tuple(str, str)
  • subscription_identifier - int (see subscribe properties) sent by broker
  • topic_alias - int First client publishes messages with topic string and kwarg topic_alias. After this initial message client can publish message with empty string topic and same topic_alias kwarg.

Example:

def on_message(client, topic, payload, qos, properties):
    # properties example here: {'content_type': ['json'], 'user_property': [('timestamp', '1524235334.881058')], 'message_expiry_interval': [60], 'subscription_identifier': [42, 64]}
    print('RECV MSG:', topic, payload, properties)

client.publish('TEST/TIME', str(time.time()), qos=1, retain=True, message_expiry_interval=60, content_type='json')
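
The topic_alias flow described above (full topic plus alias first, then an empty topic with the same alias) could be tracked with a helper along these lines. TopicAliasPlanner is a hypothetical class, not something gmqtt provides, and the sketch ignores any alias limit announced by the broker.

```python
class TopicAliasPlanner:
    """Hypothetical helper (not a gmqtt class) that decides what to publish."""

    def __init__(self):
        self._aliases = {}  # topic string -> alias number
        self._sent = set()  # aliases that were already sent with a full topic

    def plan(self, topic):
        """Return (topic_to_send, topic_alias) for the next publish call."""
        alias = self._aliases.setdefault(topic, len(self._aliases) + 1)
        if alias in self._sent:
            return '', alias  # alias is established: send an empty topic
        self._sent.add(alias)
        return topic, alias   # first use: send the topic string and the alias

    def reset(self):
        """Forget established aliases, for example after a reconnect."""
        self._sent.clear()
```

The returned pair would then be passed on as client.publish(topic_to_send, payload, topic_alias=alias), following the description of the topic_alias kwarg above. Topic aliases are scoped to a single network connection, so calling reset() after a reconnect is the safe assumption.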

Subscribe properties

  • subscription_identifier - int If the Client specified a Subscription Identifier for any of the overlapping subscriptions the Server MUST send those Subscription Identifiers in the message which is published as the result of the subscriptions.

Reconnects

By default, a connected MQTT client always tries to reconnect when the connection is lost, and the number of reconnect attempts is unlimited. If you want to change this behaviour, do the following:

client = MQTTClient("client-id")
client.set_config({'reconnect_retries': 10, 'reconnect_delay': 60})

The code above sets the number of reconnect attempts to 10 and the delay between attempts to 1 minute (60 s). By default reconnect_delay=6 and reconnect_retries=-1, which stands for unlimited retries. Note that manually calling await client.disconnect() sets reconnect_retries to 0, which stops automatic reconnection.
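
To make the meaning of the two settings concrete, here is a minimal sketch of a retry loop that follows the same conventions (-1 for unlimited, a fixed delay between attempts). It is not gmqtt's internal implementation; should_retry, reconnect_loop and the connect argument are names assumed for this illustration.

```python
import asyncio

UNLIMITED = -1  # same sentinel the text above gives for reconnect_retries


def should_retry(attempts_made, reconnect_retries):
    """True while the retry budget allows another reconnect attempt."""
    return reconnect_retries == UNLIMITED or attempts_made < reconnect_retries


async def reconnect_loop(connect, reconnect_retries=UNLIMITED, reconnect_delay=6):
    """Await connect() until it succeeds; return how many retries were needed.

    `connect` is any coroutine function that raises ConnectionError on failure.
    The last error is re-raised once the retry budget is used up.
    """
    attempts = 0
    while True:
        try:
            await connect()
            return attempts
        except ConnectionError:
            if not should_retry(attempts, reconnect_retries):
                raise
            attempts += 1
            await asyncio.sleep(reconnect_delay)
```

With reconnect_retries=0 the first failure is final, which matches the effect described for a manual disconnect.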

Asynchronous on_message callback

You can define an asynchronous on_message callback. Note that it must return a valid PUBACK code (0 is the success code; see the full list in constants):

async def on_message(client, topic, payload, qos, properties):
    pass
    return 0
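
For intuition, a dispatcher that accepts both plain and asynchronous callbacks could look like the sketch below. It is a simplified illustration and not the code gmqtt uses; dispatch_message is a hypothetical name, and treating a None result as success (0) is an assumption made for the example.

```python
import asyncio
import inspect


async def dispatch_message(on_message, *args):
    """Call on_message(*args) and return a PUBACK reason code."""
    result = on_message(*args)
    if inspect.isawaitable(result):
        # A coroutine callback only runs once it is awaited.
        result = await result
    # Assumption for this sketch: a callback that returns nothing means success.
    return 0 if result is None else result
```

Calling a coroutine function without awaiting the result is what produces Python's "coroutine ... was never awaited" warning.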

Other examples

Check the examples directory for more use cases.

gmqtt's People

Contributors

blaxter, fabaff, gdraynz, lenka42, mixser, mlaass, nanomad, rkrell, space-gurtam, sunnyanthony, tripzero, wialon, xiaoliuhust, yragantron

gmqtt's Issues

Using multithreading to connect multiple clients in a single script

I tried to use multiple threads so that several clients can connect from one script, but it fails with:
RuntimeError: this event loop is already running.
Task exception was never retrieved

This is my updated version of the sample code you have given:

async def main(broker_host,client_id):
    client = MQTTClient(client_id)


    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe

    print(client_id+str(1))
    await client.connect(broker_host)
    client.publish('TEST/TIME', time.ctime(time.time())+str(client_id), qos=1)
    print(client_id+str(2))
    await STOP.wait()
    print(client_id+str(3))
    await client.disconnect()
    print(client_id+str(4))

def gg(host,clients):
    
    loop.run_until_complete(main(host,clients))


if __name__ == '__main__':
    
    loop = asyncio.get_event_loop()
    host = '172.16.55.130'

    try:

        y=Thread(target=gg, args = (host,"client1",))
        x=Thread(target=gg, args = (host,"client2",))
    except:
        print("no party")
    x.start()
    y.start()
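
The error above is what asyncio raises when run_until_complete() is called on a loop that is already running, which happens here because both threads share the same loop object. One possible alternative, sketched under the assumption that the clients do not need separate threads, is to run every client as a task on a single loop. run_client below is a stand-in for the issue's main() and does not touch the network.

```python
import asyncio


async def run_client(client_id, stop):
    """Stand-in for main(): a real version would create MQTTClient(client_id),
    await client.connect(...), wait for the stop event and then disconnect."""
    await stop.wait()
    return client_id


async def run_all(client_ids):
    stop = asyncio.Event()  # created inside the running loop
    tasks = [asyncio.ensure_future(run_client(cid, stop)) for cid in client_ids]
    await asyncio.sleep(0)  # give every client task a chance to start
    stop.set()              # in real code a signal handler would do this
    return await asyncio.gather(*tasks)


if __name__ == '__main__':
    print(asyncio.run(run_all(['client1', 'client2'])))
```

All clients then share one thread, so no loop is ever started twice.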

Reference to asyncio in synchronous method gmqtt.client.Client.publish

There is a wrong line in the publish() method of the client:

loop = asyncio.get_event_loop()

which causes failures in my code:

2019-06-06 10:26:48,687 [ERROR] connectors.printer_input: Traceback (most recent call last):
...
  File "/home/rkrell/work/project/site-packages/syncasync.py", line 120, in thread_handler
    return self.func(*args, **kwargs)
  File "/home/rkrell/work/project/site-packages/gmqtt/client.py", line 195, in publish
    loop = asyncio.get_event_loop()
  File "/home/rkrell/work/project/pypy3.5-7.0.0-linux_x86_64-portable/lib-python/3/asyncio/events.py", line 671, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "/home/rkrell/work/project/pypy3.5-7.0.0-linux_x86_64-portable/lib-python/3/asyncio/events.py", line 583, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'Thread-1'.

Since this line has no effect it should be removed from this synchronous function.
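
The failure can be reproduced without gmqtt: asyncio.get_event_loop() raises RuntimeError in a thread that has no event loop set. The sketch below shows that, together with one possible workaround on the caller's side, which is to hand the synchronous call over to the loop's own thread. probe and publish_from_thread are hypothetical helper names, and `publish` stands for any synchronous callable such as a client's publish method.

```python
import asyncio
import threading


def probe(result):
    """Record what asyncio.get_event_loop() does in the current thread."""
    try:
        asyncio.get_event_loop()
        result.append('loop available')
    except RuntimeError as exc:
        result.append(str(exc))  # no event loop is set for this thread


def publish_from_thread(loop, publish, *args):
    """Schedule a synchronous publish(*args) call on the loop's own thread."""
    loop.call_soon_threadsafe(publish, *args)


if __name__ == '__main__':
    outcome = []
    worker = threading.Thread(target=probe, args=(outcome,))
    worker.start()
    worker.join()
    print(outcome[0])
```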

Use gmqtt without uvloop

I just installed gmqtt but uvloop does not support Windows at the moment.

Can I use gmqtt without uvloop?

In the example, there is this comment: 'gmqtt also compatibility with uvloop'
What does it mean? Is it a dependency or a choice?

best regards

Asynchronous _on_message handler error

@Lenka42, we have a problem :(

/usr/local/lib/python3.7/site-packages/gmqtt/mqtt/handler.py:353: RuntimeWarning: coroutine 'blablabla._on_message' was never awaited
  self.on_message(self, print_topic, packet, 1, properties)

How to connect to username/password protected broker

Hi,
I'm new to mqtt. After cloning the repo I managed to make the "Getting started" code work. My question is, how can I connect to protected brokers with a simple username and password? I have no idea what to edit and where.

Thanks!

Python 3.8 deprecation warning

Just putting this warning here

::test_admin_subscription
  c:\python38-32\lib\site-packages\gmqtt\mqtt\protocol.py:19: DeprecationWarning: The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.
    self._connected = asyncio.Event(loop=loop)

-- Docs: https://docs.pytest.org/en/latest/warnings.html

Examples are placed in the root of the site-packages dir

Examples are placed in the root of the site-packages directory which will make it hard to identify them (is also a packaging issue).

    Installed (but unpackaged) file(s) found:
   /usr/lib/python3.7/site-packages/examples/__init__.py
   /usr/lib/python3.7/site-packages/examples/__pycache__/__init__.cpython-37.opt-1.pyc
   /usr/lib/python3.7/site-packages/examples/__pycache__/__init__.cpython-37.pyc
   /usr/lib/python3.7/site-packages/examples/__pycache__/properties.cpython-37.opt-1.pyc
   /usr/lib/python3.7/site-packages/examples/__pycache__/properties.cpython-37.pyc
   /usr/lib/python3.7/site-packages/examples/__pycache__/shared_subscriptions.cpython-37.opt-1.pyc
   /usr/lib/python3.7/site-packages/examples/__pycache__/shared_subscriptions.cpython-37.pyc
   /usr/lib/python3.7/site-packages/examples/__pycache__/will_message.cpython-37.opt-1.pyc
   /usr/lib/python3.7/site-packages/examples/__pycache__/will_message.cpython-37.pyc
   /usr/lib/python3.7/site-packages/examples/properties.py
   /usr/lib/python3.7/site-packages/examples/shared_subscriptions.py
   /usr/lib/python3.7/site-packages/examples/will_message.py

From my point of view they should either go under the following directory or not be included at all:

/usr/lib/python3.7/site-packages/gmqtt/examples/

MQTT 5.0 support

We need MQTT 5.0 specification support, at least part of it. Let's make this library v5.0 compliant, here is the list of features we need in order of importance:

  • MQTT v5.0 CONNECT packet and custom properties per each packet

  • PUBLISH => set message expiry interval

Next are less important:

  • subscription identifier => client provides its own index upon subscribe, broker specifies the subscription identifier that was the reason for message delivery

  • receive maximum => client specifies limit on quantity of inflight messages sent by broker

  • will delay interval

Client remains active after disconnect()

Hi!

My use case is to use gmqtt with an MQTT broker over an unreliable link, so the broker can be unavailable for quite a while or the MQTT TCP connection can be in weird and wonderful states. Furthermore, I need to completely shut down and discard a gmqtt client and all its resources and tasks if it is unable to connect to such an MQTT broker.

One particular slightly strange connection state can be simulated with SSH port forwarding. On Linux and friends, the command ssh -L 2000:localhost:1 localhost opens local port 2000 and connects it to the closed port 1, simulating a case where you can connect to port 2000, but can't actually get data through.

I've been using code roughly similar to the simplified attachment test.py.txt to make this work with v0.4.5. The attached code produces the following output with v0.4.5:

INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
DEBUG:root:wait for connect() returned, done: 1, pending: 1
DEBUG:root:Client.connect() completed: 0
DEBUG:root:_DISC.wait() completed: 1
DEBUG:root:_DISC.is_set(): 1
DEBUG:root:Cancelling pending tasks
DEBUG:root:Cancelled pending tasks
DEBUG:root:Wait for disconnect()
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:root:disconnect() returned
DEBUG:root:waiting for 20 seconds
DEBUG:root:waited for 20 seconds; exiting _main()

So while client.connect() doesn't return, that's OK because I can see from the call to on_disconnect() that there is no connection. After calling client.disconnect() the client is no longer active, which is what I want.

With v0.5, the output is as follows:

INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
DEBUG:gmqtt.mqtt.protocol:[RECV EMPTY] Connection will be reset automatically.
INFO:gmqtt.mqtt.protocol:[CONN CLOSE NORMALLY]
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:root:wait for connect() returned, done: 1, pending: 1
DEBUG:root:Client.connect() completed: 0
DEBUG:root:_DISC.wait() completed: 1
DEBUG:root:_DISC.is_set(): 1
DEBUG:root:Cancelling pending tasks
DEBUG:root:Cancelled pending tasks
DEBUG:root:Wait for disconnect()
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:root:disconnect() returned
DEBUG:root:waiting for 20 seconds
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:root:waited for 20 seconds; exiting _main()
ERROR:asyncio:Exception in callback MqttPackageHandler._handle_exception_in_future(<Task cancell...lient.py:174>>)
handle: <Handle MqttPackageHandler._handle_exception_in_future(<Task cancell...lient.py:174>>)>
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/home/veefil/.local/lib/python3.7/site-packages/gmqtt/mqtt/handler.py", line 208, in _handle_exception_in_future
    if not future.exception():
concurrent.futures._base.CancelledError

This indicates that after client.disconnect() has been called, the gmqtt client still has some async callbacks going on, and at least with the regular API I couldn't figure out how to make those stop.

I'm not sure whether that behavior is intentional or whether I missed something, so I just wanted to let you know that it seems that v0.5.0 doesn't quite seem to serve the particular use case I'm after. Are there any plans to change this behavior again in the future?

Periodically disconnects with mosquitto 1.4.15

Hello!

I'm using your clean example from README and I have two problems:

  • The client disconnects periodically (around every 30 seconds).
  • Clean session does not work after reconnecting: I receive messages for an old subscription that I made earlier with the same client ID.

Connected
[TRYING WRITE TO CLOSED SOCKET]
Disconnected
Connected
Disconnected
[TRYING WRITE TO CLOSED SOCKET]
Connected
[TRYING WRITE TO CLOSED SOCKET]
Disconnected
Connected
[TRYING WRITE TO CLOSED SOCKET]
Disconnected
Connected
[TRYING WRITE TO CLOSED SOCKET]
Disconnected
Connected
Disconnected

mosquitto: 1.4.15
gmqtt: 0.6.5

With mosquitto 1.6.9 all works fine.

handler.py: missing 1 required positional argument: 'properties'

Following the example in the README, I have a connect function defined like this (I'm not using

import asyncio
import os
import signal
import time

from gmqtt import Client as MQTTClient
from gmqtt.mqtt.constants import MQTTv311

host = 'localhost'
port = '1993'

STOP = asyncio.Event()


def on_connect(client, flags, rc, properties):
    print('Connected')
    client.subscribe('#', qos=0)


def on_message(client, topic, payload, qos, properties):
    print('RECV MSG:', payload)


def on_disconnect(client, packet, exc=None):
    print('Disconnected')


def on_subscribe(client, mid, qos):
    print('SUBSCRIBED')


def ask_exit(*args):
    STOP.set()


async def main(broker_host, broker_port):
    client = MQTTClient("client-id")

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe

    await client.connect(broker_host, broker_port, version=MQTTv311)
    await STOP.wait()
    await client.disconnect()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    loop.add_signal_handler(signal.SIGINT, ask_exit)
    loop.add_signal_handler(signal.SIGTERM, ask_exit)

    loop.run_until_complete(main(host, port))

Traceback (most recent call last):
  File "/usr/lib/python3.7/site-packages/gmqtt/mqtt/handler.py", line 227, in __call__
    result = self._handle_packet(cmd, packet)
  File "/usr/lib/python3.7/site-packages/gmqtt/mqtt/handler.py", line 124, in _handle_packet
    handler(cmd, packet)
  File "/usr/lib/python3.7/site-packages/gmqtt/mqtt/handler.py", line 179, in _handle_connack_packet
    self.on_connect(self, flags, result)
TypeError: on_connect() missing 1 required positional argument: 'properties'

This will go away if I define the function as

def on_connect(client, flags, rc, properties=None):
    print('Connected')
    client.subscribe('#', qos=0)

How do I know whether a received topic comes from the same subscription?

I subscribe to a topic filter with a plus wildcard (LIGHT/+/TURNON), so I will receive messages on these topics:

LIGHT/ID_1/TURNON
LIGHT/ID_2/TURNON

I want messages on these topics to trigger the same action.
How do I know that LIGHT/ID_1/TURNON and LIGHT/ID_2/TURNON come from the same subscription?

The following is my sample code:

TOPIC_LIST = []

def turn_on_tv(client, topic, message, qos, properties):
    pass  # do something

def turn_on_light(client, topic, message, qos, properties):
    pass  # do something

def init_subscribe(client):
    global TOPIC_LIST
    TOPIC_LIST = [
        {'topic': 'TV/TURNON', 'qos': 1, 'method': turn_on_tv},
        {'topic': 'LIGHT/+/TURNON', 'qos': 1, 'method': turn_on_light},
    ]

    for t in TOPIC_LIST:
        client.subscribe(t['topic'], t['qos'])

def on_message(client, topic, payload, qos, properties):
    for t in TOPIC_LIST:
        if topic == t['topic']:    # How do I know?
            t['method'](client, topic, payload, qos, properties)
            break
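
One way to answer the "How do I know?" line is to compare the received topic against each subscribed filter using the MQTT wildcard rules instead of string equality. The function below is a minimal sketch of that matching; topic_matches is a hypothetical helper, it assumes the filter is well formed ('#' only as the last level), and it leaves out the special handling of topics that start with '$'.

```python
def topic_matches(topic_filter, topic):
    """Return True if `topic` matches the MQTT `topic_filter`.

    '+' matches exactly one level, '#' matches the parent level and
    any number of levels below it.
    """
    filter_levels = topic_filter.split('/')
    topic_levels = topic.split('/')
    for index, level in enumerate(filter_levels):
        if level == '#':
            return True                 # everything from here on matches
        if index >= len(topic_levels):
            return False                # filter is longer than the topic
        if level != '+' and level != topic_levels[index]:
            return False                # literal level differs
    return len(filter_levels) == len(topic_levels)
```

In the on_message callback above, the comparison would then become topic_matches(t['topic'], topic). With MQTT 5.0 brokers, the subscription_identifier property described in the README is another possible way to tell which subscription caused a delivery.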

Reconnect Fail

My client disconnected from the broker, but an error occurred while it tried to reconnect. The client can't reconnect again, but the event loop keeps running. Maybe there should be an on_error() callback that can be set on the client.

I checked the source code in gmqtt.mqtt.handler:

    def _handle_disconnect_packet(self, cmd, packet):
        # reset server topics on disconnect
        self._clear_topics_aliases()

        future = asyncio.ensure_future(self.reconnect(delay=True))
        future.add_done_callback(self._handle_exception_in_future)
        self.on_disconnect(self, packet)

    def _handle_exception_in_future(self, future):
        if future.exception():
            logger.warning('[EXC OCCURED] in reconnect future %s', future.exception())
            return

If the reconnect future fails, the callback _handle_exception_in_future just logs a warning. Why not try to reconnect again after some seconds?

About the on_disconnect() callback (in README.md)

def on_disconnect(client, packet, exc=None):
    print('Disconnected')

I define the exc parameter, but this is the source code in gmqtt.mqtt.handler:

    def _handle_disconnect_packet(self, cmd, packet):
        # reset server topics on disconnect
        self._clear_topics_aliases()

        future = asyncio.ensure_future(self.reconnect(delay=True))
        future.add_done_callback(self._handle_exception_in_future)
        self.on_disconnect(self, packet)

In fact, self.on_disconnect(self, packet) does not pass an exc argument. I want to get more exception information in the callback, and I really need it!

Connection error: [CONNACK] 0x1 Task exception was never retrieved

Hello,

I cannot get the examples to work.

I've stripped a publish example down to:

import asyncio

from gmqtt import Client as MQTTClient

TOPIC = 'testtopic/TOPIC'


def on_connect(client, flags, rc, properties):
    print('Connected')


async def main(broker_host):
    client = MQTTClient(None)
    client.on_connect = on_connect
    await client.connect(broker_host, keepalive=60)
    client.publish(TOPIC, 'Message payload')
    await client.disconnect()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    loop.run_until_complete(main("127.0.0.1"))

A broker is running on localhost.

Python version 3.5.2.

Garbage collection issue using python 3.8

Hi,

I've tried the getting started example.

Unfortunately it doesn't seem to work due to a garbage collection issue using python 3.8. It works fine on python 3.7 (3.7.5 to be exact).

Here is the traceback:

An open stream was garbage collected prior to establishing network connection; call "stream.close()" explicitly.
Exception in callback _SelectorSocketTransport._call_connection_lost(None)
handle: <Handle _SelectorSocketTransport._call_connection_lost(None)>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 958, in _call_connection_lost
    super()._call_connection_lost(exc)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 716, in _call_connection_lost
    self._protocol.connection_lost(exc)
  File "/usr/local/lib/python3.8/site-packages/gmqtt/mqtt/protocol.py", line 194, in connection_lost
    self._connection.put_package((MQTTCommands.DISCONNECT, b''))
AttributeError: 'NoneType' object has no attribute 'put_package'
Traceback (most recent call last):
  File "./gmqtt_test.py", line 62, in <module>
    loop.run_until_complete(main(host))
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
  File "./gmqtt_test.py", line 45, in main
    await client.connect(broker_host)
  File "/usr/local/lib/python3.8/site-packages/gmqtt/client.py", line 146, in connect
    await self._connection.auth(self._client_id, self._username, self._password, will_message=self._will_message,
  File "/usr/local/lib/python3.8/site-packages/gmqtt/mqtt/connection.py", line 50, in auth
    await self._protocol.send_auth_package(client_id, username, password, self._clean_session,
  File "/usr/local/lib/python3.8/site-packages/gmqtt/mqtt/protocol.py", line 87, in send_auth_package
    self.write_data(pkg)
  File "/usr/local/lib/python3.8/site-packages/gmqtt/mqtt/protocol.py", line 42, in write_data
    if not self._transport.is_closing():
AttributeError: 'NoneType' object has no attribute 'is_closing'

You can reproduce this by using the python 3.8-buster docker image:

host> docker run -it -v $SCRIPT_PATH:/gmqtt_getting_started.py python:3.8-buster /bin/bash
container> pip install gmqtt
container> /gmqtt_getting_started.py

Does anyone have an idea on how to fix this?

Thanks

error occurs when QoS is set to 2

I have two mqtt clients.
When the QoS of publish and subscribe is set to 2 at the same time, I get the following error.

[ERROR HANDLE PKG]
Traceback (most recent call last):
  File "/Users/jing/virtualenv/mqtt_test/lib/python3.8/site-packages/gmqtt/mqtt/handler.py", line 351, in __call__
    result = self._handle_packet(cmd, packet)
  File "/Users/jing/virtualenv/mqtt_test/lib/python3.8/site-packages/gmqtt/mqtt/handler.py", line 204, in _handle_packet
    handler(cmd, packet)
  File "/Users/jing/virtualenv/mqtt_test/lib/python3.8/site-packages/gmqtt/mqtt/handler.py", line 403, in _handle_pubrel_packet
    mid, = struct.unpack("!H", packet)
struct.error: unpack requires a buffer of 2 bytes

Why does this error occur?
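
One possible explanation, which the issue itself does not confirm: struct.unpack("!H", packet) only accepts a buffer of exactly 2 bytes, while an MQTT 5.0 PUBREL packet may carry a reason code and properties after the 2-byte packet identifier. A more tolerant parse could look like the sketch below; parse_pubrel is a hypothetical helper and not the library's code.

```python
import struct


def parse_pubrel(packet):
    """Return (packet_id, reason_code) from a PUBREL variable header."""
    if len(packet) < 2:
        raise ValueError('PUBREL needs at least a 2-byte packet identifier')
    # unpack_from reads only the first two bytes and ignores what follows.
    (mid,) = struct.unpack_from('!H', packet, 0)
    # The reason code may be omitted; 0 (success) is then implied.
    reason_code = packet[2] if len(packet) > 2 else 0
    return mid, reason_code
```

Logging len(packet) in the failing handler would show whether the buffer is longer or shorter than 2 bytes and so whether this explanation applies.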

Question: documentation on how to run pytest

Env: CentOS 8 / mosquitto 1.6.9

[root@localhost gmqtt-0.6.4]#  /opt/python38/bin/pytest
===================================================================================== test session starts =====================================================================================
platform linux -- Python 3.8.1, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
rootdir: /root/libs/gmqtt-0.6.4, inifile: pytest.ini
plugins: hypothesis-5.6.0, cov-2.8.1, forked-1.1.3, xdist-1.31.0, asyncio-0.11.0
collected 16 items

tests/test_mqtt5.py EEEEEEFEEEEEEEEE                                                                                                                                                    [100%]

=========================================================================================== ERRORS ============================================================================================
________________________________________________________________________________ ERROR at setup of test_basic _________________________________________________________________________________

    @pytest.fixture()
    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
    self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)

/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_________________________________________________________________________ ERROR at setup of test_basic_subscriptions __________________________________________________________________________

    @pytest.fixture()
    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
    self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)

/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
___________________________________________________________________________ ERROR at setup of test_retained_message ___________________________________________________________________________

    @pytest.fixture()
    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
    self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)

/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_____________________________________________________________________________ ERROR at setup of test_will_message _____________________________________________________________________________

    @pytest.fixture()
    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
    self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)

/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_________________________________________________________________ ERROR at setup of test_no_will_message_on_gentle_disconnect _________________________________________________________________

    @pytest.fixture()
    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
    self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)

/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_________________________________________________________________________ ERROR at setup of test_shared_subscriptions _________________________________________________________________________

    @pytest.fixture()
    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
    self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)

/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_____________________________________________________________________________ ERROR at setup of test_unsubscribe ______________________________________________________________________________

    @pytest.fixture()
    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
    self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)

/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
______________________________________________________________________ ERROR at setup of test_overlapping_subscriptions _______________________________________________________________________

    @pytest.fixture()
    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
    self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)

/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_______________________________________________________________________ ERROR at setup of test_redelivery_on_reconnect ________________________________________________________________________

    @pytest.fixture()
    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
    self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)

/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
___________________________________________________________________________ ERROR at setup of test_async_on_message ___________________________________________________________________________

    @pytest.fixture()
    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
    self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
    connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
    transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
    raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
    sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
    await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)

/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
___________________________________________________________________________ ERROR at setup of test_request_response ___________________________________________________________________________

    @pytest.fixture()
    async def init_clients():
>       await cleanup(host, port, username, prefix=PREFIX)

tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
    await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
    self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection

Async-with API

Hi there,
Thanks for sharing this project, I'm very excited to see MQTT and asyncio coming together.

I was surprised by the callback-style API in the basic example. I expected the example to use the async-with pattern like the aiohttp or websockets clients. My recreation:

async with gmqtt.connect('mqtt.flespi.io') as client:
    print('Connected')
    await client.publish('TEST/TIME', str(time.time()), qos=1)
    
    async for message in client.subscribe('TEST/#', qos=0):
        print('RECV MSG:', message)
        break  # print one message only

Do you have any ideas how to implement this? (I'll be looking into it myself)
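
A minimal sketch of the async-with half of this idea, assuming only that the client exposes awaitable connect() and disconnect() methods as in the README example. The names connected, FakeClient, and demo are hypothetical and not part of gmqtt; FakeClient exists only so the sketch runs without a broker. The async-for subscription iterator is not covered here.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def connected(client, host, **connect_kwargs):
    """Connect `client` on entry and always disconnect on exit.

    Assumption: `client` has awaitable connect()/disconnect() methods,
    like the Client used in the README example.
    """
    await client.connect(host, **connect_kwargs)
    try:
        yield client
    finally:
        await client.disconnect()


class FakeClient:
    """Hypothetical stand-in so the wrapper can be shown without a broker."""

    def __init__(self):
        self.events = []

    async def connect(self, host, **kwargs):
        self.events.append(("connect", host))

    async def disconnect(self):
        self.events.append(("disconnect",))


async def demo():
    fake = FakeClient()
    async with connected(fake, "broker.example") as client:
        client.events.append(("inside",))
    return fake.events
```

With a real client, the body of the async with block would hold the publish and subscribe calls, and the disconnect would run even if that body raises.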

Malformed packet when subscribing to a non-ASCII topic like 'testtopic/时间'

Environment: Win10 / Python 3.8 / gmqtt 0.6.7

import asyncio
import os
import signal
import time

from gmqtt import Client as MQTTClient

STOP = asyncio.Event()


def on_connect(client, flags, rc, properties):
    print('Connected')
    client.subscribe('testtopic/时间', qos=0)  # 'testtopic/#' works


def on_message(client, topic, payload, qos, properties):
    print('RECV MSG:', payload)


def on_disconnect(client, packet, exc=None):
    print('Disconnected')

def on_subscribe(client, mid, qos, properties):
    print('SUBSCRIBED')

def ask_exit(*args):
    STOP.set()

async def main(broker_host, port, token):
    client = MQTTClient("client-id")

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe

    client.set_auth_credentials(token, None)
    await client.connect(broker_host, port)

    client.publish('testtopic/时间', "**" + str(time.time()), qos=1)

    await STOP.wait()
    await client.disconnect()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    host = "broker.emqx.io"  # originally 'mqtt.flespi.io'
    token = os.environ.get('FLESPI_TOKEN')
    import uuid
    loop.run_until_complete(main(host, port=1883, token=str(uuid.uuid4())))
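
The report does not say where the packet goes wrong. One thing worth checking, purely as an assumption, is whether a length field is computed from the number of characters rather than the number of encoded bytes, since the two differ for this topic. The sketch below follows the MQTT convention for UTF-8 strings (two-byte big-endian byte length, then the data); pack_mqtt_string is a hypothetical helper, not a gmqtt function.

```python
import struct


def pack_mqtt_string(text: str) -> bytes:
    """Encode a string as an MQTT UTF-8 string:
    a two-byte big-endian length in BYTES, followed by the UTF-8 data."""
    data = text.encode("utf-8")
    return struct.pack("!H", len(data)) + data


topic = "testtopic/时间"
packed = pack_mqtt_string(topic)

char_count = len(topic)        # number of characters in the str
byte_count = len(packed) - 2   # number of bytes after the length prefix
```

Here char_count is 12 but byte_count is 16, because each of the two CJK characters takes three bytes in UTF-8. A packet whose length prefix said 12 would look malformed to the broker.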

Client does not fall back to MQTT 3 on RabbitMQ

Hey,

From the readme I guessed that gmqtt should reconnect using protocol v3 if MQTT v5 is not supported on the broker side. I'm using RabbitMQ, which does not support v5. gmqtt got disconnected right after establishing the connection but never reconnected. When explicitly setting v3 it works smoothly. I don't think that's a bug, but maybe the readme should point out that automatic protocol version adaptation does not always work.

Best regards

async handle task.cancel()

Hi,
In all of the examples, this library is used with an asyncio.Event to handle termination.
How can I instead handle cancellation of an asyncio coroutine?

I have multiple tasks that are running in parallel with asyncio.gather().

This is another one:


async def mqtt_consumer(mqtt_queue):
    logger.info('Starting consumer')

    try:
        while True:
            logger.info('Waiting for message')
            msg = await mqtt_queue.get()
            print(f"Got {msg}")
    except asyncio.CancelledError:
        pass
    finally:
        logger.info('Exiting consumer')

try:
    loop.run_until_complete(tasks)
except KeyboardInterrupt as e:
    tasks.cancel()
    loop.run_forever()
finally:
    loop.close()
    logging.info("Successfully shutdown the services.")

How can I run gmqtt in a while True await?
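
One possible shape for this, sketched under the assumption that the client has awaitable connect() and disconnect() methods: wrap the client's lifetime in a coroutine that idles until its task is cancelled, so it can sit in asyncio.gather() next to the other tasks. run_mqtt, FakeClient, and demo are hypothetical names; FakeClient only stands in for a real client so the sketch runs without a broker.

```python
import asyncio


async def run_mqtt(client, host):
    """Connect, idle until this task is cancelled, then always disconnect."""
    await client.connect(host)
    try:
        # Wait on an event nobody sets; callbacks keep firing meanwhile.
        await asyncio.Event().wait()
    finally:
        await client.disconnect()


class FakeClient:
    """Hypothetical stand-in for a real MQTT client."""

    def __init__(self):
        self.calls = []

    async def connect(self, host):
        self.calls.append("connect")

    async def disconnect(self):
        self.calls.append("disconnect")


async def demo():
    fake = FakeClient()
    task = asyncio.ensure_future(run_mqtt(fake, "broker.example"))
    await asyncio.sleep(0.01)   # give the task time to connect
    task.cancel()               # same effect as cancelling a gather() of tasks
    results = await asyncio.gather(task, return_exceptions=True)
    return fake.calls, isinstance(results[0], asyncio.CancelledError)
```

Cancelling the task raises CancelledError at the wait, and the finally clause still performs the disconnect before the task ends.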

Keepalive has 2 bugs that prevent it from working properly

At connection.py:32, the first time it triggers, the computed value is always equal to, not greater than, self._keepalive on my system. It therefore skips the keepalive, which eventually leads to a disconnect/reconnect.

if time.monotonic() - self._last_data_in > self._keepalive:

Changing the > to >= fixes it for me, making the first keepalive work.
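
The boundary case can be shown in isolation. This is a sketch of the comparison only, not gmqtt's code; keepalive_due is a hypothetical helper and the timestamps are made up.

```python
def keepalive_due(now, last_data_in, keepalive, inclusive=True):
    """Return True when a keepalive should be sent.

    inclusive=True models the proposed `>=`; inclusive=False models `>`.
    """
    elapsed = now - last_data_in
    return elapsed >= keepalive if inclusive else elapsed > keepalive


# Exactly `keepalive` seconds have elapsed, as described in the report.
strict = keepalive_due(60.0, 0.0, 60, inclusive=False)
relaxed = keepalive_due(60.0, 0.0, 60, inclusive=True)
```

With exactly 60 seconds elapsed, the strict comparison skips the keepalive while the inclusive one sends it.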

I originally encountered this on an MQTT broker that wanted keepalives shorter than 60 seconds. While troubleshooting, I also discovered that keepalive is reset to a hardcoded 60 on reconnect:

client.py:152
self._connection = await self._create_connection(self._host, self._port, clean_session=True, keepalive=60)

Maybe set a self._keepalive in the Client class with a default of 60 and override it from the connect procedure if a new value is provided?

How to detect that reconnect_retries has been exceeded?

When I set reconnect_retries to some value as documented in the README and I want my application to react to the fact that gmqtt has exceeded that number of retries and given up on reconnecting (e.g., to exit the application or to reconnect at a much later stage), how would I do that?

Use message callback on subscriptions rather than globally

Right now, message handlers are attached globally on the mqtt client: mqtt_client.on_message = on_message

Would it be possible to attach a message handler to a Subscription? I am thinking of something along the lines of:

hello_sub = mqtt_client.subscribe('hello', qos=0)
hello_sub.on_message = on_message
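
Until something like this exists in the library, a similar effect can be approximated on top of the single global callback. The sketch below is an assumption-laden workaround, not gmqtt API: TopicRouter and topic_matches are hypothetical names, and the matcher is simplified (it ignores `$`-prefixed topics and shared subscriptions).

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Simplified MQTT filter match supporting the '+' and '#' wildcards."""
    f_parts = topic_filter.split("/")
    t_parts = topic.split("/")
    for i, part in enumerate(f_parts):
        if part == "#":              # multi-level wildcard matches the rest
            return True
        if i >= len(t_parts):
            return False
        if part != "+" and part != t_parts[i]:
            return False
    return len(f_parts) == len(t_parts)


class TopicRouter:
    """Dispatch one global on_message callback to per-filter handlers."""

    def __init__(self):
        self._handlers = []

    def add(self, topic_filter, handler):
        self._handlers.append((topic_filter, handler))

    def on_message(self, client, topic, payload, qos, properties):
        for topic_filter, handler in self._handlers:
            if topic_matches(topic_filter, topic):
                handler(client, topic, payload, qos, properties)
```

The router's on_message method has the same signature as the global handler in the README example, so it could be assigned as mqtt_client.on_message after registering one handler per subscribed filter.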

AttributeError: 'NoneType' object has no attribute 'encode'

Does anybody know what this error is?

/home/mohsen/PycharmProjects/gmqtt/venv/bin/python /home/mohsen/PycharmProjects/gmqtt/gmqtt.py
Traceback (most recent call last):
  File "/home/mohsen/PycharmProjects/gmqtt/gmqtt.py", line 59, in <module>
    loop.run_until_complete(main(host, token))
  File "uvloop/loop.pyx", line 1451, in uvloop.loop.Loop.run_until_complete
  File "/home/mohsen/PycharmProjects/gmqtt/gmqtt.py", line 41, in main
    client.set_auth_credentials(token, None)
  File "/home/mohsen/PycharmProjects/gmqtt/gmqtt/client.py", line 116, in set_auth_credentials
    self._username = username.encode()
AttributeError: 'NoneType' object has no attribute 'encode'
Task was destroyed but it is pending!
task: <Task pending coro=<Client._resend_qos_messages() running at /home/mohsen/PycharmProjects/gmqtt/gmqtt/client.py:83> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f41ed8fe918>()]>>
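
The traceback shows set_auth_credentials receiving None as the username. In the README example the token comes from os.environ.get('FLESPI_TOKEN'), which returns None when the variable is not set, so that is presumably what happened here. A small guard, sketched with a hypothetical helper name require_env, makes the failure explicit before the client is touched.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or fail with a clear message."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"environment variable {name} is not set")
    return value
```

Calling require_env('FLESPI_TOKEN') in place of os.environ.get('FLESPI_TOKEN') would turn the AttributeError into a message that names the missing variable.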

GMQTT with TLS: ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed

The TLS option is not working as expected. I can only set ssl to True; I cannot provide a path to the cert file.
The Mosquitto configuration works fine with TLS for both publisher and subscriber:

mosquitto_sub --cafile /etc/mosquitto/ca_certificates/ca.crt -h 37c16a79d00a -t 'test' -p 8883 -u report -P 'report'


mosquitto_pub --cafile /etc/mosquitto/ca_certificates/ca.crt -h 37c16a79d00a -t 'test' -m 'amessage' -p 8883 -u report -P 'report'


mosquitto -v -c /etc/mosquitto/mosquitto.conf

1575295867: New client connected from 172.17.0.3 as mosq-W7nvl4LtsfAVItCtHT (p2, c1, k60, u'report').
1575295867: Client mosq-W7nvl4LtsfAVItCtHT disconnected.
1575295870: New connection from 172.17.0.3 on port 8883.

If I try to apply the same configuration to gmqtt, I get the error in the title.

# EXAMPLE
import asyncio

from gmqtt import Client


async def main():
    cli = Client(client_id='test',
                 will_message=None,
                 clean_session=True)
    cli.set_auth_credentials('report', password='report')
    await cli.connect(host='37c16a79d00a',
                      port=8883,
                      keepalive=True,
                      ssl=True)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

ERROR FILE

gmqtt/mqtt/connection.py

ERROR LINE

transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)

ERROR MESSAGE

ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain (_ssl.c:1076)

I would like to know how to use TLS with gmqtt. Could you provide a quick example?
Thanks
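
A possible approach, sketched under one assumption: the error line above shows the ssl argument being forwarded to loop.create_connection, which accepts an ssl.SSLContext as well as True. If that holds for the installed version, a context that trusts the private CA can be built with the standard library and passed instead of True. build_tls_context is a hypothetical helper name.

```python
import ssl


def build_tls_context(cafile=None):
    """Build a verifying client-side SSLContext.

    cafile: path to a CA bundle such as the ca.crt used with mosquitto_sub;
    None falls back to the system's default trust store.
    """
    return ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)


# Assumed usage with the example above (not verified against gmqtt):
#   context = build_tls_context('/etc/mosquitto/ca_certificates/ca.crt')
#   await cli.connect(host='37c16a79d00a', port=8883, ssl=context)
```

The default context also checks the hostname, so the name passed to connect has to match the server certificate.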

Possibility of not acknowledging a published message

Based on #113, I suggest adding the ability to not acknowledge a message at all.
It should be possible to "return None" in the on_message callback (without raising an exception).

Why? If I want to process a message, but some backend system that I need for processing is unavailable right now, I'd either have to store the message locally somewhere (and process it later) or let the broker know that I have not processed it at all.

Another partial solution would be to disconnect from the broker as long as the backend is unavailable, but still there is a time window where some messages might slip through.

message process task

Hi,

What is the proper way to use asyncio.create_task for running a message process loop? Something like:

while True:
    msg = await client.new_message()
    process(msg)

Thanks!!
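
One way to get this loop with the callback API, sketched with hypothetical helper names (make_queue_handler, process_messages, demo): let the on_message callback push into an asyncio.Queue and run the processing loop as a task created with asyncio.create_task. The callback signature is taken from the README example; no real client is involved in the demo.

```python
import asyncio


def make_queue_handler(queue):
    """Build an on_message-style callback that pushes messages onto `queue`."""
    def on_message(client, topic, payload, qos, properties):
        queue.put_nowait((topic, payload))
    return on_message


async def process_messages(queue, process):
    """Message process loop: runs until its task is cancelled."""
    while True:
        topic, payload = await queue.get()
        process(topic, payload)
        queue.task_done()


async def demo():
    queue = asyncio.Queue()
    handled = []
    worker = asyncio.create_task(
        process_messages(queue, lambda t, p: handled.append((t, p))))

    # With a real client this callback would be assigned to client.on_message.
    on_message = make_queue_handler(queue)
    on_message(None, "TEST/TIME", b"1", 0, {})
    on_message(None, "TEST/TIME", b"2", 0, {})

    await queue.join()      # wait until both messages are processed
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return handled
```

The queue decouples the rate at which messages arrive from the rate at which they are processed, at the cost of buffering in memory.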

Issues falling back to v311

I've not been able to track this down to anything concrete so I'm hoping someone with more experience of gmqtt can fill in the blanks.

My test environment is gmqtt with mosquitto 1.6.10.

In circumstances I haven't yet been able to reproduce in simple test code, the v5 connection appears to work fine, but a subsequent publish may fail, with Mosquitto logging "Unsupported property type: 44" and dropping the connection. Other times an identical publish succeeds. When I started hacking on gmqtt to find the difference, it turned out to be in package.py/PublishPacket: the failed publish had protocol.proto_ver == MQTTv50, whereas the successful publish had somewhere been silently downgraded to MQTTv311.

I wasn't able to work out where the downgrade occurred; with logger set to DEBUG I didn't see anything to indicate gmqtt had failed to connect at v5 and reconnected at v3.1.1, and in any case the publish was being triggered indirectly by the on_connect event so any reconnect would have happened ahead of that anyway.

As best I can tell, the publish worked when it was in an event handler such as on_connect, but failed when it was in a separate async task (run via asyncio.create_task and added to the event loop after the connection succeeds).

For my purposes, now that I have got this far I can get my application working by setting version to MQTTv311 on connect (I don't need v5 functionality at this point). But if anyone can shed some light on where this downgrade is happening and why, I'd be interested.

Asynchronous handlers

Is there any plan to switch to asynchronous handlers?

I'd like to use an asynchronous function to store messages to a database using asyncio:

async def on_message(client, topic, payload, qos, properties):
    pass

mqtt_client.on_message = on_message

Of course, it is still possible to add a new task to the event loop; I just thought it would be nicer.
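
The workaround mentioned above (adding a task to the event loop from a synchronous handler) might look like the following sketch. `store_message` stands in for a real database call and, like `_background_tasks` and `demo`, is a hypothetical name; the handler is invoked by hand here instead of by a connected client.

```python
import asyncio

_background_tasks = set()   # keeps references so tasks are not garbage-collected
stored = []                 # stand-in for a database table


async def store_message(topic, payload):
    await asyncio.sleep(0)  # placeholder for an awaitable database write
    stored.append((topic, payload))


def on_message(client, topic, payload, qos, properties):
    # Synchronous handler: schedule the coroutine and return immediately.
    task = asyncio.get_running_loop().create_task(store_message(topic, payload))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


async def demo():
    on_message(None, 'TEST/DB', b'42', 0, {})
    await asyncio.gather(*_background_tasks)
    return list(stored)
```

The handler must be called from code running inside the event loop, since `asyncio.get_running_loop()` raises `RuntimeError` otherwise.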

Feature request: client config

I would like to provide a config to Client that contains FAILED_CONNECTIONS_STOP_RECONNECT, RECONNECTION_SLEEP, ...

An example from the hbmqtt project:

config = {
    'keep_alive': 10,
    'ping_delay': 1,
    'default_qos': 0,
    'default_retain': False,
    'auto_reconnect': True,
    'reconnect_max_interval': 5,
    'reconnect_retries': 10,
    'topics': {
        '/test': { 'qos': 1 },
        '/some_topic': { 'qos': 2, 'retain': True }
    }
}

Connection lost on Linux server

Hi,

I am getting connect/disconnect events all the time on my Linux (Debian) server. The same script works fine on my local machine (Mac). The pattern is always the same:

CONNECTED->SUBSCRIBED->CONN CLOSE NORMALLY/DISCONNECT after exactly 4 minutes->CONNECTED->CONN CLOSE NORMALLY/DISCONNECT after exactly 2 minutes. Then it repeats every 2 minutes many times, then again 4 minutes/2 minutes.

I can't find anything in my server logs, and the only message logged on the flespi platform is "503: mqtt session connection was closed (Unexpected disconnect)".

Any clue what this could be? It is probably something server- or network-related, but I don't know how to identify it.
I am attaching the screenshot from my terminal.

Thanks for any help in resolving this.

regards, dejan
Screenshot 2020-03-18 at 15 39 41

server keep live bug

When the CONNACK contains a server keep alive value, the following exception is raised:
    result = self._handle_packet(cmd, packet)
  File "E:\workspace\mqtt-simulator\env\lib\site-packages\gmqtt\mqtt\handler.py", line 214, in _handle_packet
    handler(cmd, packet)
  File "E:\workspace\mqtt-simulator\env\lib\site-packages\gmqtt\mqtt\handler.py", line 286, in _handle_connack_packet
  File "E:\workspace\mqtt-simulator\env\lib\site-packages\gmqtt\mqtt\handler.py", line 257, in _update_keepalive_if_needed
    self._connection.keepalive = self._keepalive
  File "E:\workspace\mqtt-simulator\env\lib\site-packages\gmqtt\mqtt\connection.py", line 109, in keepalive
    self._keep_connection_callback = asyncio.get_event_loop().call_later(self._keepalive / 2, self._keep_connection)
TypeError: unsupported operand type(s) for /: 'list' and 'int'

    def _parse_properties(self, packet):
        if self.protocol_version < MQTTv50:
            # If protocol is version is less than 5.0, there is no properties in packet
            return {}, packet
        properties_len, left_packet = unpack_variable_byte_integer(packet)
        packet = left_packet[:properties_len]
        left_packet = left_packet[properties_len:]
        properties_dict = defaultdict(list)
        while packet:
            property_identifier, = struct.unpack("!B", packet[:1])
            property_obj = Property.factory(id_=property_identifier)
            if property_obj is None:
                logger.critical('[PROPERTIES] received invalid property id {}, disconnecting'.format(property_identifier))
                return None, None
            result, packet = property_obj.loads(packet[1:])
            for k, v in result.items():
                properties_dict[k].append(v)   # every value ends up in a list, but keepalive needs an int
        properties_dict = dict(properties_dict)
        return properties_dict, left_packet
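
The following sketch reproduces the shape of the problem outside gmqtt: collecting properties with `defaultdict(list)` wraps every value in a list, so a single-valued property has to be unwrapped before it is used in arithmetic. The helper names and the property key `server_keep_alive` are assumptions for illustration, not taken from the library.

```python
from collections import defaultdict


def collect_properties(pairs):
    """Mimic the parser above: every property value is appended to a list."""
    props = defaultdict(list)
    for key, value in pairs:
        props[key].append(value)
    return dict(props)


def first_int(properties, name, default):
    """Unwrap a single-valued property, falling back to a default."""
    values = properties.get(name)
    if not values:
        return default
    return int(values[0])


props = collect_properties([('server_keep_alive', 30)])
# props['server_keep_alive'] is [30], so dividing it by 2 raises TypeError;
# unwrapping first gives a plain int that can be halved safely.
keepalive = first_int(props, 'server_keep_alive', default=60)
half_interval = keepalive / 2
```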

on_message callback is triggered when sending messages.

Hello,
Thank you for this great project. It's very nice. I noticed an issue. Please let me know if this is intentional or if there is a bug somewhere.

Consider this code:

import asyncio
import os
import signal
import time
import ssl

from gmqtt import Client as MQTTClient


STOP = asyncio.Event()


def on_connect(client, flags, rc, properties):
    print('Connected')
    client.subscribe('TEST/#', qos=0)


def on_message(client, topic, payload, qos, properties):
    print('[RECV MSG {}] TOPIC: {} PAYLOAD: {} QOS: {} PROPERTIES: {}'.format(client._client_id, topic, payload, qos, properties))

def on_disconnect(client, packet, exc=None):
    print('Disconnected')

def on_subscribe(client, mid, qos, properties):
    print('SUBSCRIBED')

async def main(broker_host):
    client = MQTTClient("code_client-id")

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe

    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    ssl_context.verify_mode = ssl.CERT_NONE

    client.set_auth_credentials("usename", "password")
    await client.connect(broker_host,8883,ssl_context)

    client.publish('TEST/TIME', "message from python code", qos=1)

    await STOP.wait()
    await client.disconnect()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    host = 'mymqttbroker.com'
    loop.run_until_complete(main(host))

The above code produces the following output:

Connected
SUBSCRIBED
[RECV MSG code_client-id] TOPIC: TEST/TIME PAYLOAD: b'message from python code' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
[RECV MSG code_client-id] TOPIC: TEST/TIME PAYLOAD: b'message from other client' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}

As you can see, the on_message callback was triggered both when sending and when receiving a message.
This makes it impossible to tell who sent the message (the server or this client).

Thank you for your time.
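
A likely explanation, offered as a sketch: the client subscribes to `TEST/#` and then publishes to `TEST/TIME`, so the broker delivers the client's own message back to it like any other matching publication. The matcher below is a simplified illustration of MQTT topic-filter rules (it ignores the special handling of topics starting with `$`); `topic_matches` is a hypothetical helper, not part of gmqtt.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a filter with + and # wildcards."""
    filter_parts = topic_filter.split('/')
    topic_parts = topic.split('/')
    for i, part in enumerate(filter_parts):
        if part == '#':
            # Multi-level wildcard: matches the parent and everything below it.
            return True
        if i >= len(topic_parts):
            return False
        if part != '+' and part != topic_parts[i]:
            return False
    return len(filter_parts) == len(topic_parts)


# The subscription from the report matches the topic the client publishes to.
own_message_is_echoed = topic_matches('TEST/#', 'TEST/TIME')
```

Publishing to a topic outside the subscribed filter avoids the echo. MQTT 5.0 also defines a No Local subscription option that asks the broker not to send a client its own publications; whether and how the installed gmqtt version exposes it should be checked against its documentation.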

Unhandled [WinError 10054]

Elena, sorry for describing this in Russian: there is a lot to write, and my English is not very good :(

I had to test on the dreaded Windows and immediately ran into a problem.
In short: the client's normal behavior is to attempt reconnection when a connection to the broker fails.

I strongly disagree with this behavior and do not quite understand how to work around it.

I am trying to work around it as follows:

        try:
            await self._client.connect(self._host, self._port, self._ssl, version=MQTTv311)
        except:
            self._client.stop_reconnect()
            await self._client.disconnect()

And this even works: after a failed initial connection attempt, the client gives up on further attempts.

But not on Windows...
At the start of the test I forgot to change the value of the ssl parameter of the connect method: the broker requires a secure connection, but I passed False. As a result, I see the following:

[CONNECTION MADE]
[EXC: CONN LOST]
================================================================================
Traceback (most recent call last):
  File "...Python\lib\asyncio\selector_events.py", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
================================================================================
[CMD 0xe0] b''
[TRYING WRITE TO CLOSED SOCKET]

And so on in a loop. The try...except code above, which is meant to stop the attempts after the first failure, does not fire in this case.
Could you advise what can be done?

The question is actually broader: how can I avoid waiting for a connection to the broker when it is already clear after the first attempt that continuing is pointless (wrong host and port, incorrect login and password, and so on)?
At the same time, if a reconnection becomes necessary during operation (for example, the connection was lost for a while), the attempts should continue practically until they succeed.

CI is not running for PRs

The CI is not running on PRs. It seems that the credentials used by Travis are not valid.

        if raise_exc and self._error:
>           raise self._error
E           gmqtt.mqtt.handler.MQTTConnectError: code 134 (Connection Refused: Bad User Name or Password)

Topic Alias support?

First of all, congrats to the developers on this great project; gmqtt is a joy to use. Thanks!

I had a quick look and it seems that Topic Aliases are not supported by gmqtt. Is that the case? I will happily send a PR improving the docs if they are already supported; I just couldn't see how.

Unhandled ConnectionRefusedError

If you run the properties example against a host that is not running a broker (e.g. localhost), the program crashes:

Traceback (most recent call last):
  File "properties.py", line 98, in <module>
    loop.run_until_complete(main('localhost', port, None))
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "properties.py", line 51, in main
    await sub_client.connect(broker_host, broker_port)
  File "/home/d21d3q/workspace/gmqtt/venv/lib/python3.7/site-packages/gmqtt-0.4.0-py3.7.egg/gmqtt/client.py", line 144, in connect
  File "/home/d21d3q/workspace/gmqtt/venv/lib/python3.7/site-packages/gmqtt-0.4.0-py3.7.egg/gmqtt/client.py", line 160, in _create_connection
  File "/home/d21d3q/workspace/gmqtt/venv/lib/python3.7/site-packages/gmqtt-0.4.0-py3.7.egg/gmqtt/mqtt/connection.py", line 26, in create_connection
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 954, in create_connection
    raise exceptions[0]
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 941, in create_connection
    await self.sock_connect(sock, address)
  File "/usr/lib64/python3.7/asyncio/selector_events.py", line 464, in sock_connect
    return await fut
  File "/usr/lib64/python3.7/asyncio/selector_events.py", line 494, in _sock_connect_cb
    raise OSError(err, f'Connect call failed {address}')
ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending coro=<Client._resend_qos_messages() running at /home/d21d3q/workspace/gmqtt/venv/lib/python3.7/site-packages/gmqtt-0.4.0-py3.7.egg/gmqtt/client.py:96> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f238afa0f10>()]>>

Expected behavior would be to try to reconnect normally.

workaround:

    try:
        await pub_client.connect(broker_host, broker_port)
    except ConnectionRefusedError:
        asyncio.ensure_future(pub_client.reconnect(delay=True))
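
An alternative to the workaround above, sketched independently of gmqtt internals: wrap the initial `connect` call in a small retry helper that catches `ConnectionRefusedError`. The helper `connect_with_retry` and its parameters are hypothetical; it accepts any zero-argument callable that returns an awaitable.

```python
import asyncio


async def connect_with_retry(connect, retries=5, delay=1.0):
    """Await connect() until it succeeds or the retry budget is exhausted.

    connect: zero-argument callable returning an awaitable, for example
             lambda: pub_client.connect(broker_host, broker_port)
    """
    for attempt in range(1, retries + 1):
        try:
            return await connect()
        except ConnectionRefusedError:
            if attempt == retries:
                raise                   # give up: let the caller decide
            await asyncio.sleep(delay)  # wait before the next attempt
```

Whether calling `connect` repeatedly on the same client instance is safe depends on the gmqtt version; the traceback above suggests a failed attempt can leave a pending internal task behind, so creating a fresh client per attempt may be the more cautious choice.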

[Feature request] Wait for all pending messages to be published

Please add a way to await delivery of all pending messages.

Calling await client.disconnect() throws a bunch of errors:

Task was destroyed but it is pending!
task: <Task cancelling name='Task-3' coro=<MQTTProtocol._read_loop() running at /usr/lib/python3.8/site-packages/gmqtt/mqtt/protocol.py:185> wait_for=<Future finished result=None>>
Task was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<Client.reconnect() running at /usr/lib/python3.8/site-packages/gmqtt/client.py:191> cb=[MqttPackageHandler._handle_exception_in_future()]>
/usr/lib/python3.8/asyncio/base_events.py:637: RuntimeWarning: coroutine 'Client.reconnect' was never awaited
  self._ready.clear()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

restore subscriptions after connection lost

Hi,
I'm facing an issue when restoring subscriptions after a lost connection. I'm testing durability of connections and whenever I shut the connection down (turning the access point off) gmqtt reconnects and delivers messages but the subscriptions remain dead.

I see two of the following log messages right after each other:

INFO:mqtt.MQTTClient:MQTT connected
INFO:gmqtt.mqtt.protocol:[CONN CLOSE NORMALLY]
WARNING:gmqtt.mqtt.handler:[EXC OCCURED] in reconnect future None

A little later:

INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.handler:[EXC OCCURED] in reconnect future None

Any idea on how to either verify subscriptions and resubscribe in case a subscription is lost or how to make subscriptions more robust?
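
One common approach, shown here only as a sketch: keep the desired subscriptions in a single list and issue them from `on_connect`, as the getting-started example does, so that they are sent again each time the handler runs. This assumes `on_connect` fires after every successful reconnect, which is worth confirming in the logs. The topic names in `SUBSCRIPTIONS` are hypothetical.

```python
# Hypothetical list of (topic filter, QoS) pairs the application needs.
SUBSCRIPTIONS = [('sensors/#', 1), ('commands/+', 0)]


def on_connect(client, flags, rc, properties):
    # Re-issue every subscription whenever a connection is established.
    for topic, qos in SUBSCRIPTIONS:
        client.subscribe(topic, qos=qos)
```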

Client does not reconnect

Hi,

I'm using version 0.5.6 and have the following code:

#!/usr/bin/env python3

import asyncio
from gmqtt import Client as MQTTClient
from gmqtt.mqtt.constants import UNLIMITED_RECONNECTS
import logging

logging.basicConfig(level=logging.DEBUG)

async def main():
    mqtt = MQTTClient('tester')

    mqtt.set_config({
        'reconnect_retries': UNLIMITED_RECONNECTS,
        'reconnect_delay': 1
    })

    await mqtt.connect('localhost')

    while True:
        #if mqtt.is_connected:
        mqtt.publish('ohlc_1m', 'check')
        await asyncio.sleep(1)

def handle_exception(loop, context):
    # context["message"] will always be there; but context["exception"] may not
    msg = context.get("exception", context["message"])
    logging.error(f"Caught exception: {str(msg)}")

loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_exception)
loop.run_until_complete(main())

To test reconnection, I restart the mosquitto service and get the following output:

DEBUG:asyncio:Using selector: EpollSelector
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
DEBUG:gmqtt.mqtt.handler:[CMD 0x20] b'\x00\x00\x03"\x00\n'
DEBUG:gmqtt.mqtt.handler:[CONNACK] flags: 0x0, result: 0x0
DEBUG:gmqtt.client:[QoS query IS EMPTY]
DEBUG:gmqtt.mqtt.package:Sending PUBLISH (q0), 'ohlc_1m', ... (5 bytes)
DEBUG:gmqtt.mqtt.package:Sending PUBLISH (q0), 'ohlc_1m', ... (5 bytes)
DEBUG:gmqtt.mqtt.protocol:[RECV EMPTY] Connection will be reset automatically.
INFO:gmqtt.mqtt.protocol:[CONN CLOSE NORMALLY]
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:gmqtt.mqtt.package:Sending PUBLISH (q0), 'ohlc_1m', ... (5 bytes)
Traceback (most recent call last):
  File "test.py", line 32, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.8/asyncio/base_events.py", line 612, in run_until_complete
    return future.result()
  File "test.py", line 22, in main
    mqtt.publish('ohlc_1m', 'check')
  File "/usr/lib/python3.8/site-packages/gmqtt/client.py", line 228, in publish
    mid, package = self._connection.publish(message)
  File "/usr/lib/python3.8/site-packages/gmqtt/mqtt/connection.py", line 54, in publish
    return self._protocol.send_publish(message)
  File "/usr/lib/python3.8/site-packages/gmqtt/mqtt/protocol.py", line 111, in send_publish
    self.write_data(pkg)
  File "/usr/lib/python3.8/site-packages/gmqtt/mqtt/protocol.py", line 45, in write_data
    if not self._transport.is_closing():
AttributeError: 'NoneType' object has no attribute 'is_closing'

I've seen a similar error in #74, so I'm reporting this one for further investigation.
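
Until the underlying error is fixed, a defensive sketch is to publish only while the client reports a connection, along the lines of the commented-out `is_connected` check in the script above. `publish_if_connected` is a hypothetical helper; it relies only on the `is_connected` attribute and `publish` method that the script already references.

```python
def publish_if_connected(client, topic, payload, **kwargs):
    """Publish only while the client reports an active connection.

    Returns True if the message was handed to the client, False if it was
    skipped because the client is currently disconnected.
    """
    if not client.is_connected:
        return False
    client.publish(topic, payload, **kwargs)
    return True
```

Because there is no `await` between the check and the call, the connection state cannot change in between within a single event loop. Skipped messages are dropped in this sketch; buffering them for later delivery would be a separate design decision.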

Last Will (graceful disconnect) bug

Sample:

import asyncio

from gmqtt import Client, Message
from gmqtt.mqtt.constants import MQTTv311

import logging

STOP = asyncio.Event()


def on_connect(client, flags, rc, properties):
    print('CONNECTED')
    client.subscribe('test/#', qos=0)


def on_message(client, topic, payload, qos, properties):
    print('RECV MSG:', payload)


def on_disconnect(client, packet, exc=None):
    print('DISCONNECTED')


def on_subscribe(client, mid, qos):
    print('SUBSCRIBED')


async def main():
    client = Client("client-id", will_message=Message('test', "LOST", qos=1, retain=True))

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe

    client.set_auth_credentials('username', 'password')
    await client.connect('127.0.0.1', 8883, keepalive=60, version=MQTTv311)

    client.publish('test', 'ONLINE', qos=1, retain=True)
    client.publish('test', 'OFFLINE', qos=1, retain=True)
    await client.disconnect() 

if __name__ == '__main__':
    asyncio.run(main())

Broker log:

test ONLINE
test OFFLINE
test LOST

Why was the LWT sent?
