Code Monkey home page Code Monkey logo

moat-mqtt's Introduction

MoaT-MQTT ======

MoaT-MQTT is an open source MQTT client and broker implementation. It is a fork of hbmqtt with support for anyio and MoaT-KV.

MoaT-MQTT provides a straightforward API based on coroutines, making it easy to write highly concurrent applications.

Features

MoaT-MQTT implements the full set of MQTT 3.1.1 protocol specifications and provides the following features:

  • Support for QoS 0, QoS 1 and QoS 2 messages flows
  • Client auto-reconnection
  • Authentication via password file (more methods can be added through a plugin system)
  • Basic $SYS topics
  • TCP and websocket support
  • SSL support over TCP and websocket
  • Plugin system
  • Optional: Storage of retained messages in MoaT-KV

Build status

TODO.

Project status

TODO.

Getting started

MoaT-MQTT is available on Pypi and can installed simply using pip : :

$ pip install moat-mqtt

Documentation is available on Read the Docs.

Bug reports, patches and suggestions welcome! Just open an issue.

'Join the chat at https://gitter.im/beerfactory/moat-mqtt'

Moat-MQTT? DistMQTT? MoaT-KV? Whatever? --------------------------------------

MoaT-MQTT is a Python package that includes a stand-alone MQTT server, as well as basic client scripts. It is based on hbmqtt and was previously named "DistMQTT".

MoaT-KV is a distributed key-value storage system. It uses the MoaT-MQTT client library as its connector to an MQTT server, preferably Mosquitto or some other low-latency broker. It was previously named "DistKV".

A MoaT-MQTT server can hook into MoaT-KV so that some messages are persisted, translated (i.e. store msgpack messages encoding True and False, instead of the strings "on" and "off" (or "ON" and "OFF" or "1" and "0" or โ€ฆ)), filtered (e.g. the client can only modify existing messages but not add any), et al..

moat-mqtt's People

Contributors

akatrevorjay avatar andvikt avatar bachp avatar burnpanck avatar cfchou avatar clach04 avatar claws avatar d21d3q avatar dansheme avatar erics465 avatar florianludwig avatar gdraynz avatar gitter-badger avatar hexdump42 avatar hongquan avatar jcea avatar jodal avatar luchermans avatar mi3z avatar mikenerone avatar mitchbte avatar njouanin avatar romancardenas avatar smurfix avatar spacetag avatar taliesin avatar thiswiederkehr avatar vlcinsky avatar zhukovalexander avatar zyp avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

moat-mqtt's Issues

Please update anyio dependency to 4.x

On moat 0.39.8 (the current latest available on pip, despite the 0.40.0 release already available on GitHub), I have ran into errors like:

../../../.virtualenvs/throwaway/lib/python3.11/site-packages/anyio/_backends/_trio.py:164: in <module>
    class ExceptionGroup(BaseExceptionGroup, trio.MultiError):
../../../.virtualenvs/throwaway/lib/python3.11/site-packages/trio/_deprecate.py:153: in __getattr__
    raise AttributeError(msg.format(self.__name__, name))
E   AttributeError: module 'trio' has no attribute 'MultiError'

which have gone away just by doing pip install -U anyio. Please update the version compatibility for moat-mqtt.

Not able to work with moat-mqtt

Hello,

I have always worked with Distmqtt with no issue.
Today I tried to install moat-mqtt for a new project and I am not able to work with it.

I have a new python 3.8 project, and i use poetry.
I just installed moat-mqtt (3.6.1 is the only one available on PyPI)

poetry new asd
cd asd
poetry add moat-mqtt

image

and the installation is correct
image

and I definitely see distmqtt in the venv directory
image

However, there is no way for me to import or use moat-mqtt.

from moat.mqtt.mqtt.constants import QOS_1
from moat.mqtt.mqtt.constants import QOS_2

print(QOS_1, QOS_2)

Running this (poetry run python asd/__init__.py) gives me:

Traceback (most recent call last):
  File "asd/__init__.py", line 1, in <module>
    from moat.mqtt.mqtt.constants import QOS_1
ModuleNotFoundError: No module named 'moat'

Am I missing something obvious or is the new version with the new naming not supposed to be used yet?

Thank you!

AttributeError: 'TLSStream' object has no attribute 'close'

Stack trace says it all. Note that this is during a (trio) cancellation of the scope in which the client is running - i.e. I am intentionally shutting down this client, but not the whole app (oddly, I don't see this error when shutting down the whole app).

Traceback (most recent call last):
  File "/Users/mikenerone/dev/work/app/src/mymodule/transport.py", line 174, in _receive_packages
    del package
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/trio/_abc.py", line 261, in __aexit__
    await self.aclose()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/trio/_channel.py", line 386, in aclose
    await trio.lowlevel.checkpoint()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/trio/_core/_run.py", line 2325, in checkpoint
    await _core.wait_task_rescheduled(lambda _: _core.Abort.SUCCEEDED)
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
    return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/outcome/_impl.py", line 138, in unwrap
    raise captured_error
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/trio/_core/_run.py", line 1172, in raise_cancel
    raise Cancelled._create()
trio.Cancelled: Cancelled

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 129, in close
  await self._stream.close()
AttributeError: 'TLSStream' object has no attribute 'close'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/anyio/streams/tls.py", line 99, in _call_sslobject_method
  result = func(*args)
File "/Users/mikenerone/.pyenv/versions/3.8.6/Python.framework/Versions/3.8/lib/python3.8/ssl.py", line 948, in unwrap
  return self._sslobj.shutdown()
ssl.SSLSyscallError: Some I/O error occurred (_ssl.c:2746)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/Users/mikenerone/dev/work/app/src/mymodule/service.py", line 47, in run_service
  nursery.start_soon(partial(self.service, *args, **kwargs))
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/trio/_core/_run.py", line 815, in __aexit__
  raise combined_error_from_nursery
File "/Users/mikenerone/dev/work/app/src/mymodule/transport.py", line 148, in service
  nursery.start_soon(self._receive_packages)
File "/Users/mikenerone/dev/work/app/src/mymodule/opendxl.py", line 105, in __aexit__
  await self._client.__aexit__(exc_type, exc_val, exc_tb)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/asyncdxlclient/client.py", line 111, in __aexit__
  await self.disconnect()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/asyncdxlclient/client.py", line 139, in disconnect
  await self._mqtt.disconnect()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
  await self._handler.stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
  await super().stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 180, in stop
  await self.stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 131, in close
  await self._stream.aclose()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/anyio/streams/tls.py", line 141, in aclose
  await self.unwrap()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/anyio/streams/tls.py", line 133, in unwrap
  await self._call_sslobject_method(self._ssl_object.unwrap)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/app-7FgPpmjV-py3.8/lib/python3.8/site-packages/anyio/streams/tls.py", line 118, in _call_sslobject_method
  raise BrokenResourceError from exc
anyio.BrokenResourceError

That self._stream is an anyio.streams.tls.TLSStream object. Since anyio 2.0.0, close() is renamed to aclose(). It seems likely that other updates are needed, as well (bunch of renames/deprecations at https://anyio.readthedocs.io/en/latest/versionhistory.html), but I'm just reporting the specific issue I hit.

most probably not an issue, just don't know how to deal with network breakdown

I have a simple client subscribing a topic (to turn on some lamps).

async def mqtt_lamp_worker(loop):
	  "coroutine to handle incoming mqtt messages concerning lamps"
	  async with open_mqttclient(uri=CONFIG.mqtt.host) as client:
		  await client.subscribe([(CONFIG.mqtt.lamp_topic, QOS_1)])

		  while loop.is_running():
			  message = await client.deliver_message()
			  packet = message.publish_packet
			  pattern = get_pattern_value(packet.payload.data.decode())
			  if pattern is not None:
				  await set_lamp(client, pattern, False)
			  else:
				  print("ignoring unknown pattern")

If the network connection goes down completely, I get:

ConnectionResetError --> BrokenResourceError --> Cancelscope...timeout never awaited

How do I deal with this? I'd like to simply reconnect, if the network shows up again and wait in the meantime.

Broker exits when subscriber disconnected

When a subscriber disconnects from the broker, the following error appears and the broker exits completely. This is distmqtt==0.35.2, anyio==3.6.1 running on Python 3.8 on Windows. I'm not sure which are expected exceptions and which are actual errors. Maybe it's that one right at the end: anyio.DeprecatedAwaitable seems to be an internal API, maybe it was exposed on an older version of anyio by accident and not recent versions?

Traceback (most recent call last):
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\windows_events.py", line 457, in finish_recv
    return ov.getresult()
OSError: [WinError 64] The specified network name is no longer available

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\proactor_events.py", line 280, in _loop_reading
    data = fut.result()
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\windows_events.py", line 812, in _poll
    value = callback(transferred, key, ov)
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\windows_events.py", line 461, in finish_recv
    raise ConnectionResetError(*exc.args)
ConnectionResetError: [WinError 64] The specified network name is no longer available

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 467, in main_scope
    yield s
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\broker.py", line 178, in create_broker
    tg.cancel_scope.cancel()
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\mqtt\protocol\handler.py", line 472, in _reader_loop
    fixed_header = await MQTTFixedHeader.from_stream(self.stream)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\mqtt\packet.py", line 110, in from_stream
    int1 = (await read_or_raise(reader, 1))[0]
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\codecs.py", line 59, in read_or_raise
    data = await reader.read(n)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\adapters.py", line 128, in read
    data = await self._rstream.receive_exactly(n)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\streams\buffered.py", line 72, in receive_exactly
    chunk = await self.receive_stream.receive(remaining)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_backends\_asyncio.py", line 1274, in receive
    raise self._protocol.exception
anyio.BrokenResourceError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 207, in _ctx
    yield self
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 437, in _ctx
    yield s
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 470, in main_scope
    s.cancel()
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 346, in cancel
    return anyio.DeprecatedAwaitable(self.cancel)
AttributeError: module 'anyio' has no attribute 'DeprecatedAwaitable'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\arthur\.conda\envs\py38\lib\runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Users\arthur\.conda\envs\py38\lib\runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "C:\Users\arthur\.conda\envs\py38\Scripts\distmqtt.exe\__main__.py", line 7, in <module>
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 1157, in __call__
    return anyio.run(self._main, main, args, kwargs, **opts)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_core\_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_backends\_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\base_events.py", line 616, in run_until_complete
    return future.result()
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_backends\_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 1160, in _main
    return await main(*args, **kwargs)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 1076, in main
    rv = await self.invoke(ctx)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 1434, in invoke
    return await ctx.invoke(self.callback, **ctx.params)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 780, in invoke
    rv = await rv
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\scripts\broker_script.py", line 66, in main
    await anyio.sleep(99999)
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 679, in __aexit__
    raise exc_details[1]
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 470, in main_scope
    s.cancel()
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 452, in __aexit__
    return await self._ctx_.__aexit__(*tb)  # pylint:disable=no-member  # YES IT HAS
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 439, in _ctx
    del self._scopes[s._name]
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 211, in _ctx
    await self.cancel_immediate()
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 336, in cancel_immediate
    self.cancel()
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 346, in cancel
    return anyio.DeprecatedAwaitable(self.cancel)
AttributeError: module 'anyio' has no attribute 'DeprecatedAwaitable'

New PyPI release?

v0.35.0 was tagged quite a while ago but there is no PyPI release yet. Would you mind fixing that? Thanks!

build of distmqtt without distkv?

Hi,
Did you mention in python-trio general about making a build of distmqtt that isn't built with distkv? Is that still in the works?

Thanks
Charles

Describe relationship to DistKV

Could you please describe the relationship between DistMQTT and DistKV?

  • Is DistKV required to use DistMQTT?
  • What features does DistKV add to this project or MQTT in general?
  • Why did you make the decision to couple a DistKV client and MQTT client together?

I'm asking because while I generally like the idea of anyio hbmqtt library, I don't think I need DistKV for my project.

auto-reconnection doesn't work

If the connection is lost to the broker, moat-mqtt doesn't retry and crashes. The error message is the same with or without "auto_reconnect" being set.

sample:
import logging
import anyio

from moat.mqtt.client import open_mqttclient, ClientException

config = {
"keep_alive": 10,
"ping_delay": 1,
"default_qos": 0,
"default_retain": False,
"auto_reconnect": True,
"reconnect_max_interval": 10,
"reconnect_retries": 3,
"codec": "noop",
}

logger = logging.getLogger(name)

async def uptime_coro():
async with open_mqttclient(config=config) as C:
try:
await C.connect("mqtt://k")
# will wait until the broker disappears
await anyio.sleep_forever()
except ClientException as ce:
logger.error("Client exception: %r", ce)

if name == "main":
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
anyio.run(uptime_coro)

When the broker disappears, the connection times out. The message is the same with auto-reconnect true or false.

error message:
(.env) โžœ mqtt /Users/fredrick867/projects/viewer/.env/bin/python /Users/fredrick867/projects/viewer/testclient.py
[2022-12-19 20:54:11,495] {core.py:136} INFO - Finished processing state new exit callbacks.
[2022-12-19 20:54:11,495] {core.py:130} INFO - Finished processing state connected enter callbacks.
[2022-12-19 21:09:47,326] {handler.py:525} WARNING - ClientProtocolHandler Unhandled exception in reader coro
Traceback (most recent call last):
File "/usr/local/Cellar/[email protected]/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/selector_events.py", line 862, in _read_ready__data_received
data = self._sock.recv(self.max_size)
TimeoutError: [Errno 60] Operation timed out

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/mqtt/protocol/handler.py", line 469, in _reader_loop
fixed_header = await MQTTFixedHeader.from_stream(self.stream)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/mqtt/packet.py", line 105, in from_stream
int1 = (await read_or_raise(reader, 1))[0]
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/codecs.py", line 61, in read_or_raise
data = await reader.read(n)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/adapters.py", line 129, in read
data = await self._rstream.receive_exactly(n)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/streams/buffered.py", line 72, in receive_exactly
chunk = await self.receive_stream.receive(remaining)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1274, in receive
raise self._protocol.exception
anyio.BrokenResourceError
[2022-12-19 21:09:47,337] {core.py:136} INFO - Finished processing state connected exit callbacks.
[2022-12-19 21:09:47,337] {core.py:130} INFO - Finished processing state disconnected enter callbacks.
Traceback (most recent call last):
File "/usr/local/Cellar/[email protected]/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/selector_events.py", line 862, in _read_ready__data_received
data = self._sock.recv(self.max_size)
TimeoutError: [Errno 60] Operation timed out

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/Users/fredrick867/projects/viewer/testclient.py", line 38, in
anyio.run(uptime_coro)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/local/Cellar/[email protected]/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/Cellar/[email protected]/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/Users/fredrick867/projects/viewer/testclient.py", line 24, in uptime_coro
async with open_mqttclient(config=config) as C:
File "/usr/local/Cellar/[email protected]/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/contextlib.py", line 217, in aexit
await self.gen.athrow(typ, value, traceback)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/client.py", line 158, in open_mqttclient
async with anyio.create_task_group() as tg:
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in aexit
raise exceptions[0]
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/mqtt/protocol/handler.py", line 469, in _reader_loop
fixed_header = await MQTTFixedHeader.from_stream(self.stream)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/mqtt/packet.py", line 105, in from_stream
int1 = (await read_or_raise(reader, 1))[0]
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/codecs.py", line 61, in read_or_raise
data = await reader.read(n)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/adapters.py", line 129, in read
data = await self._rstream.receive_exactly(n)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/streams/buffered.py", line 72, in receive_exactly
chunk = await self.receive_stream.receive(remaining)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1274, in receive
raise self._protocol.exception
anyio.BrokenResourceError

MQTTClient doesn't close MQTTS (TLS) connections cleanly

Reproduction script (obviously, I had a broker set up listening on both of the URLs in this):

#! /usr/bin/env python3

import logging
import sys

import anyio
from distmqtt.client import MQTTClient

secure = 'secure' in sys.argv
backend = 'trio' if 'trio' in sys.argv else 'asyncio'

if secure:
    uri = 'mqtts://127.0.0.1:2883'
    config = dict(
        check_hostname=False,
        broker=dict(
            # Side note: Had to make a CA because there's no way to turn off verification
            cafile='ca-bundle.crt',
        )
    )
else:
    uri = 'mqtt://127.0.0.1:1883'
    config = {}

logging.basicConfig(level='DEBUG')
logging.getLogger().setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)


async def main():
    async with anyio.create_task_group() as tg:
        client = MQTTClient(tg, config=config)
        await client.connect(uri)
        logger.warning('Connected')
        await client.disconnect()
        logger.warning('Disconnected')

anyio.run(main, backend=backend)

Running this without "secure" on the command line works fine, regardless of backend

DEBUG:asyncio:Using selector: KqueueSelector
DEBUG:distmqtt.client:Using generated client ID : distmqtt-kvgxupwiq{fvwict
DEBUG:distmqtt.client.plugins:Loading plugins for namespace distmqtt.client.plugins
DEBUG:distmqtt.client:Connect to: mqtt://127.0.0.1:1883
DEBUG:distmqtt.mqtt.protocol.handler:< C ConnackPacket(ts=2020-08-07 12:15:24.627961, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=ConnackVariableHeader(session_parent=0x0, return_code=0x0), payload=None)
DEBUG:distmqtt.mqtt.protocol.handler:distmqtt-kvgxupwiq{fvwict Starting reader coro
DEBUG:distmqtt.mqtt.protocol.handler:Handler tasks started
DEBUG:distmqtt.mqtt.protocol.handler:Begin messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:0 messages redelivered
DEBUG:distmqtt.mqtt.protocol.handler:0 messages not redelivered due to timeout
DEBUG:distmqtt.mqtt.protocol.handler:End messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict ready
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state new to state connected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state new. Processing callbacks...
INFO:transitions.core:Exited state new
DEBUG:transitions.core:Entering state connected. Processing callbacks...
INFO:transitions.core:Entered state connected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
DEBUG:distmqtt.client:connected to 127.0.0.1:1883
DEBUG:distmqtt.client:Wait for broker disconnection
WARNING:__main__:Connected
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for reader <anyio._backends._asyncio.TaskGroup object at 0x102e958c0> to be stopped
WARNING:distmqtt.client:Disconnected from broker
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for sender <anyio._backends._asyncio.CancelScope object at 0x102e5f7b0> to be stopped
WARNING:distmqtt.mqtt.protocol.handler:ClientProtocolHandler CANCEL
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict coro stopped
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closed writer
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state connected to state disconnected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state connected. Processing callbacks...
INFO:transitions.core:Exited state connected
DEBUG:transitions.core:Entering state disconnected. Processing callbacks...
INFO:transitions.core:Entered state disconnected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
WARNING:__main__:Disconnected
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closed writer
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state disconnected to state disconnected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state disconnected. Processing callbacks...
INFO:transitions.core:Exited state disconnected
DEBUG:transitions.core:Entering state disconnected. Processing callbacks...
INFO:transitions.core:Entered state disconnected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks

But running it with secure on the command line (enabling MQTTS and using asyncio) crashes the app with a stack trace:

DEBUG:asyncio:Using selector: KqueueSelector
DEBUG:distmqtt.client:Using generated client ID : distmqtt-elgwutjmmdeiv{fq
DEBUG:distmqtt.client.plugins:Loading plugins for namespace distmqtt.client.plugins
DEBUG:distmqtt.client:Connect to: mqtts://127.0.0.1:2883
DEBUG:distmqtt.mqtt.protocol.handler:< C ConnackPacket(ts=2020-08-07 12:17:18.235579, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=ConnackVariableHeader(session_parent=0x0, return_code=0x0), payload=None)
DEBUG:distmqtt.mqtt.protocol.handler:distmqtt-elgwutjmmdeiv{fq Starting reader coro
DEBUG:distmqtt.mqtt.protocol.handler:Handler tasks started
DEBUG:distmqtt.mqtt.protocol.handler:Begin messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:0 messages redelivered
DEBUG:distmqtt.mqtt.protocol.handler:0 messages not redelivered due to timeout
DEBUG:distmqtt.mqtt.protocol.handler:End messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq ready
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state new to state connected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state new. Processing callbacks...
INFO:transitions.core:Exited state new
DEBUG:transitions.core:Entering state connected. Processing callbacks...
INFO:transitions.core:Entered state connected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
DEBUG:distmqtt.client:connected to 127.0.0.1:2883
DEBUG:distmqtt.client:Wait for broker disconnection
WARNING:__main__:Connected
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for reader <anyio._backends._asyncio.TaskGroup object at 0x10ffeb800> to be stopped
WARNING:distmqtt.client:Disconnected from broker
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for sender <anyio._backends._asyncio.CancelScope object at 0x10ffcac10> to be stopped
WARNING:distmqtt.mqtt.protocol.handler:ClientProtocolHandler CANCEL
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq coro stopped
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
Traceback (most recent call last):
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 185, in unwrap_tls
    self._raw_socket = cast(ssl.SSLSocket, self._raw_socket).unwrap()
  File "/Users/mikenerone/.pyenv/versions/3.8.5/lib/python3.8/ssl.py", line 1285, in unwrap
    s = self._sslobj.shutdown()
ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2745)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./closetest.py", line 34, in main
    await client.disconnect()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
    await self._handler.stop()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 633, in wait_socket_readable
    raise ClosedResourceError
anyio.exceptions.ClosedResourceError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./closetest.py", line 37, in <module>
    anyio.run(main)
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/__init__.py", line 72, in run
    return asynclib.run(func, *args, **backend_options)  # type: ignore
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 114, in run
    raise exception
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 76, in wrapper
    retval = await func(*args)
  File "./closetest.py", line 35, in main
    logger.warning('Disconnected')
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 390, in __aexit__
    raise ExceptionGroup(exceptions)
anyio._backends._asyncio.ExceptionGroup: 2 exceptions were raised in the task group:
----------------------------
Traceback (most recent call last):

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 415, in _run_wrapped_task
    await func(*args)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 724, in handle_connection_close
    await self._handler.stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 618, in wait_socket_readable
    raise ResourceBusyError('reading from') from None

anyio.exceptions.ResourceBusyError: Another task is already reading from this resource
----------------------------
Traceback (most recent call last):

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 185, in unwrap_tls
    self._raw_socket = cast(ssl.SSLSocket, self._raw_socket).unwrap()

  File "/Users/mikenerone/.pyenv/versions/3.8.5/lib/python3.8/ssl.py", line 1285, in unwrap
    s = self._sslobj.shutdown()

ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2745)


During handling of the above exception, another exception occurred:


Traceback (most recent call last):

  File "./closetest.py", line 34, in main
    await client.disconnect()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
    await self._handler.stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 633, in wait_socket_readable
    raise ClosedResourceError

anyio.exceptions.ClosedResourceError

Running it with "secure" and "trio" on the command line (enabling MQTSS and trio) also crashes with a stack trace, but it's different:

DEBUG:distmqtt.client:Using generated client ID : distmqtt-ai{epvdnqr{uqppo
DEBUG:distmqtt.client.plugins:Loading plugins for namespace distmqtt.client.plugins
DEBUG:distmqtt.client:Connect to: mqtts://127.0.0.1:2883
DEBUG:distmqtt.mqtt.protocol.handler:< C ConnackPacket(ts=2020-08-07 12:25:17.745781, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=ConnackVariableHeader(session_parent=0x0, return_code=0x0), payload=None)
DEBUG:distmqtt.mqtt.protocol.handler:distmqtt-ai{epvdnqr{uqppo Starting reader coro
DEBUG:distmqtt.mqtt.protocol.handler:Handler tasks started
DEBUG:distmqtt.mqtt.protocol.handler:Begin messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:0 messages redelivered
DEBUG:distmqtt.mqtt.protocol.handler:0 messages not redelivered due to timeout
DEBUG:distmqtt.mqtt.protocol.handler:End messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo ready
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state new to state connected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state new. Processing callbacks...
INFO:transitions.core:Exited state new
DEBUG:transitions.core:Entering state connected. Processing callbacks...
INFO:transitions.core:Entered state connected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
DEBUG:distmqtt.client:connected to 127.0.0.1:2883
DEBUG:distmqtt.client:Wait for broker disconnection
WARNING:__main__:Connected
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for reader <anyio._backends._trio.TaskGroup object at 0x106fc3240> to be stopped
WARNING:distmqtt.client:Disconnected from broker
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for sender <anyio._backends._trio.CancelScope object at 0x106f91c40> to be stopped
WARNING:distmqtt.mqtt.protocol.handler:ClientProtocolHandler CANCEL
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo coro stopped
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
Traceback (most recent call last):
  File "./closetest.py", line 38, in <module>
    anyio.run(main, backend=backend)
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/__init__.py", line 72, in run
    return asynclib.run(func, *args, **backend_options)  # type: ignore
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_run.py", line 1896, in run
    raise runner.main_task_outcome.error
  File "./closetest.py", line 36, in main
    logger.warning('Disconnected')
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 139, in __aexit__
    raise ExceptionGroup(exc.exceptions) from None
anyio._backends._trio.ExceptionGroup: 2 exceptions were raised in the task group:
----------------------------
Traceback (most recent call last):

  File "./closetest.py", line 35, in main
    await client.disconnect()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
    await self._handler.stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 213, in wait_socket_readable
    raise ResourceBusyError('reading from') from None

anyio.exceptions.ResourceBusyError: Another task is already reading from this resource
----------------------------
Traceback (most recent call last):

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 724, in handle_connection_close
    await self._handler.stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 211, in wait_socket_readable
    raise ClosedResourceError().with_traceback(exc.__traceback__) from None

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 209, in wait_socket_readable
    await wait_readable(sock)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_generated_io_kqueue.py", line 37, in wait_readable
    return await GLOBAL_RUN_CONTEXT.runner.io_manager.wait_readable(fd)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 162, in wait_readable
    await self._wait_common(fd, select.KQ_FILTER_READ)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 158, in _wait_common
    await self.wait_kevent(fd, filter, abort)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 127, in wait_kevent
    return await _core.wait_task_rescheduled(abort)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
    return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/outcome/_sync.py", line 111, in unwrap
    raise captured_error

anyio.exceptions.ClosedResourceError


Details of embedded exception 1:

  Traceback (most recent call last):
    File "./closetest.py", line 35, in main
      await client.disconnect()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
      await self._handler.stop()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
      await super().stop()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
      await self.stream.close()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
      await self._stream.close()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
      await self._socket.unwrap_tls()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
      await self._wait_readable()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 213, in wait_socket_readable
      raise ResourceBusyError('reading from') from None
  anyio.exceptions.ResourceBusyError: Another task is already reading from this resource

Details of embedded exception 2:

  Traceback (most recent call last):
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 724, in handle_connection_close
      await self._handler.stop()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
      await super().stop()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
      await self.stream.close()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
      await self._stream.close()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
      await self._socket.unwrap_tls()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
      await self._wait_readable()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 211, in wait_socket_readable
      raise ClosedResourceError().with_traceback(exc.__traceback__) from None
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 209, in wait_socket_readable
      await wait_readable(sock)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_generated_io_kqueue.py", line 37, in wait_readable
      return await GLOBAL_RUN_CONTEXT.runner.io_manager.wait_readable(fd)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 162, in wait_readable
      await self._wait_common(fd, select.KQ_FILTER_READ)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 158, in _wait_common
      await self.wait_kevent(fd, filter, abort)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 127, in wait_kevent
      return await _core.wait_task_rescheduled(abort)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
      return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/outcome/_sync.py", line 111, in unwrap
      raise captured_error
  anyio.exceptions.ClosedResourceError

Unable to establish MQTTS connections due to SSL error "unknown alert type"

Any attempt to connect to a broker with an mqtts:// URI fails with a stack trace similar to:

Traceback (most recent call last):
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 659, in _connect_coro
    await conn.start_tls()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 323, in start_tls
    await self._socket.start_tls(ssl_context, self._server_hostname,
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 171, in start_tls
    self._raw_socket.do_handshake()
  File "/Users/mikenerone/.pyenv/versions/3.8.5/lib/python3.8/ssl.py", line 1309, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLError: [SSL: UNKNOWN_ALERT_TYPE] unknown alert type (_ssl.c:1123)

The cause is that SSL negotiation is initiated twice in MQTTClient._connect_coro, once during anyio.connect_tcp() (L655) because autostart_tls=True is in **kwargs, and then again explicitly via conn.start_tls() at L659. I'll have a fix PR shortly (or perhaps tomorrow if I get too sleepy :D).

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.