Code Monkey home page Code Monkey logo

aiomqtt's People

Contributors

admiralnemo avatar andreasheine avatar chmielowiec avatar dependabot[bot] avatar edenhaus avatar empicano avatar fipwmaqzufheoxq92ebc avatar flyte avatar frederikaalund avatar functionpointer avatar gilbertsmink avatar gluap avatar jonathanplasse avatar laundmo avatar madnadyka avatar martinhjelmare avatar mmmspatz avatar oholsen avatar pallas avatar pi-slh avatar pre-commit-ci[bot] avatar sohaib90 avatar spacemanspiff2007 avatar steersbob avatar stewarthaines avatar tropxy avatar vitalerter avatar vvanglro avatar wrobell avatar xydan83 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

aiomqtt's Issues

Incorrect use of typing.Union

in types.py:

ProtocolType = Union[paho.MQTTv31, paho.MQTTv311, paho.MQTTv5]

gives

TypeError: Union[arg, ...]: each arg must be a type. Got 3.

Reproduces at least on python 3.6, 3.7 & 3.8.

The advanced example is possibly buggy

Thank you for asyncio-mqtt; I've used it many of my home IoT applications.
I noticed one (pedantic) thing the the advance example that you might consider changing...
The function cancel_task contains the code:

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

Unfortunately, it is possible that the CancelledError exception arrives between task.cancel call and the try block. It is better practice to do:

    try:
        task.cancel()
        await task
    except asyncio.CancelledError:
        pass

using global client fails successive pytests with RuntimeError: Event loop is closed

Hi, this some somewhat related to #78

I am trying to find a way to use the client globally in my web application. The app runs fine, and the client seems to close cleanly. However, when I run my pytests I get an error after the first test. The client seems to remember the previous event loop between tests, even though I recreate the app for each test.

RuntimeError: Event loop is closed

I think the problem is that the client is imported from another module. But I don't see another way of doing it, since I need to have the client available in several files. I cannot create it in the main module since this would result in circular dependencies.

Below is the main file of my example project. The full project can be viewed here: https://github.com/nbraun-wolf/async-mqtt-fastapi-pytest

from asyncio import CancelledError, create_task

from fastapi import FastAPI


def create_app():
    app = FastAPI()

    # need to in a separate file in order
    # to avoid circular dependencies
    from client import mqtt
    from router import router

    @app.on_event("startup")
    async def connect():
        await mqtt.connect()

        async def subscribe(client):
            async with client.filtered_messages("test") as messages:
                await client.subscribe("test")
                async for message in messages:
                    print(message.payload.decode())

        global subscribe_task
        subscribe_task = create_task(subscribe(mqtt))

    @app.on_event("shutdown")
    async def disconnect():
        if not subscribe_task.done():
            subscribe_task.cancel()
            try:
                await subscribe_task
            except CancelledError:
                pass
        await mqtt.disconnect()
    
    # this router for example is using the client
    app.include_router(router)

    return app

This doesn't happen when using for each publish a new client as context manager, as well as a separate client for the subscription. But it breaks when trying to use the same client ID for the subscriber and the publisher clients, since it disconnects the subscriber after publishing a message when the context manager of the publishing client closes.

Conceptionally I have one instance of app, so it should have only a single client ID and get treated by the broker as one entity, which doesn't seem to be possible. The behaviour can be tested in this branch: https://github.com/nbraun-wolf/async-mqtt-fastapi-pytest/tree/multi-client

Apart from the disconnection problem, I feel like It's wasteful to create constantly new clients and making a new connection first before being able to publish a message.

What is the intended way to use the client in such a scenario?

asyncio.get_event_loop().add_reader() doesn't work on windows out of the box

Hey, this library rocks, thanks for writing it!

This is an upstream problem but, might be worth documenting the workaround somewhere, I am running this on Python 3.8.2 on Windows.

The API that asncio-mqtt uses calls asyncio.get_event_loop().add_reader()

On windows, the default event_loop doesn't support add_reader() and throws an NotYetImplemented exception.

There's some other discussion in other projects like tornadoweb/tornado#2608 (comment) about adding this as a workaround:

if sys.version_info[0] == 3 and sys.version_info[1] >= 8 and sys.platform.startswith('win'):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

It might be worth just having that code snippet documented somewhere for windows users of asyncio-mqtt

Consider converting ReasonCodes into readable strings in MqttCodeError

Hi, new to the project, and loving where it's going.

I thought about opening a PR to start... but maybe some context first might make more sense. When debugging my app, I ran into a situation where a timeout would cause paho to stop sending all messages. I was catching and logging the errors, but missed the subtle difference between the MqttError and MqttCodeError types behind the scenes.

For reasons still somewhat unknown, the reason code was being set to an integer (not a mqtt.ReasonCode) and the subtle [Code: 4] added to the __str__() had slipped by. Once I took a deeper look at what was happening, I saw the extra information, as well as a helper method in paho's source code: https://github.com/eclipse/paho.mqtt.python/blob/master/src/paho/mqtt/client.py#L183

The simple complaint here, is that [Code: {}] is a small addition to an error message, easy to miss, and a not-very-idiomatic error number, when it could instead be an english error message in instances where the rc value is actually an integer.

using Paho's error_message handler or using their builtin methods, we could change the __str__() method in MqttCodeError to look something a little like this:

class MqttCodeError(MqttError):
    def __init__(self, rc: Union[int, mqtt.ReasonCodes], *args: Any):
        super().__init__(*args)
        self.rc = rc

    def __str__(self) -> str:
        if isinstance(self.rc, mqtt.ReasonCodes):
            return f"[code:{self.rc.value}] {str(self.rc)}"
        else:
            return f"[code:{self.rc}] {mqtt.error_string(self.rc)} {super().__str__()}"

But for such a simple change... I figured it'd be better to ask if there was a reason it was setup this way, or if I'm touching something that hasn't been considered in a while. Maybe my versions are worth considering? paho-mqtt-1.5.1 and asyncio_mqtt-0.9.1

If this sounds like a useful change, I'd be more than willing to open up a simple PR... otherwise I'd be interested in learning why the integer codes are preferred to Paho's internal error messages.

Immediate disconnected after subscribe

I'm using the example code, which worked a few weeks ago. Now, it's sending a DISCONNECT right after a SUBACK. I don't know if asyncio-mqtt or paho.mqtt.python changed. Has anyone seen behaviour like

DEBUG:mqtt:Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'2c063326-vpn'
DEBUG:mqtt:Received CONNACK (0, 0)
DEBUG:mqtt:Sending SUBSCRIBE (d0, m1) [(b'prefix/+', 0)]
DEBUG:mqtt:Received SUBACK
DEBUG:mqtt:Sending DISCONNECT

with code like

async with Client(...) as client:
    async with client.filtered_messages(f'{request_prefix}/+') as messages:
        await client.subscribe(f'{request_prefix}/+')
        async for message in messages:
            pass

Currently, it never makes it into the innermost loop.

Publish should not wait for confirmation

The publish() method forces the user to wait for the message to be published. It is perfectly reasonable to publish a message with QOS > 0 when the client is disconnected. For example, on unreliable links, a loop in a thread/task is used to reconnect when the connection goes down. The publisher can continue on publishing at QOS > 0 without knowing anything about the status of the connection. This is not possible with the current publish() implementation and puts a bigger implementation burden on the user of this library.

newbie help getting started

In the examples, I am trying to use the code provided for the subscriber:

import json
from asyncio_mqtt import Client, ProtocolVersion

async with Client(
        "test.mosquitto.org",
        username="username",
        password="password",
        protocol=ProtocolVersion.V31
) as client:
    async with client.filtered_messages("floors/+/humidity") as messages:
        # subscribe is done afterwards so that we just start receiving messages 
        # from this point on
        await client.subscribe("floors/#")
        async for message in messages:
            print(message.topic)
            print(json.loads(message.payload))

Am I missing anything on the end, like a main==main?

Also when I run the code I get an error:

  File "mqtt_subscriber.py", line 6
    async with Client("test.mosquitto.org") as client:
    ^
SyntaxError: 'async with' outside async function

Thanks for any tips getting me up and running, not alot of wisdom here.

Client.connect() blocks the event loop

async with Client("10.10.0.1") as client:
    ...

blocks the event loop for 30 seconds. The source code appears to call the paho client connect() directly.

Disconnect after a certain number of messages were published

I'd like to bring the following issue to your attention:
eclipse/paho.mqtt.python#563
I added a comment explaining the issue and how asyncio-mqtt is involved.

As far as I know, the root cause is in paho-mqtt, but is exacerbated by how asyncio-mqtt uses paho-mqtt.
I'm not sure there's much you can do about it in asyncio-mqtt, except for calling loop() instead.
However, I believe that is not a full solution since the sockpair will still get out of sync.

Most likely when paho-mqtt is fixed, you'll want to bump the required version.

Reconnect support

Is there a way to handle lost connections to the broker in asyncio-mqtt? It seems like paho-mqtt implements automatic reconnection in its loop_forever function, for example, but I wasn't able to figure out how can I do a similar thing in asyncio-mqtt.

As a hacky solution, I tried awaiting the _disconnected future, and re-creating the Client instance, but this didn't work either, as the re-created client just hangs if the broker isn't available.

Bellow is the example code of trying to re-connect by creating a new client. To force a connection error, I stop and then restart my local mqtt broker.

import asyncio
from contextlib import AsyncExitStack
from asyncio_mqtt import MqttError, Client

def get_stack_and_client():
    return AsyncExitStack(), Client("127.0.0.1")

async def reconnect_example():
    stack, client = get_stack_and_client()
    await stack.enter_async_context(client)
    print("connected")

    try:
        await client._disconnected  # killing broker here
    except MqttError as exc:
        print(f"disconnected {exc}")  # I get a "[code:1] Could not disconnect" error here

        try:
            await stack.aclose()
        except Exception as exc:
            print(f"closing 1st stack: {exc}")  # "[code:1] Could not disconnect" again

        for attempt in range(3):
            print(f"reconnecting attempt #{attempt}")
            stack, client = get_stack_and_client()
            await stack.enter_async_context(client)  # the code just hangs on this line, without even raising ConnectinRefused
            print("connected")

    await asyncio.sleep(10)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(reconnect_example())
    loop.close()

Improvement suggestion: Can't parse any errors

The way you print the error as a string and return it makes it hard to parse. The example code tries reconnecting endlessly, even if the server is configured with bad port or host name.

Making the error parseable would be a huge improvement and would make it easy to improve the example code so it does not reconnect on any error, but on disconnects.

Does not support disconnect/connect, which means it doesn't support QOS > 0

This library does not work with the following (pseudocode):

client = Client(..., client_id='1234', clean_session=False)
client.connect()
... do stuff ...
client.disconnect() // or we got disconnected for any reason
client.connect()

Any iteration of subscribed topics will fail after the disconnect because the disconnected future in the client remains set. Because of that, the client cannot be reused after a disconnect. The client is where QOS > 0 is implemented (it has the queue of published messages) so that means this library cannot support publishing with QOS > 0.

Publish Example, protocol Version as required field and logging

Hi,

I just started using your lib and after struggling a bit I managed to do what I wanted. I have a topic that I subscribed to and I published some json data and printed it back in the subscriber:

Subscriber code:

async with Client("www.maqiatto.com", username="username", password="password", protocol=ProtocolVersion.V31) as client:
    async with client.filtered_messages("dummy/contactor") as messages:
        await client.subscribe("dummy/#")
        async for message in messages:
            print(message.topic)
            print(json.loads(message.payload))

Publisher code:

async with Client("www.maqiatto.com", username="username", password="password", protocol=ProtocolVersion.V31) as client:
    message = {"state": 3}
    await client.publish("dummy/contactor", payload=json.dumps(message), qos=2, retain=False)

This works and I get the following printed out in the terminal:

dummy/contactor
{'state': 3}

So, looks good ;)

But I ran into some issues before I could make it work:

  1. There is no example or test for publish, so I had to dive into the details of your lib and also paho-mqtt and figure out what to do. Also for the Client connection just by inspecting the code one can check that Client class also accepts username, password and other properties necessary for the connection. So, I guess if an example directory with different possibilities and even some more detailed doc page would suffice to solve this.
  2. First I tried to connect without specifying the Protocol used and it actually connects to the broker without an issue. The problem later is that I send a message to the broker, the process finishes and no error whatsoever is raised and I received no message in the subscriber side. So, I was quite puzzled about what was wrong and it was not until I defined the version that started working. Thus, I think the protocol version shouldnt be an optional field
  3. This brings me to the point where I tried to import ProtocolVersion Enum class directly from the lib package, but it wouldnt work, because ProtocolVersion is not part of the all list in the init.py. So, I had to import it specifying the client script:
from asyncio_mqtt.client import ProtocolVersion

I guess it would be better to add the ProtocolVersion to the init.py

  1. Last issue was receiving the JSON payload. I tried to decode the message directly when I received it:
async for message in messages:
            print(json.loads(message))

But this wouldnt work, because message is of MQTT data type. This was not clear for me and I had to search around to understand that the message received can be stripped into two properties: "topic" and "payload" and by decoding the payload it worked. Maybe is due to my inexperience that I didnt know that, but it would be cool if somewhere this is explained or showed in an example.

  1. I dont understand the purpose of the filtered_messages, because I created another topic called "test" and I sent a message to that topic, but I ended up receiving the message in the Subscriber whose code I provided above and that has a filter for the "contactor" topic. I guess this was not suppose to happen or was it? I even tried to filter and to subscribe to the "contactor" topic, only, like this:
async with client.filtered_messages("dummy/contactor") as messages:
        await client.subscribe("dummy/contactor")
        async for message in messages:
            print(message.topic)
            print(json.loads(message.payload))

And I sent a message, using the following code:

async with Client("www.maqiatto.com", username="username", password="password", protocol=ProtocolVersion.V31) as client:
    message = {"state": 3}
    await client.publish("dummy/test", payload=json.dumps(message), qos=2, retain=False)

But, I still got the message in the subscriber:

dummy/test
{'state': 3}

So what am I doing wrong here?

Thank you for your work and let me know what you think about this.
Best regards,
Andrรฉ

Broken pipe

After some days of running I am receiving the "Broken pipe" error. This is currently not caught by asyncio-mqtt. Tbh this is too low level for me to understand. I expect it has something to do with an overflow of messages. It occurs at the same moment, I am not sure which is the cause and which is the effect, that the cpu, mem and swap are suddenly going crazy.

  • Should this be something that my own script should catch and deal with?
  • Should this be something that asyncio-mqtt should catch and deal with?
  • or, is there another more preferred solution?

Trace:

evss_device_center.1.zaqg6zdkswua@evss24    | BrokenPipeError: [Errno 32] Broken pipe
evss_device_center.1.zaqg6zdkswua@evss24    | Exception in callback Client._on_socket_open.<locals>.cb() at /usr/local/lib/python3.8/site-packages/asyncio_mqtt/client.py:291
evss_device_center.1.zaqg6zdkswua@evss24    | handle: <Handle Client._on_socket_open.<locals>.cb() at /usr/local/lib/python3.8/site-packages/asyncio_mqtt/client.py:291>
evss_device_center.1.zaqg6zdkswua@evss24    | Traceback (most recent call last):
evss_device_center.1.zaqg6zdkswua@evss24    |   File "/usr/local/lib/python3.8/asyncio/events.py", line 81, in _run
evss_device_center.1.zaqg6zdkswua@evss24    |     self._context.run(self._callback, *self._args)
evss_device_center.1.zaqg6zdkswua@evss24    |   File "/usr/local/lib/python3.8/site-packages/asyncio_mqtt/client.py", line 292, in cb
evss_device_center.1.zaqg6zdkswua@evss24    |     client.loop_read()
evss_device_center.1.zaqg6zdkswua@evss24    |   File "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py", line 1572, in loop_read
evss_device_center.1.zaqg6zdkswua@evss24    |     rc = self._packet_read()
evss_device_center.1.zaqg6zdkswua@evss24    |   File "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py", line 2310, in _packet_read
evss_device_center.1.zaqg6zdkswua@evss24    |     rc = self._packet_handle()
evss_device_center.1.zaqg6zdkswua@evss24    |   File "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py", line 2936, in _packet_handle
evss_device_center.1.zaqg6zdkswua@evss24    |     return self._handle_publish()
evss_device_center.1.zaqg6zdkswua@evss24    |   File "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py", line 3219, in _handle_publish
evss_device_center.1.zaqg6zdkswua@evss24    |     rc = self._send_puback(message.mid)
evss_device_center.1.zaqg6zdkswua@evss24    |   File "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py", line 2471, in _send_puback
evss_device_center.1.zaqg6zdkswua@evss24    |     return self._send_command_with_mid(PUBACK, mid, False)
evss_device_center.1.zaqg6zdkswua@evss24    |   File "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py", line 2580, in _send_command_with_mid
evss_device_center.1.zaqg6zdkswua@evss24    |     return self._packet_queue(command, packet, mid, 1)
evss_device_center.1.zaqg6zdkswua@evss24    |   File "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py", line 2911, in _packet_queue
evss_device_center.1.zaqg6zdkswua@evss24    |     self._sockpairW.send(sockpair_data)

Advanced use example hangs up

The code hangs up, without any message, when an exception is occurred during log_messages execution.

async def log_messages(messages, template): async for message in messages: print(template.format(message.payload.decode()) # if it failed to decode, because of binary data in my case it hangs up forever

For my specific case i figured it out, but it is a dangerous.

Cannot set the keepalive time

The paho client allows you to set the keepalive time. The asyncio-mqtt client does not. That is a pretty important parameter for controlling how much automatic communication happens with the mqtt server, and also for controlling when the server sends out the LWT.

requst for enhancement : paho.mqtt.proxy_set() - support

REQUEST FOR ENHANCEMENT

if i am right, the followin function is presently unimplemented in asyncio-mqtt :

https://github.com/eclipse/paho.mqtt.python/blob/225ab3757f6818ba85eb80564948d1c787190cba/src/paho/mqtt/client.py#L875
( honestly, i wont pull because i think code-change is @maintainer ;-) )

asyncio-mqtt SHOULD make provisions to utilize paho.mqtt.client.proxy_set(**proxy_args)

there are no issues when using transport="websockets", since the websocket tcp-connection is proxy-unaware (hope i understand this right ;-), but e.g. connection via dynamic ssh-tunnels - by nature - need explicit support for connect via proxy.

thank your for enabling this "new" feature
w.

Question: how would I change the message filter?

Hi,
I am writing a Python based video system for university lectures - driven by the urgent need. Being rather new to Python and asyncio I may be above my league here... Sticking with your (bigger) example, I have successfully implemented an MQTT client within my application, it runs as a coroutine.

Question: is there a way I can change the topic of the messages in the filtered message logger after the task has been created. If not, should I cancel this task and recreate it to do that?

Thanks
Peter

newbie help 2

So I have been butchering down the advanced example to my level of understanding.

A couple questions for the MQTT newbie.

What does the def log_messages function do? For whatever reason the debug statements I inserted, this method/function is never hit.

Is this also legit code for the publish part to filter for a string comparison like this below?

async def post_to_topics(client, topics):
    while True:
        for topic in topics:
        
            if topic == 'zone_setpoints':

Any tips greatly appreciated. I am still a little bit confused on the basics, like what device defines the topics. For example if lots of MQTT client devices running with one broker, can each client device have unique topics and then maybe a few overlapping ones to share data between the client devices?

import asyncio, json
from contextlib import AsyncExitStack, asynccontextmanager
from random import randrange
from asyncio_mqtt import Client, MqttError


async def advanced_example():
    # context managers create a stack to help manage them
    
    async with AsyncExitStack() as stack:
        # Keep track of the asyncio tasks that we create, so that
        # we can cancel them on exit
        tasks = set()
        stack.push_async_callback(cancel_tasks, tasks)

        # Connect to the MQTT broker
        client = Client("test.mosquitto.org")
        await stack.enter_async_context(client)

        # topic filters
        topic_filters = (
            "electric_meter",
            "zone_temps",
            "zone_setpoints",
            "reheat_valves"
        )
        for topic_filter in topic_filters:
        
            # Log all messages that matches the filter
            manager = client.filtered_messages(topic_filter)
            messages = await stack.enter_async_context(manager)
            template = f'[topic_filter="{topic_filter}"] {{}}'
            task = asyncio.create_task(log_messages(messages, template))
            tasks.add(task)

        # Messages that doesn't match a filter will get logged here
        messages = await stack.enter_async_context(client.unfiltered_messages())
        print("INCOMING, NO TEMPLATE FOUND FOR THIS ONE")
        task = asyncio.create_task(log_messages(messages, "[unfiltered] {}"))
        tasks.add(task)

        # Subscribe to topic(s)
        # subscribe *after* starting the message
        await client.subscribe("reheat_valves")

        # Publish a random value to each of these topics
        topics = (
            "electric_meter",
            "zone_temps",
            "zone_setpoints",
            "reheat_valves"
        )
        task = asyncio.create_task(post_to_topics(client, topics))
        tasks.add(task)

        # Wait for everything to complete (or fail due to, e.g., network
        # errors)
        await asyncio.gather(*tasks)

async def post_to_topics(client, topics):
    while True:
        for topic in topics:
        
            if topic == 'zone_setpoints':
                print("LETS gather some data, future function ToDo...")
                message = {
                  "status": "read_success",
                  "present_value": randrange(100)
                }
                print(f'[topic="{topic}"] ZONE Setpoints Publishing message={message}')
                await client.publish(topic, json.dumps(message), qos=1)
            else:
                print(f'PASSING TOPIC on {topic}')
            await asyncio.sleep(30)

async def log_messages(messages, template):    
    async for message in messages:
        # ๐Ÿค” Note that we assume that the message paylod is an
        # UTF8-encoded string (hence the `bytes.decode` call).
        print("FUNCTION HIT def log_messages!")
        print("TEMPLATES TO DECODE")
        print(template.format(message.payload.decode()))

async def cancel_tasks(tasks):
    for task in tasks:
        if task.done():
            continue
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

async def main():
    # Run the advanced_example indefinitely. Reconnect automatically
    # if the connection is lost.
    reconnect_interval = 10  # [seconds]
    while True:
        try:
            await advanced_example()
        except MqttError as error:
            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
        finally:
            await asyncio.sleep(reconnect_interval)


asyncio.run(main())

Mqtt errors

Hi, sometime I get this exceptions:
What can be the reason of them?

  • MqttError('Operation timed out')
  • MqttCodeError('Could not publish message')

Disconnected during message iteration

Hi,

I have this issue:

ERROR 2020-10-06 08:59:45,391 client 17392 19700 failed to receive on socket: [SSL] malloc failure (_ssl.c:2607)
ERROR 2020-10-06 08:59:45,395 fw_mqtt_client 17392 19700 Broker: <broker> not connected:Disconnected during message iteration

Any idea?

Thanks

[Errno 104] Connection reset by peer

I received a lot of disconnections from few servers with same error:
here is a extra data:

  • "message": "failed to receive on socket: [Errno 104] Connection reset by peer",
  • "path": "/usr/local/lib/python3.8/site-packages/paho/mqtt/client.py",

Paho mqtt client reconnecting from callback loop

TLDR

There are two cases during connection that will result in reconnecting from the event loop, blocking the loop in the process.

Background

This library, asyncio-mqtt, wraps paho-mqtt and uses its external loop feature to drive paho-mqtt using the asyncio event loop. The asyncio event loop should never block so it's important to not run blocking I/O inside the event loop.

paho-mqtt-client details

We hook into the event loop by using the loop.add_reader method and connect the paho-mqtt callback loop_read. This callback handles the incoming packets from the socket, by calling _packet_read and in turn _packet_handle. The latter directs to different callbacks depending on the command. One of those cases is the _handle_connack callback. There are two cases in this callback that will result in calling the reconnect method of the paho mqtt client. This method does blocking I/O. So if we would end up in either of those cases, we would block the event loop during the reconnection.

Case 1:

protocol == MQTTv311 and result == CONNACK_REFUSED_PROTOCOL_VERSION

Case 2:

protocol == MQTTv311 and result == CONNACK_REFUSED_IDENTIFIER_REJECTED and self._client_id == b''

https://github.com/eclipse/paho.mqtt.python/blob/42f0b13001cb39aee97c2b60a3b4807314dfcb4d/src/paho/mqtt/client.py#L2985-L3003

Example log with forced incorrect MQTT protocol (case 1)

2020-11-27 12:04:31,471 INFO (MainThread) [asyncio_mqtt] Starting client example.
2020-11-27 12:04:31,472 DEBUG (MainThread) [asyncio] Using selector: EpollSelector
2020-11-27 12:04:31,480 DEBUG (ThreadPoolExecutor-0_0) [asyncio_mqtt] Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
2020-11-27 12:04:31,482 DEBUG (MainThread) [asyncio_mqtt] Received CONNACK (0, 1), attempting downgrade to MQTT v3.1.
2020-11-27 12:04:31,482 DEBUG (MainThread) [asyncio_mqtt] Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
Error "Operation timed out". Reconnecting in 3 seconds.
2020-11-27 12:04:44,490 DEBUG (ThreadPoolExecutor-0_0) [asyncio_mqtt] Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
2020-11-27 12:04:44,493 DEBUG (MainThread) [asyncio_mqtt] Received CONNACK (0, 1), attempting downgrade to MQTT v3.1.
2020-11-27 12:04:44,494 DEBUG (MainThread) [asyncio_mqtt] Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
Error "Operation timed out". Reconnecting in 3 seconds.
^C2020-11-27 12:04:55,391 INFO (MainThread) [asyncio_mqtt] Exiting client example.

The first Sending CONNECT happens in the thread pool. This is good. But after the downgrade the next Sending CONNECT happens in the main thread, ie event loop. That's bad.

Solution?

I haven't found any easy way of resolving this without changes in paho-mqtt. I've been playing with monkey patching the paho-mqtt reconnect method, but it doesn't seem clean so I don't really like that.

The upside is that these two cases only occur during connection and may also be avoided by making sure the correct MQTT protocol is used and setting the client_id.

I'm opening this issue, for verification and information, and discussion if there's anything we can do to mitigate this.

Handle messages concurrently

I'm not sure if this is a bug or the intended behavior. I have a Javascript client which uses async-mqtt. With this client I am able to write code as follows

async on_message(topic, payload) {
    let result = await long_running_handler(topic, payload);  
	do_something(result);
}

on_message will be called again even if long_running_handler is still handling the previous packet.

I am not able to replicate this behavior in Python using the basic example.

async with Client("test.mosquitto.org") as client:
    async with client.filtered_messages("floors/+/humidity") as messages:
        await client.subscribe("floors/#")
        async for message in messages:
            # This blocks the for loop from processing the next message
            result = await long_running_handler(topic, payload) 
			do_something(result)

			# workaround is to start the handler with ensure_future, but this complicates the flow
			# _future = asyncio.ensure_future(long_running_handler(topic, payload))
            # now what?

I must resort to using asyncio.ensure_future to start the handler so that on_message can return immediately.

Is there a way around this? Really I am not interested in using (un)filtered_message(), I think I just need a barebones async wrapper around the paho _client.on_message which calls asyncio.ensure_future(user_on_message_callback)

Question about publish

Thanks for the work on this library, I'm using this library and want to publish a message outside of AsyncExitStack but don't know how to do it, any suggestions?

Not working in windows

OS - WIndows 10
Python - 3.9.5
asyncio-mqtt - 0.9.1

Caught exception in on_socket_open: 
Traceback (most recent call last):
  File "d:\#CODE\Python\mqtt.py", line 8, in <module>
    asyncio.run(main())
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 642, in run_until_complete      
    return future.result()
  File "d:\#CODE\Python\mqtt.py", line 5, in main
    async with Client("test.mosquitto.org") as client:
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\asyncio_mqtt\client.py", line 512, in __aenter__
    await self.connect()
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\asyncio_mqtt\client.py", line 173, in connect        
    await loop.run_in_executor(
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\concurrent\futures\thread.py", line 52, in run
    result = self.fn(*self.args, **self.kwargs)
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py", line 941, in connect
    return self.reconnect()
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py", line 1117, in reconnect        
    self._call_socket_open()
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py", line 2071, in _call_socket_open
    self.on_socket_open(self, self._userdata, self._sock)
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\asyncio_mqtt\client.py", line 467, in _on_socket_open
    self._loop.add_reader(sock.fileno(), cb)
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\asyncio\events.py", line 504, in add_reader
    raise NotImplementedError
NotImplementedError
Caught exception in on_socket_close: 
Exception ignored in: <function Client.__del__ at 0x0000029A16F06EE0>
Traceback (most recent call last):
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py", line 660, in __del__
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py", line 704, in _reset_sockets     
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py", line 698, in _sock_close        
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\paho\mqtt\client.py", line 2105, in _call_socket_close
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\site-packages\asyncio_mqtt\client.py", line 482, in _on_socket_close
  File "C:\Users\surya\AppData\Local\Programs\Python\Python39\lib\asyncio\events.py", line 507, in remove_reader
NotImplementedError: 

how to receive message in a limited time window?

hi, there's a logic reason which made me need to check some message(like server response) in a limited time window(like 10 seconds?) . i havnt seen that support in the library, is it possible to do some hack for such feature?

Calling client.subscribe with a zero length list causes TimeoutError

Calling client.subscribe([]) should fail/return immediately, but it gets passed on to the paho client, which happily subscribes to no topics. The client.subscribe function then sets up a callback to wait for paho to fire an on_subscribe callback, but it never does, because there were no topics subscribed to.

Recreate with:

import asyncio

from asyncio_mqtt import Client


async def main():
    client = Client("test.mosquitto.org")

    print("Connecting...")
    await client.connect()
    print("Connected!")

    print("Subscribing...")
    await client.subscribe()
    print("Subscribed!")

    print("Disconnecting...")
    await client.disconnect()
    print("Disconnected!")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
        loop.stop()

The client.subscribe function for reference:

    async def subscribe(self, *args, timeout=10, **kwargs):
        result, mid = self._client.subscribe(*args, **kwargs)
        # Early out on error
        if result != mqtt.MQTT_ERR_SUCCESS:
            raise MqttCodeError(result, 'Could not subscribe to topic')
        # Create future for when the on_subscribe callback is called
        cb_result = asyncio.Future()
        with self._pending_call(mid, cb_result):
            # Wait for cb_result
            return await self._wait_for(cb_result, timeout=timeout)

A public method to return the message amount of a message queue?

I am building a health check API for my company's monitor service and considering taking message amount of a message queue as a health metric. Is adding a public method to return the message amount of a message queue a good idea?

Just as,

    def _cb_and_generator(self, *, log_context, queue_maxsize=0):
        # Queue to hold the incoming messages
        self.messages = messages = asyncio.Queue(maxsize=queue_maxsize)

and

        async with client.filtered_messages("#") as messages:
            await client.subscribe("#")
            async for message in messages:
                print("QUEUE SIZE", client.messages.qsize())

Thanks for your great works!

Request for a patch release 0.9.1

Unsuccessful authentication to the broker leads to an invalid error message in asyncio-mqtt. The issue was addressed in #58 and fixed with PR #59.
I'd really like to see the change on PyPI.org under a patch release 0.9.1.

By any chance, are there plans to release a patch release anytime soon?

disconnect cleanly

Hi, what is the intended way to stop cleanly? When calling client.disconnect, it throws an error MqttError("Disconnected during message iteration").

If there is no other way, how can we know if the user called disconnect on purpose? In the original paho client, we have reason code 0 for that. But I can't find a way to get this info from this wrapper here.

I am trying to run this client in a background thread, and using fastapi as main thread. I have adjusted the example for the readme a bit. I am using this loop boolean currently, but It's not pretty.

Client Code
from asyncio import CancelledError, create_task, gather, run, sleep
from contextlib import AsyncExitStack
from threading import Thread

from asyncio_mqtt import Client, MqttError

broker_host = "localhost"
topic_prefixes = ("test/#",)
topic_handlers = {}

client = None
loop = True


def topic(topic):
    def wrapper(handler):
        topic_handlers[topic] = handler
        return handler

    return wrapper


async def run_client():
    global client
    async with AsyncExitStack() as stack:
        tasks = set()

        async def cancel_tasks(tasks):
            for task in tasks:
                if task.done():
                    continue
                task.cancel()
                try:
                    await task
                except CancelledError:
                    pass

        async def handle_messages(client, messages, handler):
            async for message in messages:
                await handler(client, message.payload.decode())

        stack.push_async_callback(cancel_tasks, tasks)
        client = Client(broker_host)

        await stack.enter_async_context(client)

        for topic_filter, handler in topic_handlers.items():
            manager = client.filtered_messages(topic_filter)
            messages = await stack.enter_async_context(manager)
            task = create_task(handle_messages(client, messages, handler))
            tasks.add(task)

        for topic_prefix in topic_prefixes:
            await client.subscribe(topic_prefix)

        await gather(*tasks)


async def background_client():
    reconnect_interval = 3
    while loop:
        try:
            await run_client()
        except MqttError as error:
            if loop:
                print(f'Error "{error}"')
        finally:
            if loop:
                await sleep(reconnect_interval)


def _paho_thread():
    run(background_client())


def get_client():
    if not client:
        raise Exception("could not get client, did you forget to call mqtt_startup?")
    return client


paho_thread = Thread(target=_paho_thread, daemon=True)


async def mqtt_startup():
    paho_thread.start()


async def mqtt_shutdown():
    global loop
    client = get_client()
    loop = False
    await client.disconnect()
    paho_thread.join()
Fastapi Code
from json import dumps
from typing import Optional

from fastapi import FastAPI

from mqtt import get_client, mqtt_shutdown, mqtt_startup, topic

app = FastAPI()


@app.on_event("startup")
async def startup_event():
    await mqtt_startup()


@app.on_event("shutdown")
async def shutdown_event():
    await mqtt_shutdown()


@app.get("/")
async def read_root():
    client = get_client()
    await client.publish("test/bar", dumps({"pub": "foo"}))
    return {"Hello": "World"}


@topic("test/foo")
async def test(client, payload):
    print("foo handler")
    print(payload)
    await client.publish("test/bar", dumps({"pub": "foo"}))


@topic("test/bar")
async def foo(client, payload):
    print("bar handler")
    print(payload)

pending publish calls

Hi,
what is this warning means?
I am publishing with qos1
at:
/usr/local/lib/python3.7/site-packages/asyncio_mqtt/client.py

There are 13 pending publish calls.
There are 12 pending publish calls.

Very slow publishing data more 100 MB.

Hello again! I noticed one thing, when I try to transfer a file larger than 100MB, it takes a very long time when publishing, it can take 3-5 minutes. If I use TLS, then 2-3 times longer. Moreover, the subscriber receives this file instantly. Any idea why this might be the case? Could it be the small size of the TCP allocated buffer?

image

P.s. I`m using Ubuntu 20.04.1

Not receiving first view Retained messages after subscribing

I have implemented the example โ€œAdvance useโ€. However, after subscribing do not receive all retained messages with log_filtered_messages.

With both your example and MQTTBox (Chrome browser extention) I subscribe to the same broker and topic โ€˜#โ€™, and expect to receive 4 retained messages. With MQTTbox I receive all 4 messages, but with your client I only receive 2 retained messages. I tried looking into your code to explore what could be the issue, but lag the experience to debug.

My code for subscribing and handling the message:

       await client.subscribe('#')
        asyncio.create_task(log_filtered_messages(client, '#'))
        asyncio.create_task(log_unfiltered_messages(client))

What I managed to do is add debug logging and I have created the log below. In yellow highlighted the messaged that are received, but are not handled with my script.

2020-05-14 11:03:50,534 - root - DEBUG - Sending CONNECT (u1, p1, wr0, wq0, wf0, c1, k60) client_id=b''
2020-05-14 11:03:50,538 - root - DEBUG - Received CONNACK (0, 0)
2020-05-14 11:03:50,539 - root - DEBUG - Sending SUBSCRIBE (d0, m1) [(b'#', 0)]
2020-05-14 11:03:50,541 - root - DEBUG - Received SUBACK
2020-05-14 11:03:50,542 - root - DEBUG - Received PUBLISH (d0, q0, r1, m0), 'modules/...', ...  (421 bytes)
2020-05-14 11:03:50,544 - root - DEBUG - Received PUBLISH (d0, q0, r1, m0), 'modules/....', ...  (524 bytes)
2020-05-14 11:03:50,545 - root - DEBUG - Received PUBLISH (d0, q0, r1, m0), 'modules/...', ...  (498 bytes)
[topic_filter="#"]: MESSAGE 1
2020-05-14 11:03:50,546 - root - DEBUG - Received PUBLISH (d0, q0, r1, m0), 'modules/...', ...  (469 bytes)
[topic_filter="#"]: MESSAGE 2

Have you experienced this behavior before, or have an idea what is going wrong?

Disconnects on long wait time for new messages

I have a discord bot that is supposed to send a message to discord after receiving an MQTT message. There is very little activity, so typically there is hours or even days between messages received. I noticed that when there's a long wait, the first time a message is published and is received by the bot, I hit Disconnected during message iteration. The result is the message on the subscribed topic is gone. Here is a log of the output when this happens:

bot   | 2020-12-19 05:51:55 INFO Connecting to MQTT.
bot   | 2020-12-19 05:51:55 INFO Connection to MQTT open.
...
bot   | 2020-12-19 16:18:19 ERROR Connection to MQTT closed: Disconnected during message iteration
bot   | 2020-12-19 16:18:22 INFO Connecting to MQTT.
bot   | 2020-12-19 16:18:22 INFO Connection to MQTT open.

Subsequent messages are published minutes later, and the bot is able to process the message as intended.

I've looked at the code in asyncio-mqtt but it's unclear to me how to prevent this condition. Any advice is greatly appreciated.

For background, the bot and mosquito are running with docker-compose, and are on the same docker network. My connection and message subscription loop is identical to the code in the test.

advanced_example bug

Advanced example has:

    client = Client("test.mosquitto.org")

Needs to be:

    async with Client("test.mosquitto.org") as client:
        ...

error durring connection

Hi, i have error:
After few days that mqtt was connected well to broker suddenly i get an error:


  File "/usr/local/lib/python3.7/site-packages/paho/mqtt/client.py", line 1572, in loop_read,
2021-01-07 08:52:16,574,base_events.py,base_events,default_exception_handler,ERROR,6,MainThread,Exception in callback Client._on_socket_open.<locals>.cb() at /usr/local/lib/python3.7/site-packages/asyncio_mqtt/client.py:291,
asyncio.base_futures.InvalidStateError: invalid state,
2021-01-07 08:52:16,574,client.py,client,_easy_log,ERROR,6,MainThread,Caught exception in on_connect: invalid state,
handle: <Handle Client._on_socket_open.<locals>.cb() at /usr/local/lib/python3.7/site-packages/asyncio_mqtt/client.py:291>,
Traceback (most recent call last):,
  File "/usr/local/lib/python3.7/asyncio/events.py", line 88, in _run,
    self._context.run(self._callback, *self._args),
  File "/usr/local/lib/python3.7/site-packages/asyncio_mqtt/client.py", line 292, in cb,
    client.loop_read(),
    rc = self._packet_read(),
  File "/usr/local/lib/python3.7/site-packages/paho/mqtt/client.py", line 2310, in _packet_read,
    rc = self._packet_handle(),
  File "/usr/local/lib/python3.7/site-packages/paho/mqtt/client.py", line 2942, in _packet_handle,
    return self._handle_connack(),
  File "/usr/local/lib/python3.7/site-packages/paho/mqtt/client.py", line 3030, in _handle_connack,
    self, self._userdata, flags_dict, result),
  File "/usr/local/lib/python3.7/site-packages/asyncio_mqtt/client.py", line 242, in _on_connect,
    self._connected.set_result(rc),
future: <Future finished exception=MqttCodeError('Unexpected disconnect')>,
asyncio_mqtt.error.MqttCodeError: [code:1] Unexpected disconnect

Can't set message_retry_set()

Hi!
I ran into a problem that using your wrapper, I cannot set the message_retry parameter, which is why, if the message volume is large enough, the Paho starts retransmission after 20 seconds.
Are there any workarounds?

How to deal with LWT and DISCONNECT reason

I'm working out of the advanced example on the front page and was working on adding a last will testament (LWT). However, I noticed that asyncio_mqtt.Client with its usage of context manager is always cleaning up the connection by sending DISCONNECT. This has the effect that the LWT will not be sent out by the broker, when the program is handling any exceptions including keyboard interrupts. The only way to get LWT to work is to kill the program hard.

Consequently a "manual" LWT publish had to be added to ensure a LWT-like behavior on program exit. This was not immediately apparent, so it might be worth mentioning in the docs at some point.

I would propose that the asyncio_mqtt.Client.__aexit__ had some awareness (setable flag in class?) of the exit status and corresponding .disconnect() had support for non-zero exit code in order for the broker to sent an ordinary LWT.

# Create a LWT
will = Will("some/topic", payload="offline", qos=2, retain=False)

# Connect to the MQTT broker
client = Client("test.mosquitto.org", will=will)
await stack.enter_async_context(client)

# Implement manual LWT
stack.push_async_callback(client.publish, "some/topic", "offline")

Exception in on_socket_open: Non-thread-safe operation invoked on an event loop other than the current one

When running with the PYTHONASYNCIODEBUG=1 environment variable set, using the following code:

import asyncio

from asyncio_mqtt import Client


async def main():
    client = Client("test.mosquitto.org")

    print("Connecting...")
    await client.connect()
    print("Connected!")

    print("Sleeping...")
    await asyncio.sleep(5)
    print("Slept!")

    print("Disconnecting...")
    await client.disconnect()
    print("Disconnected!")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
        loop.stop()

Connecting to an MQTT server raises the following exceptions:

$ PYTHONASYNCIODEBUG=1 PYTHONTRACEMALLOC=1 python3.8 test.py 
Connecting...
Caught exception in on_socket_open: Non-thread-safe operation invoked on an event loop other than the current one
Traceback (most recent call last):
  File "test.py", line 25, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
  File "test.py", line 10, in main
    await client.connect()
  File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/asyncio_mqtt/client.py", line 79, in connect
    await loop.run_in_executor(None, self._client.connect, self._hostname, self._port, 60)
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 941, in connect
    return self.reconnect()
  File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 1117, in reconnect
    self._call_socket_open()
  File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 2071, in _call_socket_open
    self.on_socket_open(self, self._userdata, self._sock)
  File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/asyncio_mqtt/client.py", line 303, in _on_socket_open
    self._misc_task = self._loop.create_task(self._misc_loop())
  File "/usr/lib/python3.8/asyncio/base_events.py", line 427, in create_task
    task = tasks.Task(coro, loop=self, name=name)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 713, in call_soon
    self._check_thread()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 750, in _check_thread
    raise RuntimeError(
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one
Caught exception in on_socket_close: 'NoneType' object has no attribute 'cancel'
Exception ignored in: <function Client.__del__ at 0x7f298a709dc0>
Traceback (most recent call last):
  File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 660, in __del__
  File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 704, in _reset_sockets
  File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 698, in _sock_close
  File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 2105, in _call_socket_close
  File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/asyncio_mqtt/client.py", line 308, in _on_socket_close
AttributeError: 'NoneType' object has no attribute 'cancel'
sys:1: RuntimeWarning: coroutine 'Client._misc_loop' was never awaited
Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 80, in _worker
    work_item.run()
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 941, in connect
    return self.reconnect()
  File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 1117, in reconnect
    self._call_socket_open()
  File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/paho/mqtt/client.py", line 2071, in _call_socket_open
    self.on_socket_open(self, self._userdata, self._sock)
  File "/home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/asyncio_mqtt/client.py", line 303, in _on_socket_open
    self._misc_task = self._loop.create_task(self._misc_loop())
  File "/usr/lib/python3.8/asyncio/base_events.py", line 427, in create_task
    task = tasks.Task(coro, loop=self, name=name)
task: <Task pending name='Task-2' coro=<Client._misc_loop() running at /home/flyte/workspaces/asyncio-mqtt/ve/lib/python3.8/site-packages/asyncio_mqtt/client.py:318> created at /usr/lib/python3.8/asyncio/base_events.py:427>

implementation of will-messages

I've not found any implementation of LWT or 'will' messages in the readme or the code. Is this correct? If so; is this on the roadmap?

Non-thread-safe operation invoked on an event loop other than the current one

When enabling asyncio debugging (via: PYTHONASYNCIODEBUG=1), in my code I keep getting this error.

2021-03-11 03:35:55 ERROR Caught exception in on_socket_open: Non-thread-safe operation invoked on an event loop other than the current one
2021-03-11 03:35:55 ERROR Connection to MQTT closed.
Traceback (most recent call last):
  File "/code/listener.py", line 184, in mqtt_engine
    await mqtt_handling()
  File "/code/listener.py", line 151, in mqtt_handling
    await stack.enter_async_context(client)
  File "/usr/local/lib/python3.9/contextlib.py", line 556, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/usr/local/lib/python3.9/site-packages/asyncio_mqtt/client.py", line 324, in __aenter__
    await self.connect()
  File "/usr/local/lib/python3.9/site-packages/asyncio_mqtt/client.py", line 79, in connect
    await loop.run_in_executor(None, self._client.connect, self._hostname, self._port, 60)
  File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 52, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 941, in connect
    return self.reconnect()
  File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 1117, in reconnect
    self._call_socket_open()
  File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 2071, in _call_socket_open
    self.on_socket_open(self, self._userdata, self._sock)
  File "/usr/local/lib/python3.9/site-packages/asyncio_mqtt/client.py", line 303, in _on_socket_open
    self._misc_task = self._loop.create_task(self._misc_loop())
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 433, in create_task
    task = tasks.Task(coro, loop=self, name=name)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 748, in call_soon
    self._check_thread()
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 785, in _check_thread
    raise RuntimeError(
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one
2021-03-11 03:35:55 ERROR Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "/usr/local/lib/python3.9/threading.py", line 912, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.9/threading.py", line 954, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.9/threading.py", line 892, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 77, in _worker
    work_item.run()
  File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 52, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 941, in connect
    return self.reconnect()
  File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 1117, in reconnect
    self._call_socket_open()
  File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 2071, in _call_socket_open
    self.on_socket_open(self, self._userdata, self._sock)
  File "/usr/local/lib/python3.9/site-packages/asyncio_mqtt/client.py", line 303, in _on_socket_open
    self._misc_task = self._loop.create_task(self._misc_loop())
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 433, in create_task
    task = tasks.Task(coro, loop=self, name=name)
task: <Task pending name='Task-13' coro=<Client._misc_loop() running at /usr/local/lib/python3.9/site-packages/asyncio_mqtt/client.py:318> created at /usr/local/lib/python3.9/asyncio/base_events.py:433>

My code is nearly identical to that of the advanced_example, here's a snippet:

async def mqtt_handling():
    async with AsyncExitStack() as stack:
        tasks = set()
        stack.push_async_callback(cancel_tasks, tasks)

        logger.info(f"Connecting to MQTT Broker: {MQTT}.")
        random_id = uuid4().hex
        client = Client(
            MQTT,
            port=8883,
            username=MQTT_USERNAME,
            password=MQTT_PASSWORD,
            client_id=f"{base_listener}-{random_id}",
            tls_context=create_default_context(),
            logger=logger,
        )
        await stack.enter_async_context(client)             # <-- THIS IS LINE 151
        logger.info(f"Connection to {MQTT} open.")

        topic_filters = (
            top1_filter,
            topic2_filter,
            topic3_filter,
        )
        for topic_filter in topic_filters:
            manager = client.filtered_messages(topic_filter)
            messages = await stack.enter_async_context(manager)
            tasks.add(create_task(process_topic(messages, topic_filter)))

        await client.subscribe(f"{base_listener_topic}/#")
        logger.info(f"Listening for messages on {base_listener_topic}/# at {MQTT}.")

        await gather(*tasks)


async def cancel_tasks(tasks):
    for task in tasks:
        if task.done():
            continue
        task.cancel()
        try:
            await task
        except CancelledError:
            pass


async def mqtt_engine():
    while True:
        try:
            await mqtt_handling()
        except MqttError as e:
            logger.error(f"Connection to MQTT closed: {str(e)}")
        except Exception:
            logger.exception("Connection to MQTT closed.")
        finally:
            await sleep(3)


if __name__ == "__main__":
    basicConfig(
        level=INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    run(mqtt_engine())

This is really got me stumped.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.