Code Monkey home page Code Monkey logo

Comments (8)

tropxy avatar tropxy commented on June 3, 2024 1

Hi @frederikaalund thanks for your fast reply! Yes, I would be happy to help around where I am able to. I think we all have the same problem: Time xD I am also a maintainer of another library (https://github.com/mobilityhouse/ocpp/tree/master/ocpp) and with work and other side projects gets tricky. Nevertheless, I think I can easily do some changes that will flatten the learning curve.

Specifically, regarding question number 5.
I wonder if this may be some related to the broker I am using?
I will try again with another broker and I will get back to you on this...

P.S.: Regardless of the young state of this lib, I think is a cool addition to the community, so thanks for your work!

from aiomqtt.

frederikaalund avatar frederikaalund commented on June 3, 2024 1

Hi, I am writing some code to contribute to the lib and as such I am also diving into the details of it and I have a question.

Great to hear that. 👍

Why do you declare the get here [1] and create a task to get the MQTT message using message.get(), instead of just using the await asyncio.wait(...) with the message.get() given that message is an asyncio.Queue?:

try: 
    done, _ = await asyncio.wait(
            (message.get(), self._disconnected), return_when=asyncio.FIRST_COMPLETED)
except asyncio.CancelledError:
    ....

Good question. :) The very first version of asyncio-mqtt that I wrote simply used await message.get(). This approach caused the following issue: When the user disconnects, the message loops hang forever. This is unwanted. Therefore, we now wait until either:

  1. We get a message (message.get())
  2. The client disconnects (self._disconnected)

This way, the message loop ends when the client disconnects. No wanted hangs. :) Does it make sense?


I tested issue number 2 with another broker and it works without specifying the Protocol, so I guess it is a faulty implementation of the broker I used before...

Also I have tested issue number 5, I mentioned before, in my comments with another broker and it works, great!

Great to hear that it was a broker issue and not an issue with asyncio-mqtt. :) Thanks for the investigation. 👍

My question is why do we need to specify two times the topic we subscribe to? cant we somehow just specify it once ?

You can specify that you are interested in all messages like this:

async with client.unfiltered_messages() as all_messages:
        await client.subscribe("dummy/contactor")
        async for message in all_messages:
            print(message.topic)
            print(json.loads(message.payload))

I honestly don't know how useful this filtered/unfiltered messages mechanic is in practice for our users. I just copied it over from paho-mqtt (we basically get it for free).

from aiomqtt.

frederikaalund avatar frederikaalund commented on June 3, 2024

Hi André, thanks for raising this issue. Let me have a look. :)

I think you make some good points. 👍 asyncio-mqtt is still a young library and it definitely lacks on the documentation side of things. In other words, there is a great opportunity for an open source contribution here (hint hint). :) We're a small team and I welcome all contributions. You seem to have a lot to contribute, which is great! If you're up for the task, we'll upgrade you to "maintainer" in no time.

Anyways, let me go through your list.

  1. I agree. I'd gladly welcome a PR that adds proper documentation. Good find!
  2. Sounds like a bug. Personally, I always specify MQTTv5, so I didn't run into it myself. In any case, asyncio-mqtt should work out-of-the-box without any protocol specified.
  3. Good idea. This would make an easy first pull request. :)
  4. Actually, that's by design (see #2). Maybe you can think of a way that we can support both use cases at the same time.
  5. Sounds like a bug. Very strange that nobody else found this. Under the hood, I simply forward the filters to paho-mqtt that does the filtering for us. Maybe I mixed up the logic there. :/

Again thanks for raising this issue, Andŕe. You have some well-thought-out ideas/findings and I'd like to have them as part of asyncio-mqtt. I have no immediate plans to implement them myself due to personal time constraints. I welcome all pull requests and I'll gladly review and provide feedback. 👍

from aiomqtt.

tropxy avatar tropxy commented on June 3, 2024

Hi, I am writing some code to contribute to the lib and as such I am also diving into the details of it and I have a question.
Why do you declare the get here [1] and create a task to get the MQTT message using message.get(), instead of just using the await asyncio.wait(...) with the message.get() given that message is an asyncio.Queue?:

try: 
    done, _ = await asyncio.wait(
            (message.get(), self._disconnected), return_when=asyncio.FIRST_COMPLETED)
except asyncio.CancelledError:
    ....

[1] - https://github.com/sbtinstruments/asyncio-mqtt/blob/f4736adf0d3c5b87a39ea27afd025ed58c7bb54c/asyncio_mqtt/client.py#L310

from aiomqtt.

tropxy avatar tropxy commented on June 3, 2024

I tested issue number 2 with another broker and it works without specifying the Protocol, so I guess it is a faulty implementation of the broker I used before...

Also I have tested issue number 5, I mentioned before, in my comments with another broker and it works, great! My question is why do we need to specify two times the topic we subscribe to? cant we somehow just specify it once ?

from aiomqtt.

tropxy avatar tropxy commented on June 3, 2024

Good question. :) The very first version of asyncio-mqtt that I wrote simply used await message.get(). This approach caused the following issue: When the user disconnects, the message loops hang forever. This is unwanted. Therefore, we now wait until either:
We get a message (message.get())
The client disconnects (self._disconnected)
This way, the message loop ends when the client disconnects. No wanted hangs. :) Does it make sense?

OK, yeah, I see what you doing now, makes sense. Usually, I use something like this to create a list of tasks that we await for:

async def cancel_task(task):
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


async def wait_untill_done(awaitables: List[Awaitable[Any]],
                             finished_when=asyncio.FIRST_COMPLETED):
    tasks = []

    for awaitable in awaitables:
        if not isinstance(awaitable, asyncio.Task):
            awaitable = asyncio.create_task(awaitable)
        tasks.append(awaitable)

    done, pending = \
        await asyncio.wait(tasks, return_when=finished_when)

    for task in pending:
        await cancel_task(task)

    errors = []
    for task in done:
        try:
            task.result()
        except Exception as e:
            errors.append(e)

    if len(errors) == 1:
        raise errors[0]

    if errors:
        raise Error(errors)
  
class Error(Exception):
    def __init__(self, errors: List[Exception]):
        self.errors = errors

You can specify that you are interested in all messages like this:
async with client.unfiltered_messages() as all_messages:
await client.subscribe("dummy/contactor")
async for message in all_messages:
print(message.topic)
print(json.loads(message.payload))
I honestly don't know how useful this filtered/unfiltered messages mechanic is in practice for our users. I just copied it over from paho-mqtt (we basically get it for free).

I see...What bugs me is that we can influence the filtering in two ways: either messing with the filtered_messages() or when we subscribe. Like according to the lib example we have this:

async with Client("test.mosquitto.org") as client:
    async with client.filtered_messages("floors/+/humidity") as messages:
        await client.subscribe("floors/#")
        async for message in messages:
            print(message.payload.decode())

The fact that we subscribe to the floors/# entire directory of topics, makes no difference because we filtered for the humidity ones. If we had subscribed outside the async context manager, then it would make more sense for me, because then I could call another async context manager to deal with the unfiltered_messages or call another filtered_messages with a different topic, but given the fact the client is gone when the context manager is finished, this seems weird to me. Am I making any sense?

Maybe we could move the subscribe to the aenter ? taking into account the topic in the filtered_messages? Not sure if it is possible tho

from aiomqtt.

frederikaalund avatar frederikaalund commented on June 3, 2024

In general, I find that it's very awkward to handle task wait/cancellation in asyncio. This is why I'm tinkering a bit with an anyio-based implementation. It reads more naturally to me.


Maybe we could move the subscribe to the aenter ? taking into account the topic in the filtered_messages? Not sure if it is possible tho

Indeed, it's the common use case to filter for the very same topic that you subscribe to. There are, however, use cases where you, e.g., subscribe to a general topic ("floors/#") and then have different message loops each with their own filter ("floors/+/humidity", "floors/+/temperature", etc.). I want our API to support that use case.

All that being said, I'm all ears for a wrapper around all this that simplifies the common use case (same topic for filter and subscription). Here is an example of a simple wrapper around asyncio-mqtt:

@asynccontextmanager
async def subscribe(host: str, topic: str, *args, **kwargs):
    async with Client(host, *args, **kwargs) as client:
        async with client.unfiltered_messages() as messages:
            await client.subscribe(topic)
            yield from messages

You use it as follows:

async with subscribe("test.mosquitto.org", "floors/#") as messages:
    async for message in messages:
        print(message.payload.decode())

That's just one example. Do you have any suggestions? :)

from aiomqtt.

empicano avatar empicano commented on June 3, 2024

Given that this issue was addressed with a pull request and we now touch on these points in the documentation, I hope that I can close this. If there's anything left unsolved or unclear, please reopen! 😊

from aiomqtt.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.