wialon / gmqtt Goto Github PK
View Code? Open in Web Editor NEWPython MQTT v5.0 async client
License: MIT License
Python MQTT v5.0 async client
License: MIT License
If you run properties example with host that is not running broker (eg localhost), program will crash
Traceback (most recent call last):
File "properties.py", line 98, in <module>
loop.run_until_complete(main('localhost', port, None))
File "/usr/lib64/python3.7/asyncio/base_events.py", line 579, in run_until_complete
return future.result()
File "properties.py", line 51, in main
await sub_client.connect(broker_host, broker_port)
File "/home/d21d3q/workspace/gmqtt/venv/lib/python3.7/site-packages/gmqtt-0.4.0-py3.7.egg/gmqtt/client.py", line 144, in connect
File "/home/d21d3q/workspace/gmqtt/venv/lib/python3.7/site-packages/gmqtt-0.4.0-py3.7.egg/gmqtt/client.py", line 160, in _create_connection
File "/home/d21d3q/workspace/gmqtt/venv/lib/python3.7/site-packages/gmqtt-0.4.0-py3.7.egg/gmqtt/mqtt/connection.py", line 26, in create_connection
File "/usr/lib64/python3.7/asyncio/base_events.py", line 954, in create_connection
raise exceptions[0]
File "/usr/lib64/python3.7/asyncio/base_events.py", line 941, in create_connection
await self.sock_connect(sock, address)
File "/usr/lib64/python3.7/asyncio/selector_events.py", line 464, in sock_connect
return await fut
File "/usr/lib64/python3.7/asyncio/selector_events.py", line 494, in _sock_connect_cb
raise OSError(err, f'Connect call failed {address}')
ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending coro=<Client._resend_qos_messages() running at /home/d21d3q/workspace/gmqtt/venv/lib/python3.7/site-packages/gmqtt-0.4.0-py3.7.egg/gmqtt/client.py:96> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f238afa0f10>()]>>
Expected behavior would be to try to reconnect normally.
workaround:
try:
await pub_client.connect(broker_host, broker_port)
except ConnectionRefusedError:
asyncio.ensure_future(pub_client.reconnect(delay=True))
Hi,
What is the proper way to use asyncio.create_task
for running a message process loop? Something like:
while True:
msg = await client.new_message()
process(msg)
Thanks!!
I subscribe a topic with plus symbol (LIGHT/+/TURNON)
So I will receive these topics:
LIGHT/ID_1/TURNON
LIGHT/ID_2/TURNON
I hope to receive these topic can do the same thing.
How do I know that LIGHT/ID_1/TURNON
and LIGHT/ID_2/TURNON
are from the same subscribe?
following is my sample code:
TOPIC_LIST = []
def turn_on_tv(client, topic, message, qos, properties):
# do something
def turn_on_light(client, topic, message, qos, properties):
# do something
def init_subscribe(client):
global TOPIC_LIST
TOPIC_LIST = [
{'topic': 'TV/TURNON'., 'qos': 1, 'method': turn_on_tv},
{'topic': 'LIGHT/+/TURNON', 'qos': 1, 'method': turn_on_light},
]
for t in TOPIC_LIST:
client.subscribe(t['topic'], t['qos'])
def on_message(client, topic, payload, qos, properties):
for t in TOPIC_LIST:
if topic == t['topic']: # How do I know?
t['method'](client, topic, message, qos, properties)
break
Examples are placed in the root of the site-packages directory which will make it hard to identify them (is also a packaging issue).
Installed (but unpackaged) file(s) found:
/usr/lib/python3.7/site-packages/examples/__init__.py
/usr/lib/python3.7/site-packages/examples/__pycache__/__init__.cpython-37.opt-1.pyc
/usr/lib/python3.7/site-packages/examples/__pycache__/__init__.cpython-37.pyc
/usr/lib/python3.7/site-packages/examples/__pycache__/properties.cpython-37.opt-1.pyc
/usr/lib/python3.7/site-packages/examples/__pycache__/properties.cpython-37.pyc
/usr/lib/python3.7/site-packages/examples/__pycache__/shared_subscriptions.cpython-37.opt-1.pyc
/usr/lib/python3.7/site-packages/examples/__pycache__/shared_subscriptions.cpython-37.pyc
/usr/lib/python3.7/site-packages/examples/__pycache__/will_message.cpython-37.opt-1.pyc
/usr/lib/python3.7/site-packages/examples/__pycache__/will_message.cpython-37.pyc
/usr/lib/python3.7/site-packages/examples/properties.py
/usr/lib/python3.7/site-packages/examples/shared_subscriptions.py
/usr/lib/python3.7/site-packages/examples/will_message.py
From my point of view they should go under or not be included at all:
/usr/lib/python3.7/site-packages/gmatt/examples/
As the title says, I want to know that publish runs successfully or failed when the qos is set to 1 or 2, if successfully, record it, if failed, retry it manually. So what should I do please?
when there is keeplive in connack, the exception:
result = self._handle_packet(cmd, packet)
File "E:\workspace\mqtt-simulator\env\lib\site-packages\gmqtt\mqtt\handler.py", line 214, in _handle_packet
handler(cmd, packet)
File "E:\workspace\mqtt-simulator\env\lib\site-packages\gmqtt\mqtt\handler.py", line 286, in _handle_connack_packet
File "E:\workspace\mqtt-simulator\env\lib\site-packages\gmqtt\mqtt\handler.py", line 257, in _update_keepalive_if_needed
self._connection.keepalive = self._keepalive
File "E:\workspace\mqtt-simulator\env\lib\site-packages\gmqtt\mqtt\connection.py", line 109, in keepalive
self._keep_connection_callback = asyncio.get_event_loop().call_later(self._keepalive / 2, self._keep_connection)
TypeError: unsupported operand type(s) for /: 'list' and 'int'
def _parse_properties(self, packet):
if self.protocol_version < MQTTv50:
# If protocol is version is less than 5.0, there is no properties in packet
return {}, packet
properties_len, left_packet = unpack_variable_byte_integer(packet)
packet = left_packet[:properties_len]
left_packet = left_packet[properties_len:]
properties_dict = defaultdict(list)
while packet:
property_identifier, = struct.unpack("!B", packet[:1])
property_obj = Property.factory(id_=property_identifier)
if property_obj is None:
logger.critical('[PROPERTIES] received invalid property id {}, disconnecting'.format(property_identifier))
return None, None
result, packet = property_obj.loads(packet[1:])
for k, v in result.items():
properties_dict[k].append(v) # this is list ,but need int
properties_dict = dict(properties_dict)
return properties_dict, left_packet
Following the example on the README i have a connect function defined like this (I'm not using
import asyncio
import os
import signal
import time
from gmqtt import Client as MQTTClient
from gmqtt.mqtt.constants import MQTTv311
host = 'localhost'
port = '1993'
STOP = asyncio.Event()
def on_connect(client, flags, rc, properties):
print('Connected')
client.subscribe('#', qos=0)
def on_message(client, topic, payload, qos, properties):
print('RECV MSG:', payload)
def on_disconnect(client, packet, exc=None):
print('Disconnected')
def on_subscribe(client, mid, qos):
print('SUBSCRIBED')
def ask_exit(*args):
STOP.set()
async def main(broker_host, broker_port):
client = MQTTClient("client-id")
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.on_subscribe = on_subscribe
await client.connect(broker_host, broker_port, version=MQTTv311)
await STOP.wait()
await client.disconnect()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, ask_exit)
loop.add_signal_handler(signal.SIGTERM, ask_exit)
loop.run_until_complete(main(host, port))
Traceback (most recent call last):
File "/usr/lib/python3.7/site-packages/gmqtt/mqtt/handler.py", line 227, in __call__
result = self._handle_packet(cmd, packet)
File "/usr/lib/python3.7/site-packages/gmqtt/mqtt/handler.py", line 124, in _handle_packet
handler(cmd, packet)
File "/usr/lib/python3.7/site-packages/gmqtt/mqtt/handler.py", line 179, in _handle_connack_packet
self.on_connect(self, flags, result)
TypeError: on_connect() missing 1 required positional argument: 'properties'
This will go away if I define the function as
def on_connect(client, flags, rc, properties=None):
print('Connected')
client.subscribe('#', qos=0)
If you set optimistic_acknowledgement = False, how would you acknowledge received messages manually in the on_message handler?
I would like to provide a config to Client that contains FAILED_CONNECTIONS_STOP_RECONNECT, RECONNECTION_SLEEP, ...
An example from hbmqtt project:
config = {
'keep_alive': 10,
'ping_delay': 1,
'default_qos': 0,
'default_retain': False,
'auto_reconnect': True,
'reconnect_max_interval': 5,
'reconnect_retries': 10,
'topics': {
'/test': { 'qos': 1 },
'/some_topic': { 'qos': 2, 'retain': True }
}
}
Env:CentOS8/ mosquitto 1.6.9
[root@localhost gmqtt-0.6.4]# /opt/python38/bin/pytest
===================================================================================== test session starts =====================================================================================
platform linux -- Python 3.8.1, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
rootdir: /root/libs/gmqtt-0.6.4, inifile: pytest.ini
plugins: hypothesis-5.6.0, cov-2.8.1, forked-1.1.3, xdist-1.31.0, asyncio-0.11.0
collected 16 items
tests/test_mqtt5.py EEEEEEFEEEEEEEEE [100%]
=========================================================================================== ERRORS ============================================================================================
________________________________________________________________________________ ERROR at setup of test_basic _________________________________________________________________________________
@pytest.fixture()
async def init_clients():
> await cleanup(host, port, username, prefix=PREFIX)
tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)
def _sock_connect_cb(self, fut, sock, address):
if fut.done():
return
try:
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
# Jump to any except clause below.
> raise OSError(err, f'Connect call failed {address}')
E ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)
/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_________________________________________________________________________ ERROR at setup of test_basic_subscriptions __________________________________________________________________________
@pytest.fixture()
async def init_clients():
> await cleanup(host, port, username, prefix=PREFIX)
tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)
def _sock_connect_cb(self, fut, sock, address):
if fut.done():
return
try:
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
# Jump to any except clause below.
> raise OSError(err, f'Connect call failed {address}')
E ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)
/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
___________________________________________________________________________ ERROR at setup of test_retained_message ___________________________________________________________________________
@pytest.fixture()
async def init_clients():
> await cleanup(host, port, username, prefix=PREFIX)
tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)
def _sock_connect_cb(self, fut, sock, address):
if fut.done():
return
try:
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
# Jump to any except clause below.
> raise OSError(err, f'Connect call failed {address}')
E ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)
/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_____________________________________________________________________________ ERROR at setup of test_will_message _____________________________________________________________________________
@pytest.fixture()
async def init_clients():
> await cleanup(host, port, username, prefix=PREFIX)
tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)
def _sock_connect_cb(self, fut, sock, address):
if fut.done():
return
try:
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
# Jump to any except clause below.
> raise OSError(err, f'Connect call failed {address}')
E ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)
/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_________________________________________________________________ ERROR at setup of test_no_will_message_on_gentle_disconnect _________________________________________________________________
@pytest.fixture()
async def init_clients():
> await cleanup(host, port, username, prefix=PREFIX)
tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)
def _sock_connect_cb(self, fut, sock, address):
if fut.done():
return
try:
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
# Jump to any except clause below.
> raise OSError(err, f'Connect call failed {address}')
E ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)
/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_________________________________________________________________________ ERROR at setup of test_shared_subscriptions _________________________________________________________________________
@pytest.fixture()
async def init_clients():
> await cleanup(host, port, username, prefix=PREFIX)
tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)
def _sock_connect_cb(self, fut, sock, address):
if fut.done():
return
try:
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
# Jump to any except clause below.
> raise OSError(err, f'Connect call failed {address}')
E ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)
/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_____________________________________________________________________________ ERROR at setup of test_unsubscribe ______________________________________________________________________________
@pytest.fixture()
async def init_clients():
> await cleanup(host, port, username, prefix=PREFIX)
tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)
def _sock_connect_cb(self, fut, sock, address):
if fut.done():
return
try:
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
# Jump to any except clause below.
> raise OSError(err, f'Connect call failed {address}')
E ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)
/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
______________________________________________________________________ ERROR at setup of test_overlapping_subscriptions _______________________________________________________________________
@pytest.fixture()
async def init_clients():
> await cleanup(host, port, username, prefix=PREFIX)
tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)
def _sock_connect_cb(self, fut, sock, address):
if fut.done():
return
try:
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
# Jump to any except clause below.
> raise OSError(err, f'Connect call failed {address}')
E ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)
/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
_______________________________________________________________________ ERROR at setup of test_redelivery_on_reconnect ________________________________________________________________________
@pytest.fixture()
async def init_clients():
> await cleanup(host, port, username, prefix=PREFIX)
tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)
def _sock_connect_cb(self, fut, sock, address):
if fut.done():
return
try:
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
# Jump to any except clause below.
> raise OSError(err, f'Connect call failed {address}')
E ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)
/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
___________________________________________________________________________ ERROR at setup of test_async_on_message ___________________________________________________________________________
@pytest.fixture()
async def init_clients():
> await cleanup(host, port, username, prefix=PREFIX)
tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
gmqtt/mqtt/connection.py:27: in create_connection
transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
/opt/python38/lib/python3.8/asyncio/base_events.py:1021: in create_connection
raise exceptions[0]
/opt/python38/lib/python3.8/asyncio/base_events.py:1006: in create_connection
sock = await self._connect_sock(
/opt/python38/lib/python3.8/asyncio/base_events.py:920: in _connect_sock
await self.sock_connect(sock, address)
/opt/python38/lib/python3.8/asyncio/selector_events.py:494: in sock_connect
return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=False debug=False>, fut = <Future finished exception=ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 1883)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>, address = ('127.0.0.1', 1883)
def _sock_connect_cb(self, fut, sock, address):
if fut.done():
return
try:
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
# Jump to any except clause below.
> raise OSError(err, f'Connect call failed {address}')
E ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 1883)
/opt/python38/lib/python3.8/asyncio/selector_events.py:526: ConnectionRefusedError
------------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------------
clean up starting
___________________________________________________________________________ ERROR at setup of test_request_response ___________________________________________________________________________
@pytest.fixture()
async def init_clients():
> await cleanup(host, port, username, prefix=PREFIX)
tests/test_mqtt5.py:30:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/utils.py:76: in cleanup
await curclient.connect(host=host, port=port)
gmqtt/client.py:160: in connect
self._connection = await self._create_connection(
gmqtt/client.py:177: in _create_connection
Right now, message handlers are attached globally on the mqtt client: mqtt_client.on_message = on_message
Would it be possible to attach a message handler to a Subscription? I am thinking of something along the lines of:
hello_sub = mqtt_client.subscribe('hello', qos=0)
hello_sub.on_message = on_message
Hello,
Thank you for this great project. It's very nice. I noticed an issue. Please let me know if this is intentional or if there is bug somewhere.
Consider this code:
import asyncio
import os
import signal
import time
import ssl
from gmqtt import Client as MQTTClient
STOP = asyncio.Event()
def on_connect(client, flags, rc, properties):
print('Connected')
client.subscribe('TEST/#', qos=0)
def on_message(client, topic, payload, qos, properties):
print('[RECV MSG {}] TOPIC: {} PAYLOAD: {} QOS: {} PROPERTIES: {}'.format(client._client_id, topic, payload, qos, properties))
def on_disconnect(client, packet, exc=None):
print('Disconnected')
def on_subscribe(client, mid, qos, properties):
print('SUBSCRIBED')
async def main(broker_host):
client = MQTTClient("code_client-id")
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.on_subscribe = on_subscribe
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.verify_mode = ssl.CERT_NONE
client.set_auth_credentials("usename", "password")
await client.connect(broker_host,8883,ssl_context)
client.publish('TEST/TIME', "message from python code", qos=1)
await STOP.wait()
await client.disconnect()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
host = 'mymqttbroker.com'
loop.run_until_complete(main(host))
the above code provides the following output:
Connected
SUBSCRIBED
[RECV MSG code_client-id] TOPIC: TEST/TIME PAYLOAD: b'message from python code' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
[RECV MSG code_client-id] TOPIC: TEST/TIME PAYLOAD: b'message from other client' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
As you can see the on_message callback was triggered both when sending and receiving a message.
Thus making it impossible to distinguish who sent the message (server or client?).
Thank you for your time.
When I set reconnect_retries
to some value as documented in the README and I want my application to react to the fact that gmqtt has exceeded that number of retries and given up on reconnecting (e.g., to exit the application or to reconnect at a much later stage), how would I do that?
Hi,
in all of the example this library is used with an asyncio event() to handle the termination of the library.
How can I handle instead an asyncio coroutine?
I have multiple tasks that are running in parallel with asyncio.gather()
this is another one
async def mqtt_consumer(mqtt_queue):
logger.info('Starting consumer')
try:
while True:
logger.info('Waiting for message')
msg = await mqtt_queue.get()
print(f"Got {msg}")
except asyncio.CancelledError:
pass
finally:
logger.info('Exiting consumer')
try:
loop.run_until_complete(tasks)
except KeyboardInterrupt as e:
tasks.cancel()
loop.run_forever()
finally:
loop.close()
logging.info("Successfully shutdown the services.")
How can I run gmqtt in a while True await?
Just putting this warning here
::test_admin_subscription
c:\python38-32\lib\site-packages\gmqtt\mqtt\protocol.py:19: DeprecationWarning: The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.
self._connected = asyncio.Event(loop=loop)
-- Docs: https://docs.pytest.org/en/latest/warnings.html
At connection.py:32, the first time it triggers, the computed values is always equal to, not greater than self._keepalive on my system. It therefore skips the keepalive, eventually failing to a disconnect/reconnect.
if time.monotonic() - self._last_data_in > self._keepalive:
Changing the > to >= fixes it for me, making the first keepalive work.
I originally encountered this on a MQTT broker that wanted keepalives shorter than 60 seconds, and in the process of troubleshooting also discovered that you reset keepalive to a hardcoded 60 on a reconnect:
client.py:152
self._connection = await self._create_connection(self._host, self._port, clean_session=True, keepalive=60)
Maybe set a self._keepalive in the Client class with a default of 60 and override it from the connect procedure if a new value is provided?
Hi,
I am getting connect/disconnect events all the time on my Linux (debian) server. The same script works ok on my local machine (mac). The pattern is always the same, like:
CONNECTED->SUBSCRIBED->CONN CLOSE NORMALLY/DISONNECT after exactly 4 minutes->CONNECTED->CONN CLOSE NORMALLY/DISONNECT after exactly 2 minutes. Then it repeats every 2 minutes many times, then again 4 minutes/2 minutes.
I don't find anything in my server logs and there is only "503: mqtt session connection was closed (Unexpected disconnect)" message logged on flespi platform.
Any clue on what this could be? Probably some server/network related stuff but don't know how to identify it
I am attaching the screenshot from my terminal.
Thx on any help that could help me resolve this.
Hello,
I cannot get the examples to work.
I've stripped a publish example down to:
import asyncio
from gmqtt import Client as MQTTClient
TOPIC = 'testtopic/TOPIC'
def on_connect(client, flags, rc, properties):
print('Connected')
async def main(broker_host):
client = MQTTClient(None)
client.on_connect = on_connect
await client.connect(broker_host, keepalive=60)
client.publish(TOPIC, 'Message payload')
await client.disconnect()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main("127.0.0.1"))
A broker is running on localhost.
Python version 3.5.2.
Enviroment: Win10/python3.8/gmqtt 0.6.7
import asyncio
import os
import signal
import time
from gmqtt import Client as MQTTClient
STOP = asyncio.Event()
def on_connect(client, flags, rc, properties):
print('Connected')
client.subscribe('testtopic/时间', qos=0) #'testtopic/#' Works
def on_message(client, topic, payload, qos, properties):
print('RECV MSG:', payload)
def on_disconnect(client, packet, exc=None):
print('Disconnected')
def on_subscribe(client, mid, qos, properties):
print('SUBSCRIBED')
def ask_exit(*args):
STOP.set()
async def main(broker_host, port , token):
client = MQTTClient("client-id")
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.on_subscribe = on_subscribe
client.set_auth_credentials(token, None)
await client.connect(broker_host, port )
client.publish('testtopic/时间',"**" +str(time.time()), qos=1)
await STOP.wait()
await client.disconnect()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
host = "broker.emqx.io"#'mqtt.flespi.io'
token = os.environ.get('FLESPI_TOKEN')
import uuid
loop.run_until_complete(main(host,port = 1883, token=str(uuid.uuid4()) ))
Hi!
My usecase is to use gmqtt with an MQTT broker over an unreliable link, so it can be unavailable for quite a bit or the MQTT TCP connection can be in weird and wonderful states. Furthermore, I need to completely shutdown and discard a gmqtt client and all its resources and tasks if it is unable to connect to such an MQTT broker.
One particular slightly strange connection state can be simulated with SSH port forwarding. On Linux and friends, the command ssh -L 2000:localhost:1 localhost
opens local port 2000 and connects it to the closed port 1, simulating a case where you can connect to port 2000, but can't actually get data through.
I've been using code roughly similar to the simplified attachment test.py.txt to make this work with v0.4.5. The attached code produces the following output with v0.4.5:
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
File "/opt/Python-3.7/lib/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
DEBUG:root:wait for connect() returned, done: 1, pending: 1
DEBUG:root:Client.connect() completed: 0
DEBUG:root:_DISC.wait() completed: 1
DEBUG:root:_DISC.is_set(): 1
DEBUG:root:Cancelling pending tasks
DEBUG:root:Cancelled pending tasks
DEBUG:root:Wait for disconnect()
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:root:disconnect() returned
DEBUG:root:waiting for 20 seconds
DEBUG:root:waited for 20 seconds; exiting _main()
So while client.connect()
doesn't return, that's ok because I can see from the call to on_disconnect()
that there is on connection. After calling client.disconnect()
the client is no longer active, which is what I want.
With v0.5, the output is as follows:
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
DEBUG:gmqtt.mqtt.protocol:[RECV EMPTY] Connection will be reset automatically.
INFO:gmqtt.mqtt.protocol:[CONN CLOSE NORMALLY]
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:root:wait for connect() returned, done: 1, pending: 1
DEBUG:root:Client.connect() completed: 0
DEBUG:root:_DISC.wait() completed: 1
DEBUG:root:_DISC.is_set(): 1
DEBUG:root:Cancelling pending tasks
DEBUG:root:Cancelled pending tasks
DEBUG:root:Wait for disconnect()
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:root:disconnect() returned
DEBUG:root:waiting for 20 seconds
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
File "/opt/Python-3.7/lib/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
File "/opt/Python-3.7/lib/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
File "/opt/Python-3.7/lib/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:root:waited for 20 seconds; exiting _main()
ERROR:asyncio:Exception in callback MqttPackageHandler._handle_exception_in_future(<Task cancell...lient.py:174>>)
handle: <Handle MqttPackageHandler._handle_exception_in_future(<Task cancell...lient.py:174>>)>
Traceback (most recent call last):
File "/opt/Python-3.7/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
File "/home/veefil/.local/lib/python3.7/site-packages/gmqtt/mqtt/handler.py", line 208, in _handle_exception_in_future
if not future.exception():
concurrent.futures._base.CancelledError
This indicates that after having called client.disconnect()
the gmqtt client still has some async call-backs going on and at least with the regular API, I couldn't figure out how to make those stop.
I'm not sure whether that behavior is intentional or whether I missed something, so I just wanted to let you know that it seems that v0.5.0 doesn't quite seem to serve the particular use case I'm after. Are there any plans to change this behavior again in the future?
Hi,
I'm using version 0.5.6 and got the following code:
#!/usr/bin/env python3
import asyncio
from gmqtt import Client as MQTTClient
from gmqtt.mqtt.constants import UNLIMITED_RECONNECTS
import logging
logging.basicConfig(level=logging.DEBUG)
async def main():
mqtt = MQTTClient('tester')
mqtt.set_config({
'reconnect_retries': UNLIMITED_RECONNECTS,
'reconnect_delay': 1
})
await mqtt.connect('localhost')
while True:
#if mqtt.is_connected:
mqtt.publish('ohlc_1m', 'check')
await asyncio.sleep(1)
def handle_exception(loop, context):
# context["message"] will always be there; but context["exception"] may not
msg = context.get("exception", context["message"])
logging.error(f"Caught exception: {str(msg)}")
loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_exception)
loop.run_until_complete(main())
To test the reconnection I'm restarting mosquitto service and get the following output:
DEBUG:asyncio:Using selector: EpollSelector
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
DEBUG:gmqtt.mqtt.handler:[CMD 0x20] b'\x00\x00\x03"\x00\n'
DEBUG:gmqtt.mqtt.handler:[CONNACK] flags: 0x0, result: 0x0
DEBUG:gmqtt.client:[QoS query IS EMPTY]
DEBUG:gmqtt.mqtt.package:Sending PUBLISH (q0), 'ohlc_1m', ... (5 bytes)
DEBUG:gmqtt.mqtt.package:Sending PUBLISH (q0), 'ohlc_1m', ... (5 bytes)
DEBUG:gmqtt.mqtt.protocol:[RECV EMPTY] Connection will be reset automatically.
INFO:gmqtt.mqtt.protocol:[CONN CLOSE NORMALLY]
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:gmqtt.mqtt.package:Sending PUBLISH (q0), 'ohlc_1m', ... (5 bytes)
Traceback (most recent call last):
File "test.py", line 32, in <module>
loop.run_until_complete(main())
File "/usr/lib/python3.8/asyncio/base_events.py", line 612, in run_until_complete
return future.result()
File "test.py", line 22, in main
mqtt.publish('ohlc_1m', 'check')
File "/usr/lib/python3.8/site-packages/gmqtt/client.py", line 228, in publish
mid, package = self._connection.publish(message)
File "/usr/lib/python3.8/site-packages/gmqtt/mqtt/connection.py", line 54, in publish
return self._protocol.send_publish(message)
File "/usr/lib/python3.8/site-packages/gmqtt/mqtt/protocol.py", line 111, in send_publish
self.write_data(pkg)
File "/usr/lib/python3.8/site-packages/gmqtt/mqtt/protocol.py", line 45, in write_data
if not self._transport.is_closing():
AttributeError: 'NoneType' object has no attribute 'is_closing'
I've seen a similar error in #74 so reporting this one for further investigation.
First of all, congrats to the developers on this great project, gmqtt is a joy a to use. Thanks!
I had a quick look and it seems that Topic Alias are not supported by gmqtt. Is that the case? I will happily send a PR improving the docs if they are already supported, just couldn't see how.
We need MQTT 5.0 specification support, at least part of it. Let's make this library v5.0 compliant, here is the list of features we need in order of importance:
MQTT v5.0 CONNECT packet and custom properties per each packet
PUBLISH => set message expiry interval
Next are less important:
subscription identifier => client provides own index upon subscribe, broker specify subscription identifier that was the reason of message delivery
receive maximum => client specifies limit on quantity of inflight messages sent by broker
will delay interval
Hi,
I'm facing an issue when restoring subscriptions after a lost connection. I'm testing durability of connections and whenever I shut the connection down (turning the access point off) gmqtt reconnects and delivers messages but the subscriptions remain dead.
I see two of the following log messages right after each other:
INFO:mqtt.MQTTClient:MQTT connected
INFO:gmqtt.mqtt.protocol:[CONN CLOSE NORMALLY]
WARNING:gmqtt.mqtt.handler:[EXC OCCURED] in reconnect future None
A little later:
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.handler:[EXC OCCURED] in reconnect future None
Any idea on how to either verify subscriptions and resubscribe in case a subscription is lost or how to make subscriptions more robust?
how to setup the connection string? thanks~
The CI is not running on PRs. It seems that the credentials are not valid which are used by travis.
if raise_exc and self._error:
> raise self._error
E gmqtt.mqtt.handler.MQTTConnectError: code 134 (Connection Refused: Bad User Name or Password)
Елена, извините, опишу на русском, ибо писать много, а мой английский совсем не вери-велл :(
Пришлось протестировать на богопротивной Windows и сразу столкнулся с проблемой.
Вкратце: обычное поведение клиента состоит в том, что при неудачном подключении к брокеру, выполняются попытки переподключения.
Я категорически несогласен с этим поведением и не совсем понимаю, как это поведение обойти.
Пытаюсь обойти следующим образом:
try:
await self._client.connect(self._host, self._port, self._ssl, version=MQTTv311)
except:
self._client.stop_reconnect()
await self._client.disconnect()
И это даже работает - при неудачной попытке первичного подключения, клиент бросает дальнейшие попытки.
Но не на Windows...
В начале теста я забыл изменить значение параметра ssl
метода connect
- брокер требует безопасное подключение, но я передал значение False
. В итоге наблюдаю следующую картину:
[CONNECTION MADE]
[EXC: CONN LOST]
================================================================================
Traceback (most recent call last):
File "...Python\lib\asyncio\selector_events.py", line 804, in _read_ready__data_received
data = self._sock.recv(self.max_size)
ConnectionResetError: [WinError 10054] Удаленный хост принудительно разорвал существующее подключение
================================================================================
[CMD 0xe0] b''
[TRYING WRITE TO CLOSED SOCKET]
И так по кругу. Приведенный выше код try...except
, который призван остановить попытки после первой же неудачной, в данном случае не срабатывает.
Посоветуйте, что можно сделать?
Вопрос, на самом деле, даже более широкий: каким образом я могу не дожидаться подключения к брокеру, если уже после первой попытки понятно, что продолжать бесполезно - ошибочные host и порт, некорректные логин и пароль и т.п.?
В то же время, если возникла необходимость переподключения в процессе работы (например, на какое-то время пропало соединение), то продолжать попытки нужно практически до победы.
Hi there,
Thanks for sharing this project, I'm very excited to see MQTT and asyncio coming together.
I was surprised by the callback-style API in the basic example. I expected the example to use the async-with pattern like the aiohttp or websockets clients. My recreation:
async with gmqtt.connect('mqtt.flespi.io') as client:
print('Connected')
await client.publish('TEST/TIME', str(time.time()), qos=1)
async for message in client.subscribe('TEST/#', qos=0):
print('RECV MSG:', message)
break # print one message only
Do you have any ideas how to implement this? (I'll be looking into it myself)
Hi,
I'm new to mqtt. After cloning the repo I managed to make the "Getting started" code work. My question is, how can I connect to protected brokers with a simple username and password? I have no idea what to edit and where.
Thanks!
I just installed gmqtt but uvloop does not support Windows at the moment.
Can I use gmqtt without uvloop?
In the example, there is this comment: 'gmqtt also compatibility with uvloop'
What does it mean? Is it a dependency or a choice?
best regards
Hi
on pypi I noticed that the latest version is 0.6.2 can you please tag it also here and on the relase page?
Thanks
Does anybody know what is this error!
/home/mohsen/PycharmProjects/gmqtt/venv/bin/python /home/mohsen/PycharmProjects/gmqtt/gmqtt.py
Traceback (most recent call last):
File "/home/mohsen/PycharmProjects/gmqtt/gmqtt.py", line 59, in
loop.run_until_complete(main(host, token))
File "uvloop/loop.pyx", line 1451, in uvloop.loop.Loop.run_until_complete
File "/home/mohsen/PycharmProjects/gmqtt/gmqtt.py", line 41, in main
client.set_auth_credentials(token, None)
File "/home/mohsen/PycharmProjects/gmqtt/gmqtt/client.py", line 116, in set_auth_credentials
self._username = username.encode()
AttributeError: 'NoneType' object has no attribute 'encode'
Task was destroyed but it is pending!
task: <Task pending coro=<Client._resend_qos_messages() running at /home/mohsen/PycharmProjects/gmqtt/gmqtt/client.py:83> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f41ed8fe918>()]>>
There is a wrong line in the publish() method of the client:
Line 195 in c167583
which causes failures in my code:
2019-06-06 10:26:48,687 [ERROR] connectors.printer_input: Traceback (most recent call last):
...
File "/home/rkrell/work/project/site-packages/syncasync.py", line 120, in thread_handler
return self.func(*args, **kwargs)
File "/home/rkrell/work/project/site-packages/gmqtt/client.py", line 195, in publish
loop = asyncio.get_event_loop()
File "/home/rkrell/work/project/pypy3.5-7.0.0-linux_x86_64-portable/lib-python/3/asyncio/events.py", line 671, in get_event_loop
return get_event_loop_policy().get_event_loop()
File "/home/rkrell/work/project/pypy3.5-7.0.0-linux_x86_64-portable/lib-python/3/asyncio/events.py", line 583, in get_event_loop
% threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'Thread-1'.
Since this line has no effect it should be removed from this synchronous function.
I've not been able to track this down to anything concrete so I'm hoping someone with more experience of gmqtt can fill in the blanks.
My test environment is gmqtt with mosquitto 1.6.10
In circumstamces I haven't yet been able to reproduce in simple test code, the v5 connection appears to work fine, but a subsequent publish may fail, with Mosquitto logging "Unsupported property type: 44", causing Mosquitto to drop the connection. Other times an identical publish will succeed, and when I starting hacking the gmqtt to find out where the difference was, it was in package.py/PublishPacket where the failed publish had protocol.proto_ver == MQTTv50, where the successful publish had somewhere silently downgraded to MQTTv311.
I wasn't able to work out where the downgrade occurred; with logger set to DEBUG I didn't see anything to indicate gmqtt had failed to connect at v5 and reconnected at v3.1.1, and in any case the publish was being triggered indirectly by the on_connect event so any reconnect would have happened ahead of that anyway.
To the best I can tell, the publish worked when it was in an event handler such as on_connect, but failed when it was in a separate async task (running via asyncio.create_task which is added to the event loop after the connection succeeds).
For my purposes, now that I have got this far I can get my application working by setting version to MQTTv311 on connect (I don't need v5 functionality at this point). But if anyone can shed some light on where this downgrade is happening and why I'd be interested.
Hey,
From the readme I guessed that gmqtt should reconnect using protocol v3 if mqtt v5 is not supported on the broker side. I'm using rabbitmq which does not support v5. gmqtt got disconnected right after establishing the connected but never reconnected. When explicitly setting v3 it works smoothly. I don't think that's a bug but maybe the Readme should be modified to point out that automatic protocol version adaption is not always working.
Best regards
Is there any plan to switch to asynchronous handlers?
I'd like to use an asynchronous function to store messages to a database using asyncio:
async def on_message(client, topic, payload, qos, properties):
pass
mqtt_client.on_message = on_message
Indeed it is still possible to add a new task to the event loop; I just thought it would be nicer.
@Lenka42, we have a problem :(
/usr/local/lib/python3.7/site-packages/gmqtt/mqtt/handler.py:353: RuntimeWarning: coroutine 'blablabla._on_message' was never awaited
self.on_message(self, print_topic, packet, 1, properties)
TLS option is not working as expected. I can only set ssl to True but I cannot provide a path to the cert file.
Mosquitto configuration is working ok with TLS both publisher and subscriber:
mosquitto_sub --cafile /etc/mosquitto/ca_certificates/ca.crt -h 37c16a79d00a -t 'test' -p 8883 -u report -P 'report'
mosquitto_pub --cafile /etc/mosquitto/ca_certificates/ca.crt -h 37c16a79d00a -t 'test' -m 'amessage' -p 8883 -u report -P 'report'
mosquitto -v -c /etc/mosquitto/mosquitto.conf
1575295867: New client connected from 172.17.0.3 as mosq-W7nvl4LtsfAVItCtHT (p2, c1, k60, u'report').
1575295867: Client mosq-W7nvl4LtsfAVItCtHT disconnected.
1575295870: New connection from 172.17.0.3 on port 8883.
If I try to apply same configuration for gmqtt I get the error on the title
# EXAMPLE
import asyncio
from gmqtt import Client
async def main():
cli = Client(client_id='test',
will_message=None,
clean_session=True)
cli.set_auth_credentials('report', password='report')
await cli.connect(host='37c16a79d00a',
port=8883,
keepalive=True,
ssl=True)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
gmqtt/mqtt/connection.py
transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain (_ssl.c:1076)
I would like to know how to implement TLS over gmqtt. Could you provide a quick example?
Thanks
Hi,
I've tried the getting started example.
Unfortunately it doesn't seem to work due to a garbage collection issue using python 3.8. It works fine on python 3.7 (3.7.5 to be exact).
Here is the traceback:
An open stream was garbage collected prior to establishing network connection; call "stream.close()" explicitly.
Exception in callback _SelectorSocketTransport._call_connection_lost(None)
handle: <Handle _SelectorSocketTransport._call_connection_lost(None)>
Traceback (most recent call last):
File "/usr/local/lib/python3.8/asyncio/events.py", line 81, in _run
self._context.run(self._callback, *self._args)
File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 958, in _call_connection_lost
super()._call_connection_lost(exc)
File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 716, in _call_connection_lost
self._protocol.connection_lost(exc)
File "/usr/local/lib/python3.8/site-packages/gmqtt/mqtt/protocol.py", line 194, in connection_lost
self._connection.put_package((MQTTCommands.DISCONNECT, b''))
AttributeError: 'NoneType' object has no attribute 'put_package'
Traceback (most recent call last):
File "./gmqtt_test.py", line 62, in <module>
loop.run_until_complete(main(host))
File "/usr/local/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
return future.result()
File "./gmqtt_test.py", line 45, in main
await client.connect(broker_host)
File "/usr/local/lib/python3.8/site-packages/gmqtt/client.py", line 146, in connect
await self._connection.auth(self._client_id, self._username, self._password, will_message=self._will_message,
File "/usr/local/lib/python3.8/site-packages/gmqtt/mqtt/connection.py", line 50, in auth
await self._protocol.send_auth_package(client_id, username, password, self._clean_session,
File "/usr/local/lib/python3.8/site-packages/gmqtt/mqtt/protocol.py", line 87, in send_auth_package
self.write_data(pkg)
File "/usr/local/lib/python3.8/site-packages/gmqtt/mqtt/protocol.py", line 42, in write_data
if not self._transport.is_closing():
AttributeError: 'NoneType' object has no attribute 'is_closing'
You can reproduce this by using the python 3.8-buster docker image:
host> docker run -it -v $SCRIPT_PATH:/gmqtt_getting_started.py python:3.8-buster /bin/bash
container> pip install gmqtt
container> /gmqtt_getting_started.py
Does anyone have an idea on how to fix this?
Thanks
I have two mqtt clients.
When the QoS of publish and subscribe is set to 2 at the same time, I get the following error.
[ERROR HANDLE PKG]
Traceback (most recent call last):
File "/Users/jing/virtualenv/mqtt_test/lib/python3.8/site-packages/gmqtt/mqtt/handler.py", line 351, in __call__
result = self._handle_packet(cmd, packet)
File "/Users/jing/virtualenv/mqtt_test/lib/python3.8/site-packages/gmqtt/mqtt/handler.py", line 204, in _handle_packet
handler(cmd, packet)
File "/Users/jing/virtualenv/mqtt_test/lib/python3.8/site-packages/gmqtt/mqtt/handler.py", line 403, in _handle_pubrel_packet
mid, = struct.unpack("!H", packet)
struct.error: unpack requires a buffer of 2 bytes
Why does this error occur?
I tried to use multithread for the code so that we can have multiple clients connect but its showing
RuntimeError: this event loop is already running.Task exception was never retrieved
This is my updated code for the sample code you have given
async def main(broker_host,client_id):
client = MQTTClient(client_id)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.on_subscribe = on_subscribe
print(client_id+str(1))
await client.connect(broker_host)
client.publish('TEST/TIME', time.ctime(time.time())+str(client_id), qos=1)
print(client_id+str(2))
await **STOP.wait()**
print(client_id+str(3))
await client.disconnect()
print(client_id+str(4))
def gg(host,clients):
loop.run_until_complete(main(host,clients))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
host = '172.16.55.130'
try:
y=Thread(target=gg, args = (host,"client1",))
x=Thread(target=gg, args = (host,"client2",))
except:
print("no party")
x.start()
y.start()
Could it be possible to make the extensive logging optional?
A disconnect does not clear the event.
The actual state is reflected by
connection_state = self._connected.is_set() & ~self._connection.is_closing()
My client disconnected from broker, but an error occurred while it try to reconnect . The client can't reconnect again but the event_loop still run. Maybe there should be an on_error()
callback function set on the client.
I check the source code in `gmqtt.mqtt.handler'
def _handle_disconnect_packet(self, cmd, packet):
# reset server topics on disconnect
self._clear_topics_aliases()
future = asyncio.ensure_future(self.reconnect(delay=True))
future.add_done_callback(self._handle_exception_in_future)
self.on_disconnect(self, packet)
def _handle_exception_in_future(self, future):
if future.exception():
logger.warning('[EXC OCCURED] in reconnect future %s', future.exception())
return
If the reconnect future
run fail, and the callback _handle_exception_in_future
Just print a log info ......, Why not do a reconnection after some seconds?
About the on_disconnect()
callback (In ‘README.md’)
def on_disconnect(client, packet, exc=None):
print('Disconnected')
I define the exe
param, but the source code in gmqtt.mqtt.handler
def _handle_disconnect_packet(self, cmd, packet):
# reset server topics on disconnect
self._clear_topics_aliases()
future = asyncio.ensure_future(self.reconnect(delay=True))
future.add_done_callback(self._handle_exception_in_future)
self.on_disconnect(self, packet)
In Fact , self.on_disconnect(self, packet)
did not pass an exe param, I want get more exception infomation in the callback() , andI really need it !!!
Sample:
import asyncio
from gmqtt import Client, Message
from gmqtt.mqtt.constants import MQTTv311
import logging
STOP = asyncio.Event()
def on_connect(client, flags, rc, properties):
print('CONNECTED')
client.subscribe('test/#', qos=0)
def on_message(client, topic, payload, qos, properties):
print('RECV MSG:', payload)
def on_disconnect(client, packet, exc=None):
print('DISCONNECTED')
def on_subscribe(client, mid, qos):
print('SUBSCRIBED')
async def main():
client = Client("client-id", will_message=Message('test', "LOST", qos=1, retain=True))
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.on_subscribe = on_subscribe
client.set_auth_credentials('username', 'password')
await client.connect('127.0.0.1', 8883, keepalive=60, version=MQTTv311)
client.publish('test', 'ONLINE', qos=1, retain=True)
client.publish('test', 'OFFLINE', qos=1, retain=True)
await client.disconnect()
if __name__ == '__main__':
asyncio.run(main())
Broker log:
test ONLINE
test OFFLINE
test LOST
Why LWT was sent?
Based on #113 I suggest to have the ability to not acknowledge a message at all.
So it should be possible to "return None" in the on_message callback (without raising an exception).
Why? If I want to process a message, but some backend system that I need for processing is unavailable right now, I'd either have to store the message locally somewhere (and process it later) or let the broker know that I have not processed it at all.
Another partial solution would be to disconnect from the broker as long as the backend is unavailable, but still there is a time window where some messages might slip through.
Please add a way to await
for all pending messages to be delivered.
Calling await client.disconnect()
is throwing a bunch of errors:
Task was destroyed but it is pending!
task: <Task cancelling name='Task-3' coro=<MQTTProtocol._read_loop() running at /usr/lib/python3.8/site-packages/gmqtt/mqtt/protocol.py:185> wait_for=<Future finished result=None>>
Task was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<Client.reconnect() running at /usr/lib/python3.8/site-packages/gmqtt/client.py:191> cb=[MqttPackageHandler._handle_exception_in_future()]>
/usr/lib/python3.8/asyncio/base_events.py:637: RuntimeWarning: coroutine 'Client.reconnect' was never awaited
self._ready.clear()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Hello!
I'm using your clean example from README and I have two problems:
Connected
[TRYING WRITE TO CLOSED SOCKET]
Disconnected
Connected
Disconnected
[TRYING WRITE TO CLOSED SOCKET]
Connected
[TRYING WRITE TO CLOSED SOCKET]
Disconnected
Connected
[TRYING WRITE TO CLOSED SOCKET]
Disconnected
Connected
[TRYING WRITE TO CLOSED SOCKET]
Disconnected
Connected
Disconnected
mosquitto: 1.4.15
gmqtt: 0.6.5
With mosquitto 1.6.9 all works fine.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.