sbtinstruments / aiomqtt Goto Github PK
View Code? Open in Web Editor NEWThe idiomatic asyncio MQTT client, wrapped around paho-mqtt
Home Page: https://sbtinstruments.github.io/aiomqtt
License: BSD 3-Clause "New" or "Revised" License
The idiomatic asyncio MQTT client, wrapped around paho-mqtt
Home Page: https://sbtinstruments.github.io/aiomqtt
License: BSD 3-Clause "New" or "Revised" License
in types.py:
ProtocolType = Union[paho.MQTTv31, paho.MQTTv311, paho.MQTTv5]
gives
TypeError: Union[arg, ...]: each arg must be a type. Got 3.
Reproduces at least on python 3.6, 3.7 & 3.8.
Thank you for asyncio-mqtt; I've used it many of my home IoT applications.
I noticed one (pedantic) thing the the advance example that you might consider changing...
The function cancel_task contains the code:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
Unfortunately, it is possible that the CancelledError exception arrives between task.cancel
call and the try block. It is better practice to do:
try:
task.cancel()
await task
except asyncio.CancelledError:
pass
Hi, this some somewhat related to #78
I am trying to find a way to use the client globally in my web application. The app runs fine, and the client seems to close cleanly. However, when I run my pytests I get an error after the first test. The client seems to remember the previous event loop between tests, even though I recreate the app for each test.
RuntimeError: Event loop is closed
I think the problem is that the client is imported from another module. But I don't see another way of doing it, since I need to have the client available in several files. I cannot create it in the main module since this would result in circular dependencies.
Below is the main file of my example project. The full project can be viewed here: https://github.com/nbraun-wolf/async-mqtt-fastapi-pytest
from asyncio import CancelledError, create_task
from fastapi import FastAPI
def create_app():
app = FastAPI()
# need to in a separate file in order
# to avoid circular dependencies
from client import mqtt
from router import router
@app.on_event("startup")
async def connect():
await mqtt.connect()
async def subscribe(client):
async with client.filtered_messages("test") as messages:
await client.subscribe("test")
async for message in messages:
print(message.payload.decode())
global subscribe_task
subscribe_task = create_task(subscribe(mqtt))
@app.on_event("shutdown")
async def disconnect():
if not subscribe_task.done():
subscribe_task.cancel()
try:
await subscribe_task
except CancelledError:
pass
await mqtt.disconnect()
# this router for example is using the client
app.include_router(router)
return app
This doesn't happen when using for each publish a new client as context manager, as well as a separate client for the subscription. But it breaks when trying to use the same client ID for the subscriber and the publisher clients, since it disconnects the subscriber after publishing a message when the context manager of the publishing client closes.
Conceptionally I have one instance of app, so it should have only a single client ID and get treated by the broker as one entity, which doesn't seem to be possible. The behaviour can be tested in this branch: https://github.com/nbraun-wolf/async-mqtt-fastapi-pytest/tree/multi-client
Apart from the disconnection problem, I feel like It's wasteful to create constantly new clients and making a new connection first before being able to publish a message.
What is the intended way to use the client in such a scenario?
Hey, this library rocks, thanks for writing it!
This is an upstream problem but, might be worth documenting the workaround somewhere, I am running this on Python 3.8.2 on Windows.
The API that asncio-mqtt uses calls asyncio.get_event_loop().add_reader()
On windows, the default event_loop doesn't support add_reader()
and throws an NotYetImplemented exception.
There's some other discussion in other projects like tornadoweb/tornado#2608 (comment) about adding this as a workaround:
if sys.version_info[0] == 3 and sys.version_info[1] >= 8 and sys.platform.startswith('win'):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
It might be worth just having that code snippet documented somewhere for windows users of asyncio-mqtt
Hi, new to the project, and loving where it's going.
I thought about opening a PR to start... but maybe some context first might make more sense. When debugging my app, I ran into a situation where a timeout would cause paho to stop sending all messages. I was catching and logging the errors, but missed the subtle difference between the MqttError
and MqttCodeError
types behind the scenes.
For reasons still somewhat unknown, the reason code was being set to an integer (not a mqtt.ReasonCode
) and the subtle [Code: 4]
added to the __str__()
had slipped by. Once I took a deeper look at what was happening, I saw the extra information, as well as a helper method in paho's source code: https://github.com/eclipse/paho.mqtt.python/blob/master/src/paho/mqtt/client.py#L183
The simple complaint here, is that [Code: {}]
is a small addition to an error message, easy to miss, and a not-very-idiomatic error number, when it could instead be an english error message in instances where the rc
value is actually an integer.
using Paho's error_message
handler or using their builtin methods, we could change the __str__()
method in MqttCodeError
to look something a little like this:
class MqttCodeError(MqttError):
def __init__(self, rc: Union[int, mqtt.ReasonCodes], *args: Any):
super().__init__(*args)
self.rc = rc
def __str__(self) -> str:
if isinstance(self.rc, mqtt.ReasonCodes):
return f"[code:{self.rc.value}] {str(self.rc)}"
else:
return f"[code:{self.rc}] {mqtt.error_string(self.rc)} {super().__str__()}"
But for such a simple change... I figured it'd be better to ask if there was a reason it was setup this way, or if I'm touching something that hasn't been considered in a while. Maybe my versions are worth considering? paho-mqtt-1.5.1
and asyncio_mqtt-0.9.1
If this sounds like a useful change, I'd be more than willing to open up a simple PR... otherwise I'd be interested in learning why the integer codes are preferred to Paho's internal error messages.
I'm using the example code, which worked a few weeks ago. Now, it's sending a DISCONNECT right after a SUBACK. I don't know if asyncio-mqtt or paho.mqtt.python changed. Has anyone seen behaviour like
DEBUG:mqtt:Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'2c063326-vpn'
DEBUG:mqtt:Received CONNACK (0, 0)
DEBUG:mqtt:Sending SUBSCRIBE (d0, m1) [(b'prefix/+', 0)]
DEBUG:mqtt:Received SUBACK
DEBUG:mqtt:Sending DISCONNECT
with code like
async with Client(...) as client:
async with client.filtered_messages(f'{request_prefix}/+') as messages:
await client.subscribe(f'{request_prefix}/+')
async for message in messages:
pass
Currently, it never makes it into the innermost loop.
The publish()
method forces the user to wait for the message to be published. It is perfectly reasonable to publish a message with QOS > 0 when the client is disconnected. For example, on unreliable links, a loop in a thread/task is used to reconnect when the connection goes down. The publisher can continue on publishing at QOS > 0 without knowing anything about the status of the connection. This is not possible with the current publish()
implementation and puts a bigger implementation burden on the user of this library.
In the examples, I am trying to use the code provided for the subscriber:
import json
from asyncio_mqtt import Client, ProtocolVersion
async with Client(
"test.mosquitto.org",
username="username",
password="password",
protocol=ProtocolVersion.V31
) as client:
async with client.filtered_messages("floors/+/humidity") as messages:
# subscribe is done afterwards so that we just start receiving messages
# from this point on
await client.subscribe("floors/#")
async for message in messages:
print(message.topic)
print(json.loads(message.payload))
Am I missing anything on the end, like a main==main
?
Also when I run the code I get an error:
File "mqtt_subscriber.py", line 6
async with Client("test.mosquitto.org") as client:
^
SyntaxError: 'async with' outside async function
Thanks for any tips getting me up and running, not alot of wisdom here.
async with Client("10.10.0.1") as client:
...
blocks the event loop for 30 seconds. The source code appears to call the paho client connect() directly.
I'd like to bring the following issue to your attention:
eclipse/paho.mqtt.python#563
I added a comment explaining the issue and how asyncio-mqtt is involved.
As far as I know, the root cause is in paho-mqtt, but is exacerbated by how asyncio-mqtt uses paho-mqtt.
I'm not sure there's much you can do about it in asyncio-mqtt, except for calling loop()
instead.
However, I believe that is not a full solution since the sockpair will still get out of sync.
Most likely when paho-mqtt is fixed, you'll want to bump the required version.
Is there a way to handle lost connections to the broker in asyncio-mqtt? It seems like paho-mqtt implements automatic reconnection in its loop_forever
function, for example, but I wasn't able to figure out how can I do a similar thing in asyncio-mqtt.
As a hacky solution, I tried awaiting the _disconnected
future, and re-creating the Client instance, but this didn't work either, as the re-created client just hangs if the broker isn't available.
Bellow is the example code of trying to re-connect by creating a new client. To force a connection error, I stop and then restart my local mqtt broker.
import asyncio
from contextlib import AsyncExitStack
from asyncio_mqtt import MqttError, Client
def get_stack_and_client():
return AsyncExitStack(), Client("127.0.0.1")
async def reconnect_example():
stack, client = get_stack_and_client()
await stack.enter_async_context(client)
print("connected")
try:
await client._disconnected # killing broker here
except MqttError as exc:
print(f"disconnected {exc}") # I get a "[code:1] Could not disconnect" error here
try:
await stack.aclose()
except Exception as exc:
print(f"closing 1st stack: {exc}") # "[code:1] Could not disconnect" again
for attempt in range(3):
print(f"reconnecting attempt #{attempt}")
stack, client = get_stack_and_client()
await stack.enter_async_context(client) # the code just hangs on this line, without even raising ConnectinRefused
print("connected")
await asyncio.sleep(10)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(reconnect_example())
loop.close()
The way you print the error as a string and return it makes it hard to parse. The example code tries reconnecting endlessly, even if the server is configured with bad port or host name.
Making the error parseable would be a huge improvement and would make it easy to improve the example code so it does not reconnect on any error, but on disconnects.
This library does not work with the following (pseudocode):
client = Client(..., client_id='1234', clean_session=False)
client.connect()
... do stuff ...
client.disconnect() // or we got disconnected for any reason
client.connect()
Any iteration of subscribed topics will fail after the disconnect because the disconnected
future in the client remains set. Because of that, the client cannot be reused after a disconnect. The client is where QOS > 0 is implemented (it has the queue of published messages) so that means this library cannot support publishing with QOS > 0.
It seems that https://github.com/sbtinstruments/asyncio-mqtt/blob/master/asyncio_mqtt/client.py#L387 may yield an error as rc
is paho.mqtt.reasoncodes.ReasonCodes
and not an integer?
asyncio-mqtt==0.9.0
paho-mqtt==1.5.1
Hi,
I just started using your lib and after struggling a bit I managed to do what I wanted. I have a topic that I subscribed to and I published some json data and printed it back in the subscriber:
Subscriber code:
async with Client("www.maqiatto.com", username="username", password="password", protocol=ProtocolVersion.V31) as client:
async with client.filtered_messages("dummy/contactor") as messages:
await client.subscribe("dummy/#")
async for message in messages:
print(message.topic)
print(json.loads(message.payload))
Publisher code:
async with Client("www.maqiatto.com", username="username", password="password", protocol=ProtocolVersion.V31) as client:
message = {"state": 3}
await client.publish("dummy/contactor", payload=json.dumps(message), qos=2, retain=False)
This works and I get the following printed out in the terminal:
dummy/contactor
{'state': 3}
So, looks good ;)
But I ran into some issues before I could make it work:
from asyncio_mqtt.client import ProtocolVersion
I guess it would be better to add the ProtocolVersion to the init.py
async for message in messages:
print(json.loads(message))
But this wouldnt work, because message is of MQTT data type. This was not clear for me and I had to search around to understand that the message received can be stripped into two properties: "topic" and "payload" and by decoding the payload it worked. Maybe is due to my inexperience that I didnt know that, but it would be cool if somewhere this is explained or showed in an example.
async with client.filtered_messages("dummy/contactor") as messages:
await client.subscribe("dummy/contactor")
async for message in messages:
print(message.topic)
print(json.loads(message.payload))
And I sent a message, using the following code:
async with Client("www.maqiatto.com", username="username", password="password", protocol=ProtocolVersion.V31) as client:
message = {"state": 3}
await client.publish("dummy/test", payload=json.dumps(message), qos=2, retain=False)
But, I still got the message in the subscriber:
dummy/test
{'state': 3}
So what am I doing wrong here?
Thank you for your work and let me know what you think about this.
Best regards,
Andrรฉ
After some days of running I am receiving the "Broken pipe" error. This is currently not caught by asyncio-mqtt. Tbh this is too low level for me to understand. I expect it has something to do with an overflow of messages. It occurs at the same moment, I am not sure which is the cause and which is the effect, that the cpu, mem and swap are suddenly going crazy.
Trace:
evss_device_center.1.zaqg6zdkswua@evss24 | BrokenPipeError: [Errno 32] Broken pipe
evss_device_center.1.zaqg6zdkswua@evss24 | Exception in callback Client._on_socket_open.<locals>.cb() at /usr/local/lib/python3.8/site-packages/asyncio_mqtt/client.py:291
evss_device_center.1.zaqg6zdkswua@evss24 | handle: <Handle Client._on_socket_open.<locals>.cb() at /usr/local/lib/python3.8/site-packages/asyncio_mqtt/client.py:291>
evss_device_center.1.zaqg6zdkswua@evss24 | Traceback (most recent call last):
evss_device_center.1.zaqg6zdkswua@evss24 | File "/usr/local/lib/python3.8/asyncio/events.py", line 81, in _run
evss_device_center.1.zaqg6zdkswua@evss24 | self._context.run(self._callback, *self._args)
evss_device_center.1.zaqg6zdkswua@evss24 | File "/usr/local/lib/python3.8/site-packages/asyncio_mqtt/client.py", line 292, in cb
evss_device_center.1.zaqg6zdkswua@evss24 | client.loop_read()
evss_device_center.1.zaqg6zdkswua@evss24 | File "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py", line 1572, in loop_read
evss_device_center.1.zaqg6zdkswua@evss24 | rc = self._packet_read()
evss_device_center.1.zaqg6zdkswua@evss24 | File "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py", line 2310, in _packet_read
evss_device_center.1.zaqg6zdkswua@evss24 | rc = self._packet_handle()
evss_device_center.1.zaqg6zdkswua@evss24 | File "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py", line 2936, in _packet_handle
evss_device_center.1.zaqg6zdkswua@evss24 | return self._handle_publish()
evss_device_center.1.zaqg6zdkswua@evss24 | File "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py", line 3219, in _handle_publish
evss_device_center.1.zaqg6zdkswua@evss24 | rc = self._send_puback(message.mid)
evss_device_center.1.zaqg6zdkswua@evss24 | File "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py", line 2471, in _send_puback
evss_device_center.1.zaqg6zdkswua@evss24 | return self._send_command_with_mid(PUBACK, mid, False)
evss_device_center.1.zaqg6zdkswua@evss24 | File "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py", line 2580, in _send_command_with_mid
evss_device_center.1.zaqg6zdkswua@evss24 | return self._packet_queue(command, packet, mid, 1)
evss_device_center.1.zaqg6zdkswua@evss24 | File "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py", line 2911, in _packet_queue
evss_device_center.1.zaqg6zdkswua@evss24 | self._sockpairW.send(sockpair_data)
The code hangs up, without any message, when an exception is occurred during log_messages execution.
async def log_messages(messages, template): async for message in messages: print(template.format(message.payload.decode()) # if it failed to decode, because of binary data in my case it hangs up forever
For my specific case i figured it out, but it is a dangerous.
The paho client allows you to set the keepalive time. The asyncio-mqtt client does not. That is a pretty important parameter for controlling how much automatic communication happens with the mqtt server, and also for controlling when the server sends out the LWT.
REQUEST FOR ENHANCEMENT
if i am right, the followin function is presently unimplemented in asyncio-mqtt :
https://github.com/eclipse/paho.mqtt.python/blob/225ab3757f6818ba85eb80564948d1c787190cba/src/paho/mqtt/client.py#L875
( honestly, i wont pull because i think code-change is @maintainer ;-) )
asyncio-mqtt SHOULD make provisions to utilize paho.mqtt.client.proxy_set(**proxy_args)
there are no issues when using transport="websockets", since the websocket tcp-connection is proxy-unaware (hope i understand this right ;-), but e.g. connection via dynamic ssh-tunnels - by nature - need explicit support for connect via proxy.
thank your for enabling this "new" feature
w.
Hi,
I am writing a Python based video system for university lectures - driven by the urgent need. Being rather new to Python and asyncio I may be above my league here... Sticking with your (bigger) example, I have successfully implemented an MQTT client within my application, it runs as a coroutine.
Question: is there a way I can change the topic of the messages in the filtered message logger after the task has been created. If not, should I cancel this task and recreate it to do that?
Thanks
Peter
So I have been butchering down the advanced example
to my level of understanding.
A couple questions for the MQTT newbie.
What does the def log_messages
function do? For whatever reason the debug statements I inserted, this method/function is never hit.
Is this also legit code for the publish part to filter for a string comparison like this below?
async def post_to_topics(client, topics):
while True:
for topic in topics:
if topic == 'zone_setpoints':
Any tips greatly appreciated. I am still a little bit confused on the basics, like what device defines the topics. For example if lots of MQTT client devices running with one broker, can each client device have unique topics and then maybe a few overlapping ones to share data between the client devices?
import asyncio, json
from contextlib import AsyncExitStack, asynccontextmanager
from random import randrange
from asyncio_mqtt import Client, MqttError
async def advanced_example():
# context managers create a stack to help manage them
async with AsyncExitStack() as stack:
# Keep track of the asyncio tasks that we create, so that
# we can cancel them on exit
tasks = set()
stack.push_async_callback(cancel_tasks, tasks)
# Connect to the MQTT broker
client = Client("test.mosquitto.org")
await stack.enter_async_context(client)
# topic filters
topic_filters = (
"electric_meter",
"zone_temps",
"zone_setpoints",
"reheat_valves"
)
for topic_filter in topic_filters:
# Log all messages that matches the filter
manager = client.filtered_messages(topic_filter)
messages = await stack.enter_async_context(manager)
template = f'[topic_filter="{topic_filter}"] {{}}'
task = asyncio.create_task(log_messages(messages, template))
tasks.add(task)
# Messages that doesn't match a filter will get logged here
messages = await stack.enter_async_context(client.unfiltered_messages())
print("INCOMING, NO TEMPLATE FOUND FOR THIS ONE")
task = asyncio.create_task(log_messages(messages, "[unfiltered] {}"))
tasks.add(task)
# Subscribe to topic(s)
# subscribe *after* starting the message
await client.subscribe("reheat_valves")
# Publish a random value to each of these topics
topics = (
"electric_meter",
"zone_temps",
"zone_setpoints",
"reheat_valves"
)
task = asyncio.create_task(post_to_topics(client, topics))
tasks.add(task)
# Wait for everything to complete (or fail due to, e.g., network
# errors)
await asyncio.gather(*tasks)
async def post_to_topics(client, topics):
while True:
for topic in topics:
if topic == 'zone_setpoints':
print("LETS gather some data, future function ToDo...")
message = {
"status": "read_success",
"present_value": randrange(100)
}
print(f'[topic="{topic}"] ZONE Setpoints Publishing message={message}')
await client.publish(topic, json.dumps(message), qos=1)
else:
print(f'PASSING TOPIC on {topic}')
await asyncio.sleep(30)
async def log_messages(messages, template):
async for message in messages:
# ๐ค Note that we assume that the message paylod is an
# UTF8-encoded string (hence the `bytes.decode` call).
print("FUNCTION HIT def log_messages!")
print("TEMPLATES TO DECODE")
print(template.format(message.payload.decode()))
async def cancel_tasks(tasks):
for task in tasks:
if task.done():
continue
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
async def main():
# Run the advanced_example indefinitely. Reconnect automatically
# if the connection is lost.
reconnect_interval = 10 # [seconds]
while True:
try:
await advanced_example()
except MqttError as error:
print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
finally:
await asyncio.sleep(reconnect_interval)
asyncio.run(main())
Hi, sometime I get this exceptions:
What can be the reason of them?
Hi,
I have this issue:
ERROR 2020-10-06 08:59:45,391 client 17392 19700 failed to receive on socket: [SSL] malloc failure (_ssl.c:2607)
ERROR 2020-10-06 08:59:45,395 fw_mqtt_client 17392 19700 Broker: <broker> not connected:Disconnected during message iteration
Any idea?
Thanks
I received a lot of disconnections from few servers with same error:
here is a extra data:
To ease the use of certificates for authentication, it would be really nice if the tls_set function was implemented in asyncio-mqtt client.py.
This would be a nice feature for newbies (like me)
There are two cases during connection that will result in reconnecting from the event loop, blocking the loop in the process.
This library, asyncio-mqtt, wraps paho-mqtt and uses its external loop feature to drive paho-mqtt using the asyncio event loop. The asyncio event loop should never block so it's important to not run blocking I/O inside the event loop.
We hook into the event loop by using the loop.add_reader
method and connect the paho-mqtt callback loop_read
. This callback handles the incoming packets from the socket, by calling _packet_read
and in turn _packet_handle
. The latter directs to different callbacks depending on the command. One of those cases is the _handle_connack
callback. There are two cases in this callback that will result in calling the reconnect
method of the paho mqtt client. This method does blocking I/O. So if we would end up in either of those cases, we would block the event loop during the reconnection.
Case 1:
protocol == MQTTv311 and result == CONNACK_REFUSED_PROTOCOL_VERSION
Case 2:
protocol == MQTTv311 and result == CONNACK_REFUSED_IDENTIFIER_REJECTED and self._client_id == b''
2020-11-27 12:04:31,471 INFO (MainThread) [asyncio_mqtt] Starting client example.
2020-11-27 12:04:31,472 DEBUG (MainThread) [asyncio] Using selector: EpollSelector
2020-11-27 12:04:31,480 DEBUG (ThreadPoolExecutor-0_0) [asyncio_mqtt] Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
2020-11-27 12:04:31,482 DEBUG (MainThread) [asyncio_mqtt] Received CONNACK (0, 1), attempting downgrade to MQTT v3.1.
2020-11-27 12:04:31,482 DEBUG (MainThread) [asyncio_mqtt] Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
Error "Operation timed out". Reconnecting in 3 seconds.
2020-11-27 12:04:44,490 DEBUG (ThreadPoolExecutor-0_0) [asyncio_mqtt] Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
2020-11-27 12:04:44,493 DEBUG (MainThread) [asyncio_mqtt] Received CONNACK (0, 1), attempting downgrade to MQTT v3.1.
2020-11-27 12:04:44,494 DEBUG (MainThread) [asyncio_mqtt] Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
Error "Operation timed out". Reconnecting in 3 seconds.
^C2020-11-27 12:04:55,391 INFO (MainThread) [asyncio_mqtt] Exiting client example.
The first Sending CONNECT
happens in the thread pool. This is good. But after the downgrade the next Sending CONNECT
happens in the main thread, ie event loop. That's bad.
I haven't found any easy way of resolving this without changes in paho-mqtt. I've been playing with monkey patching the paho-mqtt reconnect
method, but it doesn't seem clean so I don't really like that.
The upside is that these two cases only occur during connection and may also be avoided by making sure the correct MQTT protocol is used and setting the client_id
.
I'm opening this issue, for verification and information, and discussion if there's anything we can do to mitigate this.
I'm not sure if this is a bug or the intended behavior. I have a Javascript client which uses async-mqtt. With this client I am able to write code as follows
async on_message(topic, payload) {
let result = await long_running_handler(topic, payload);
do_something(result);
}
on_message
will be called again even if long_running_handler
is still handling the previous packet.
I am not able to replicate this behavior in Python using the basic example.
async with Client("test.mosquitto.org") as client:
async with client.filtered_messages("floors/+/humidity") as messages:
await client.subscribe("floors/#")
async for message in messages:
# This blocks the for loop from processing the next message
result = await long_running_handler(topic, payload)
do_something(result)
# workaround is to start the handler with ensure_future, but this complicates the flow
# _future = asyncio.ensure_future(long_running_handler(topic, payload))
# now what?
I must resort to using asyncio.ensure_future
to start the handler so that on_message can return immediately.
Is there a way around this? Really I am not interested in using (un)filtered_message()
, I think I just need a barebones async wrapper around the paho _client.on_message
which calls asyncio.ensure_future(user_on_message_callback)
Hi
The advanced example is working well. How can I add further subscriptions at runtime?
Thanks for the work on this library, I'm using this library and want to publish a message outside of AsyncExitStack but don't know how to do it, any suggestions?
OS - WIndows 10
Python - 3.9.5
asyncio-mqtt - 0.9.1
Caught exception in on_socket_open:
Traceback (most recent call last):
File "d:\#CODE\Python\mqtt.py", line 8, in <module>
asyncio.run(main())
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 642, in run_until_complete
return future.result()
File "d:\#CODE\Python\mqtt.py", line 5, in main
async with Client("test.mosquitto.org") as client:
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\asyncio_mqtt\client.py", line 512, in __aenter__
await self.connect()
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\asyncio_mqtt\client.py", line 173, in connect
await loop.run_in_executor(
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\concurrent\futures\thread.py", line 52, in run
result = self.fn(*self.args, **self.kwargs)
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py", line 941, in connect
return self.reconnect()
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py", line 1117, in reconnect
self._call_socket_open()
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py", line 2071, in _call_socket_open
self.on_socket_open(self, self._userdata, self._sock)
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\asyncio_mqtt\client.py", line 467, in _on_socket_open
self._loop.add_reader(sock.fileno(), cb)
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\asyncio\events.py", line 504, in add_reader
raise NotImplementedError
NotImplementedError
Caught exception in on_socket_close:
Exception ignored in: <function Client.__del__ at 0x0000029A16F06EE0>
Traceback (most recent call last):
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py", line 660, in __del__
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py", line 704, in _reset_sockets
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py", line 698, in _sock_close
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py", line 2105, in _call_socket_close
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\asyncio_mqtt\client.py", line 482, in _on_socket_close
File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\asyncio\events.py", line 507, in remove_reader
NotImplementedError:
hi, there's a logic reason which made me need to check some message(like server response) in a limited time window(like 10 seconds?) . i havnt seen that support in the library, is it possible to do some hack for such feature?
Calling client.subscribe([])
should fail/return immediately, but it gets passed on to the paho client, which happily subscribes to no topics. The client.subscribe
function then sets up a callback to wait for paho to fire an on_subscribe
callback, but it never does, because there were no topics subscribed to.
Recreate with:
import asyncio
from asyncio_mqtt import Client
async def main():
client = Client("test.mosquitto.org")
print("Connecting...")
await client.connect()
print("Connected!")
print("Subscribing...")
await client.subscribe()
print("Subscribed!")
print("Disconnecting...")
await client.disconnect()
print("Disconnected!")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
loop.stop()
The client.subscribe
function for reference:
async def subscribe(self, *args, timeout=10, **kwargs):
result, mid = self._client.subscribe(*args, **kwargs)
# Early out on error
if result != mqtt.MQTT_ERR_SUCCESS:
raise MqttCodeError(result, 'Could not subscribe to topic')
# Create future for when the on_subscribe callback is called
cb_result = asyncio.Future()
with self._pending_call(mid, cb_result):
# Wait for cb_result
return await self._wait_for(cb_result, timeout=timeout)
This condition did'nt work when username is set and password is None. Password can be optional. It is in paho-mqtt.
I was stuck half an hour to undertand this library needed password="" to work. Otherwise the username is not set and you get a connection refused.
I am building a health check API for my company's monitor service and considering taking message amount of a message queue as a health metric. Is adding a public method to return the message amount of a message queue a good idea?
Just as,
def _cb_and_generator(self, *, log_context, queue_maxsize=0):
# Queue to hold the incoming messages
self.messages = messages = asyncio.Queue(maxsize=queue_maxsize)
and
async with client.filtered_messages("#") as messages:
await client.subscribe("#")
async for message in messages:
print("QUEUE SIZE", client.messages.qsize())
Thanks for your great works!
Hi, what is the intended way to stop cleanly? When calling client.disconnect
, it throws an error MqttError("Disconnected during message iteration")
.
If there is no other way, how can we know if the user called disconnect on purpose? In the original paho client, we have reason code 0 for that. But I can't find a way to get this info from this wrapper here.
I am trying to run this client in a background thread, and using fastapi
as main thread. I have adjusted the example for the readme a bit. I am using this loop boolean currently, but It's not pretty.
from asyncio import CancelledError, create_task, gather, run, sleep
from contextlib import AsyncExitStack
from threading import Thread
from asyncio_mqtt import Client, MqttError
broker_host = "localhost"
topic_prefixes = ("test/#",)
topic_handlers = {}
client = None
loop = True
def topic(topic):
def wrapper(handler):
topic_handlers[topic] = handler
return handler
return wrapper
async def run_client():
global client
async with AsyncExitStack() as stack:
tasks = set()
async def cancel_tasks(tasks):
for task in tasks:
if task.done():
continue
task.cancel()
try:
await task
except CancelledError:
pass
async def handle_messages(client, messages, handler):
async for message in messages:
await handler(client, message.payload.decode())
stack.push_async_callback(cancel_tasks, tasks)
client = Client(broker_host)
await stack.enter_async_context(client)
for topic_filter, handler in topic_handlers.items():
manager = client.filtered_messages(topic_filter)
messages = await stack.enter_async_context(manager)
task = create_task(handle_messages(client, messages, handler))
tasks.add(task)
for topic_prefix in topic_prefixes:
await client.subscribe(topic_prefix)
await gather(*tasks)
async def background_client():
reconnect_interval = 3
while loop:
try:
await run_client()
except MqttError as error:
if loop:
print(f'Error "{error}"')
finally:
if loop:
await sleep(reconnect_interval)
def _paho_thread():
run(background_client())
def get_client():
if not client:
raise Exception("could not get client, did you forget to call mqtt_startup?")
return client
paho_thread = Thread(target=_paho_thread, daemon=True)
async def mqtt_startup():
paho_thread.start()
async def mqtt_shutdown():
global loop
client = get_client()
loop = False
await client.disconnect()
paho_thread.join()
from json import dumps
from typing import Optional
from fastapi import FastAPI
from mqtt import get_client, mqtt_shutdown, mqtt_startup, topic
app = FastAPI()
@app.on_event("startup")
async def startup_event():
await mqtt_startup()
@app.on_event("shutdown")
async def shutdown_event():
await mqtt_shutdown()
@app.get("/")
async def read_root():
client = get_client()
await client.publish("test/bar", dumps({"pub": "foo"}))
return {"Hello": "World"}
@topic("test/foo")
async def test(client, payload):
print("foo handler")
print(payload)
await client.publish("test/bar", dumps({"pub": "foo"}))
@topic("test/bar")
async def foo(client, payload):
print("bar handler")
print(payload)
Hi,
what is this warning means?
I am publishing with qos1
at:
/usr/local/lib/python3.7/site-packages/asyncio_mqtt/client.py
There are 13 pending publish calls.
There are 12 pending publish calls.
Hello again! I noticed one thing, when I try to transfer a file larger than 100MB, it takes a very long time when publishing, it can take 3-5 minutes. If I use TLS, then 2-3 times longer. Moreover, the subscriber receives this file instantly. Any idea why this might be the case? Could it be the small size of the TCP allocated buffer?
P.s. I`m using Ubuntu 20.04.1
Add possibility to setup client clean_session=False
I have implemented the example โAdvance useโ. However, after subscribing do not receive all retained messages with log_filtered_messages.
With both your example and MQTTBox (Chrome browser extention) I subscribe to the same broker and topic โ#โ, and expect to receive 4 retained messages. With MQTTbox I receive all 4 messages, but with your client I only receive 2 retained messages. I tried looking into your code to explore what could be the issue, but lag the experience to debug.
My code for subscribing and handling the message:
await client.subscribe('#')
asyncio.create_task(log_filtered_messages(client, '#'))
asyncio.create_task(log_unfiltered_messages(client))
What I managed to do is add debug logging and I have created the log below. In yellow highlighted the messaged that are received, but are not handled with my script.
2020-05-14 11:03:50,534 - root - DEBUG - Sending CONNECT (u1, p1, wr0, wq0, wf0, c1, k60) client_id=b''
2020-05-14 11:03:50,538 - root - DEBUG - Received CONNACK (0, 0)
2020-05-14 11:03:50,539 - root - DEBUG - Sending SUBSCRIBE (d0, m1) [(b'#', 0)]
2020-05-14 11:03:50,541 - root - DEBUG - Received SUBACK
2020-05-14 11:03:50,542 - root - DEBUG - Received PUBLISH (d0, q0, r1, m0), 'modules/...', ... (421 bytes)
2020-05-14 11:03:50,544 - root - DEBUG - Received PUBLISH (d0, q0, r1, m0), 'modules/....', ... (524 bytes)
2020-05-14 11:03:50,545 - root - DEBUG - Received PUBLISH (d0, q0, r1, m0), 'modules/...', ... (498 bytes)
[topic_filter="#"]: MESSAGE 1
2020-05-14 11:03:50,546 - root - DEBUG - Received PUBLISH (d0, q0, r1, m0), 'modules/...', ... (469 bytes)
[topic_filter="#"]: MESSAGE 2
Have you experienced this behavior before, or have an idea what is going wrong?
I have a discord bot that is supposed to send a message to discord after receiving an MQTT message. There is very little activity, so typically there is hours or even days between messages received. I noticed that when there's a long wait, the first time a message is published and is received by the bot, I hit Disconnected during message iteration
. The result is the message on the subscribed topic is gone. Here is a log of the output when this happens:
bot | 2020-12-19 05:51:55 INFO Connecting to MQTT.
bot | 2020-12-19 05:51:55 INFO Connection to MQTT open.
...
bot | 2020-12-19 16:18:19 ERROR Connection to MQTT closed: Disconnected during message iteration
bot | 2020-12-19 16:18:22 INFO Connecting to MQTT.
bot | 2020-12-19 16:18:22 INFO Connection to MQTT open.
Subsequent messages are published minutes later, and the bot is able to process the message as intended.
I've looked at the code in asyncio-mqtt
but it's unclear to me how to prevent this condition. Any advice is greatly appreciated.
For background, the bot and mosquito are running with docker-compose, and are on the same docker network. My connection and message subscription loop is identical to the code in the test.
Advanced example has:
client = Client("test.mosquitto.org")
Needs to be:
async with Client("test.mosquitto.org") as client:
...
Hi, i have error:
After few days that mqtt was connected well to broker suddenly i get an error:
File "/usr/local/lib/python3.7/site-packages/paho/mqtt/client.py", line 1572, in loop_read,
2021-01-07 08:52:16,574,base_events.py,base_events,default_exception_handler,ERROR,6,MainThread,Exception in callback Client._on_socket_open.<locals>.cb() at /usr/local/lib/python3.7/site-packages/asyncio_mqtt/client.py:291,
asyncio.base_futures.InvalidStateError: invalid state,
2021-01-07 08:52:16,574,client.py,client,_easy_log,ERROR,6,MainThread,Caught exception in on_connect: invalid state,
handle: <Handle Client._on_socket_open.<locals>.cb() at /usr/local/lib/python3.7/site-packages/asyncio_mqtt/client.py:291>,
Traceback (most recent call last):,
File "/usr/local/lib/python3.7/asyncio/events.py", line 88, in _run,
self._context.run(self._callback, *self._args),
File "/usr/local/lib/python3.7/site-packages/asyncio_mqtt/client.py", line 292, in cb,
client.loop_read(),
rc = self._packet_read(),
File "/usr/local/lib/python3.7/site-packages/paho/mqtt/client.py", line 2310, in _packet_read,
rc = self._packet_handle(),
File "/usr/local/lib/python3.7/site-packages/paho/mqtt/client.py", line 2942, in _packet_handle,
return self._handle_connack(),
File "/usr/local/lib/python3.7/site-packages/paho/mqtt/client.py", line 3030, in _handle_connack,
self, self._userdata, flags_dict, result),
File "/usr/local/lib/python3.7/site-packages/asyncio_mqtt/client.py", line 242, in _on_connect,
self._connected.set_result(rc),
future: <Future finished exception=MqttCodeError('Unexpected disconnect')>,
asyncio_mqtt.error.MqttCodeError: [code:1] Unexpected disconnect
When using unfiltered_messages
to receive all MQTT messages it would be useful if information about the message was provided instead of just the payload. For example, the topic
field of the message at minimum would be useful.
This could be achieved by passing msg
to messages.put_nowait
in _cb_and_generator
instead of msg.payload
. The downside is that this exposes an instance of a class from mqtt.paho.
Hi!
I ran into a problem that using your wrapper, I cannot set the message_retry parameter, which is why, if the message volume is large enough, the Paho starts retransmission after 20 seconds.
Are there any workarounds?
I'm working out of the advanced example on the front page and was working on adding a last will testament (LWT). However, I noticed that asyncio_mqtt.Client
with its usage of context manager is always cleaning up the connection by sending DISCONNECT. This has the effect that the LWT will not be sent out by the broker, when the program is handling any exceptions including keyboard interrupts. The only way to get LWT to work is to kill the program hard.
Consequently a "manual" LWT publish had to be added to ensure a LWT-like behavior on program exit. This was not immediately apparent, so it might be worth mentioning in the docs at some point.
I would propose that the asyncio_mqtt.Client.__aexit__
had some awareness (setable flag in class?) of the exit status and corresponding .disconnect()
had support for non-zero exit code in order for the broker to sent an ordinary LWT.
# Create a LWT
will = Will("some/topic", payload="offline", qos=2, retain=False)
# Connect to the MQTT broker
client = Client("test.mosquitto.org", will=will)
await stack.enter_async_context(client)
# Implement manual LWT
stack.push_async_callback(client.publish, "some/topic", "offline")
When running with the PYTHONASYNCIODEBUG=1
environment variable set, using the following code:
import asyncio
from asyncio_mqtt import Client
async def main():
client = Client("test.mosquitto.org")
print("Connecting...")
await client.connect()
print("Connected!")
print("Sleeping...")
await asyncio.sleep(5)
print("Slept!")
print("Disconnecting...")
await client.disconnect()
print("Disconnected!")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
loop.stop()
Connecting to an MQTT server raises the following exceptions:
$ PYTHONASYNCIODEBUG=1 PYTHONTRACEMALLOC=1 python3.8 test.py
Connecting...
Caught exception in on_socket_open: Non-thread-safe operation invoked on an event loop other than the current one
Traceback (most recent call last):
File "test.py", line 25, in <module>
loop.run_until_complete(main())
File "/usr/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
return future.result()
File "test.py", line 10, in main
await client.connect()
File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/asyncio_mqtt/client.py", line 79, in connect
await loop.run_in_executor(None, self._client.connect, self._hostname, self._port, 60)
File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 941, in connect
return self.reconnect()
File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 1117, in reconnect
self._call_socket_open()
File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 2071, in _call_socket_open
self.on_socket_open(self, self._userdata, self._sock)
File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/asyncio_mqtt/client.py", line 303, in _on_socket_open
self._misc_task = self._loop.create_task(self._misc_loop())
File "/usr/lib/python3.8/asyncio/base_events.py", line 427, in create_task
task = tasks.Task(coro, loop=self, name=name)
File "/usr/lib/python3.8/asyncio/base_events.py", line 713, in call_soon
self._check_thread()
File "/usr/lib/python3.8/asyncio/base_events.py", line 750, in _check_thread
raise RuntimeError(
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one
Caught exception in on_socket_close: 'NoneType' object has no attribute 'cancel'
Exception ignored in: <function Client.__del__ at 0x7f298a709dc0>
Traceback (most recent call last):
File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 660, in __del__
File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 704, in _reset_sockets
File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 698, in _sock_close
File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 2105, in _call_socket_close
File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/asyncio_mqtt/client.py", line 308, in _on_socket_close
AttributeError: 'NoneType' object has no attribute 'cancel'
sys:1: RuntimeWarning: coroutine 'Client._misc_loop' was never awaited
Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
File "/usr/lib/python3.8/threading.py", line 890, in _bootstrap
self._bootstrap_inner()
File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/usr/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/concurrent/futures/thread.py", line 80, in _worker
work_item.run()
File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 941, in connect
return self.reconnect()
File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 1117, in reconnect
self._call_socket_open()
File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 2071, in _call_socket_open
self.on_socket_open(self, self._userdata, self._sock)
File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/asyncio_mqtt/client.py", line 303, in _on_socket_open
self._misc_task = self._loop.create_task(self._misc_loop())
File "/usr/lib/python3.8/asyncio/base_events.py", line 427, in create_task
task = tasks.Task(coro, loop=self, name=name)
task: <Task pending name='Task-2' coro=<Client._misc_loop() running at /home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/asyncio_mqtt/client.py:318> created at /usr/lib/python3.8/asyncio/base_events.py:427>
I've not found any implementation of LWT or 'will' messages in the readme or the code. Is this correct? If so; is this on the roadmap?
When enabling asyncio debugging (via: PYTHONASYNCIODEBUG=1
), in my code I keep getting this error.
2021-03-11 03:35:55 ERROR Caught exception in on_socket_open: Non-thread-safe operation invoked on an event loop other than the current one
2021-03-11 03:35:55 ERROR Connection to MQTT closed.
Traceback (most recent call last):
File "/code/listener.py", line 184, in mqtt_engine
await mqtt_handling()
File "/code/listener.py", line 151, in mqtt_handling
await stack.enter_async_context(client)
File "/usr/local/lib/python3.9/contextlib.py", line 556, in enter_async_context
result = await _cm_type.__aenter__(cm)
File "/usr/local/lib/python3.9/site-packages/asyncio_mqtt/client.py", line 324, in __aenter__
await self.connect()
File "/usr/local/lib/python3.9/site-packages/asyncio_mqtt/client.py", line 79, in connect
await loop.run_in_executor(None, self._client.connect, self._hostname, self._port, 60)
File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 52, in run
result = self.fn(*self.args, **self.kwargs)
File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 941, in connect
return self.reconnect()
File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 1117, in reconnect
self._call_socket_open()
File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 2071, in _call_socket_open
self.on_socket_open(self, self._userdata, self._sock)
File "/usr/local/lib/python3.9/site-packages/asyncio_mqtt/client.py", line 303, in _on_socket_open
self._misc_task = self._loop.create_task(self._misc_loop())
File "/usr/local/lib/python3.9/asyncio/base_events.py", line 433, in create_task
task = tasks.Task(coro, loop=self, name=name)
File "/usr/local/lib/python3.9/asyncio/base_events.py", line 748, in call_soon
self._check_thread()
File "/usr/local/lib/python3.9/asyncio/base_events.py", line 785, in _check_thread
raise RuntimeError(
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one
2021-03-11 03:35:55 ERROR Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
File "/usr/local/lib/python3.9/threading.py", line 912, in _bootstrap
self._bootstrap_inner()
File "/usr/local/lib/python3.9/threading.py", line 954, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.9/threading.py", line 892, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 77, in _worker
work_item.run()
File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 52, in run
result = self.fn(*self.args, **self.kwargs)
File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 941, in connect
return self.reconnect()
File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 1117, in reconnect
self._call_socket_open()
File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 2071, in _call_socket_open
self.on_socket_open(self, self._userdata, self._sock)
File "/usr/local/lib/python3.9/site-packages/asyncio_mqtt/client.py", line 303, in _on_socket_open
self._misc_task = self._loop.create_task(self._misc_loop())
File "/usr/local/lib/python3.9/asyncio/base_events.py", line 433, in create_task
task = tasks.Task(coro, loop=self, name=name)
task: <Task pending name='Task-13' coro=<Client._misc_loop() running at /usr/local/lib/python3.9/site-packages/asyncio_mqtt/client.py:318> created at /usr/local/lib/python3.9/asyncio/base_events.py:433>
My code is nearly identical to that of the advanced_example
, here's a snippet:
async def mqtt_handling():
async with AsyncExitStack() as stack:
tasks = set()
stack.push_async_callback(cancel_tasks, tasks)
logger.info(f"Connecting to MQTT Broker: {MQTT}.")
random_id = uuid4().hex
client = Client(
MQTT,
port=8883,
username=MQTT_USERNAME,
password=MQTT_PASSWORD,
client_id=f"{base_listener}-{random_id}",
tls_context=create_default_context(),
logger=logger,
)
await stack.enter_async_context(client) # <-- THIS IS LINE 151
logger.info(f"Connection to {MQTT} open.")
topic_filters = (
top1_filter,
topic2_filter,
topic3_filter,
)
for topic_filter in topic_filters:
manager = client.filtered_messages(topic_filter)
messages = await stack.enter_async_context(manager)
tasks.add(create_task(process_topic(messages, topic_filter)))
await client.subscribe(f"{base_listener_topic}/#")
logger.info(f"Listening for messages on {base_listener_topic}/# at {MQTT}.")
await gather(*tasks)
async def cancel_tasks(tasks):
for task in tasks:
if task.done():
continue
task.cancel()
try:
await task
except CancelledError:
pass
async def mqtt_engine():
while True:
try:
await mqtt_handling()
except MqttError as e:
logger.error(f"Connection to MQTT closed: {str(e)}")
except Exception:
logger.exception("Connection to MQTT closed.")
finally:
await sleep(3)
if __name__ == "__main__":
basicConfig(
level=INFO,
format="%(asctime)s %(levelname)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
run(mqtt_engine())
This is really got me stumped.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.