Comments (8)
Hi @frederikaalund thanks for your fast reply! Yes, I would be happy to help around where I am able to. I think we all have the same problem: Time xD I am also a maintainer of another library (https://github.com/mobilityhouse/ocpp/tree/master/ocpp) and with work and other side projects gets tricky. Nevertheless, I think I can easily do some changes that will flatten the learning curve.
Specifically, regarding question number 5.
I wonder if this may be some related to the broker I am using?
I will try again with another broker and I will get back to you on this...
P.S.: Regardless of the young state of this lib, I think is a cool addition to the community, so thanks for your work!
from aiomqtt.
Hi, I am writing some code to contribute to the lib and as such I am also diving into the details of it and I have a question.
Great to hear that. 👍
Why do you declare the get here [1] and create a task to get the MQTT message using
message.get()
, instead of just using theawait asyncio.wait(...)
with themessage.get()
given thatmessage
is an asyncio.Queue?:try: done, _ = await asyncio.wait( (message.get(), self._disconnected), return_when=asyncio.FIRST_COMPLETED) except asyncio.CancelledError: ....
Good question. :) The very first version of asyncio-mqtt that I wrote simply used await message.get()
. This approach caused the following issue: When the user disconnects, the message loops hang forever. This is unwanted. Therefore, we now wait until either:
- We get a message (
message.get()
) - The client disconnects (
self._disconnected
)
This way, the message loop ends when the client disconnects. No wanted hangs. :) Does it make sense?
I tested issue number 2 with another broker and it works without specifying the Protocol, so I guess it is a faulty implementation of the broker I used before...
Also I have tested issue number 5, I mentioned before, in my comments with another broker and it works, great!
Great to hear that it was a broker issue and not an issue with asyncio-mqtt. :) Thanks for the investigation. 👍
My question is why do we need to specify two times the topic we subscribe to? cant we somehow just specify it once ?
You can specify that you are interested in all messages like this:
async with client.unfiltered_messages() as all_messages:
await client.subscribe("dummy/contactor")
async for message in all_messages:
print(message.topic)
print(json.loads(message.payload))
I honestly don't know how useful this filtered/unfiltered messages mechanic is in practice for our users. I just copied it over from paho-mqtt (we basically get it for free).
from aiomqtt.
Hi André, thanks for raising this issue. Let me have a look. :)
I think you make some good points. 👍 asyncio-mqtt is still a young library and it definitely lacks on the documentation side of things. In other words, there is a great opportunity for an open source contribution here (hint hint). :) We're a small team and I welcome all contributions. You seem to have a lot to contribute, which is great! If you're up for the task, we'll upgrade you to "maintainer" in no time.
Anyways, let me go through your list.
- I agree. I'd gladly welcome a PR that adds proper documentation. Good find!
- Sounds like a bug. Personally, I always specify MQTTv5, so I didn't run into it myself. In any case, asyncio-mqtt should work out-of-the-box without any protocol specified.
- Good idea. This would make an easy first pull request. :)
- Actually, that's by design (see #2). Maybe you can think of a way that we can support both use cases at the same time.
- Sounds like a bug. Very strange that nobody else found this. Under the hood, I simply forward the filters to paho-mqtt that does the filtering for us. Maybe I mixed up the logic there. :/
Again thanks for raising this issue, Andŕe. You have some well-thought-out ideas/findings and I'd like to have them as part of asyncio-mqtt. I have no immediate plans to implement them myself due to personal time constraints. I welcome all pull requests and I'll gladly review and provide feedback. 👍
from aiomqtt.
Hi, I am writing some code to contribute to the lib and as such I am also diving into the details of it and I have a question.
Why do you declare the get here [1] and create a task to get the MQTT message using message.get()
, instead of just using the await asyncio.wait(...)
with the message.get()
given that message
is an asyncio.Queue?:
try:
done, _ = await asyncio.wait(
(message.get(), self._disconnected), return_when=asyncio.FIRST_COMPLETED)
except asyncio.CancelledError:
....
from aiomqtt.
I tested issue number 2 with another broker and it works without specifying the Protocol, so I guess it is a faulty implementation of the broker I used before...
Also I have tested issue number 5, I mentioned before, in my comments with another broker and it works, great! My question is why do we need to specify two times the topic we subscribe to? cant we somehow just specify it once ?
from aiomqtt.
Good question. :) The very first version of asyncio-mqtt that I wrote simply used await message.get(). This approach caused the following issue: When the user disconnects, the message loops hang forever. This is unwanted. Therefore, we now wait until either:
We get a message (message.get())
The client disconnects (self._disconnected)
This way, the message loop ends when the client disconnects. No wanted hangs. :) Does it make sense?
OK, yeah, I see what you doing now, makes sense. Usually, I use something like this to create a list of tasks that we await for:
async def cancel_task(task):
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
async def wait_untill_done(awaitables: List[Awaitable[Any]],
finished_when=asyncio.FIRST_COMPLETED):
tasks = []
for awaitable in awaitables:
if not isinstance(awaitable, asyncio.Task):
awaitable = asyncio.create_task(awaitable)
tasks.append(awaitable)
done, pending = \
await asyncio.wait(tasks, return_when=finished_when)
for task in pending:
await cancel_task(task)
errors = []
for task in done:
try:
task.result()
except Exception as e:
errors.append(e)
if len(errors) == 1:
raise errors[0]
if errors:
raise Error(errors)
class Error(Exception):
def __init__(self, errors: List[Exception]):
self.errors = errors
You can specify that you are interested in all messages like this:
async with client.unfiltered_messages() as all_messages:
await client.subscribe("dummy/contactor")
async for message in all_messages:
print(message.topic)
print(json.loads(message.payload))
I honestly don't know how useful this filtered/unfiltered messages mechanic is in practice for our users. I just copied it over from paho-mqtt (we basically get it for free).
I see...What bugs me is that we can influence the filtering in two ways: either messing with the filtered_messages() or when we subscribe. Like according to the lib example we have this:
async with Client("test.mosquitto.org") as client:
async with client.filtered_messages("floors/+/humidity") as messages:
await client.subscribe("floors/#")
async for message in messages:
print(message.payload.decode())
The fact that we subscribe to the floors/# entire directory of topics, makes no difference because we filtered for the humidity ones. If we had subscribed outside the async context manager, then it would make more sense for me, because then I could call another async context manager to deal with the unfiltered_messages or call another filtered_messages with a different topic, but given the fact the client is gone when the context manager is finished, this seems weird to me. Am I making any sense?
Maybe we could move the subscribe to the aenter ? taking into account the topic in the filtered_messages? Not sure if it is possible tho
from aiomqtt.
In general, I find that it's very awkward to handle task wait/cancellation in asyncio. This is why I'm tinkering a bit with an anyio-based implementation. It reads more naturally to me.
Maybe we could move the subscribe to the aenter ? taking into account the topic in the filtered_messages? Not sure if it is possible tho
Indeed, it's the common use case to filter for the very same topic that you subscribe to. There are, however, use cases where you, e.g., subscribe to a general topic ("floors/#") and then have different message loops each with their own filter ("floors/+/humidity", "floors/+/temperature", etc.). I want our API to support that use case.
All that being said, I'm all ears for a wrapper around all this that simplifies the common use case (same topic for filter and subscription). Here is an example of a simple wrapper around asyncio-mqtt:
@asynccontextmanager
async def subscribe(host: str, topic: str, *args, **kwargs):
async with Client(host, *args, **kwargs) as client:
async with client.unfiltered_messages() as messages:
await client.subscribe(topic)
yield from messages
You use it as follows:
async with subscribe("test.mosquitto.org", "floors/#") as messages:
async for message in messages:
print(message.payload.decode())
That's just one example. Do you have any suggestions? :)
from aiomqtt.
Given that this issue was addressed with a pull request and we now touch on these points in the documentation, I hope that I can close this. If there's anything left unsolved or unclear, please reopen! 😊
from aiomqtt.
Related Issues (20)
- Impossible to reuse client in case of connection timeout HOT 7
- Typing warnings in IDE on Client methods HOT 2
- AttributeError: module 'aiomqtt' has no attribute '__version__'
- session persistence with protocol version 5 HOT 1
- Pin Paho to <2.0 HOT 2
- Make aiomqtt compatible with paho v2 HOT 10
- Topic.matches should be used when doing == HOT 4
- How to get the IP of a failed connection attempt
- strange reconnection HOT 2
- MQTT broker reports a disconnection/reconnection... but no aiomqtt.MqttError is raised HOT 4
- Exceptions on __aenter__ not handled. HOT 2
- Reconnect Bug HOT 1
- Cannot instantiate a client due to internal mqtt problem HOT 2
- Issues with uvicorn on Windows 10 HOT 5
- [BUG] Cannot install version 2.0.0 with paho-mqtt 2.0.0 HOT 2
- [BUG] 127.0.0.1/localhost Work Incorrectly HOT 3
- Do not send messages for a long time mqtt automatically disconnects HOT 1
- Can't read incoming messages in pytests HOT 8
- No convenient way to get message without getting locked into a for loop HOT 2
- Can aiomqtt queue has a ring buffer option, for high frequense in-comming messages? HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from aiomqtt.