
aiormq's Introduction

AIORMQ


aiormq is a pure Python AMQP client library.

  • 3.x.x branch - Production/Stable
  • 4.x.x branch - Unstable (Experimental)
  • 5.x.x and greater - Production/Stable releases only
  • Connecting by URL
  • Buffered queue for received frames

  • Only PLAIN auth mechanism support

  • Publisher confirms support

  • Transactions support

  • Channel based asynchronous locks

    Note

    AMQP 0.9.1 requires that some frame types be sent serially on a channel; e.g. a content body must follow its content header. Frames on other channels may still be sent asynchronously.

  • Tracking unroutable messages (use connection.channel(on_return_raises=False) to disable)

  • Full SSL/TLS support, using your choice of:
    • amqps:// url query parameters:
      • cafile= - string contains path to ca certificate file
      • capath= - string contains path to ca certificates
      • cadata= - base64 encoded ca certificate data
      • keyfile= - string contains path to key file
      • certfile= - string contains path to certificate file
      • no_verify_ssl - boolean disables certificates validation
    • context= SSLContext keyword argument to connect().
  • Python type hints

  • Uses pamqp as an AMQP 0.9.1 frame encoder/decoder

import asyncio
import aiormq

async def on_message(message):
    """
    on_message doesn't necessarily have to be defined as async.
    Here it is to show that it's possible.
    """
    print(f" [x] Received message {message!r}")
    print(f"Message body is: {message.body!r}")
    print("Before sleep!")
    await asyncio.sleep(5)   # Represents async I/O operations
    print("After sleep!")


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # Declaring queue
    declare_ok = await channel.queue_declare('helo')
    consume_ok = await channel.basic_consume(
        declare_ok.queue, on_message, no_ack=True
    )


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()
import asyncio
from typing import Optional

import aiormq
from aiormq.abc import DeliveredMessage


MESSAGE: Optional[DeliveredMessage] = None


async def main():
    global MESSAGE

    body = b'Hello World!'

    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost//")

    # Creating a channel
    channel = await connection.channel()

    declare_ok = await channel.queue_declare("hello", auto_delete=True)

    # Sending the message
    await channel.basic_publish(body, routing_key='hello')
    print(f" [x] Sent {body}")

    MESSAGE = await channel.basic_get(declare_ok.queue)
    print(f" [x] Received message from {declare_ok.queue!r}")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

assert MESSAGE is not None
assert MESSAGE.routing_key == "hello"
assert MESSAGE.body == b'Hello World!'
import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # sys.argv holds str values, so encode them before joining as bytes
    body = b' '.join(arg.encode() for arg in sys.argv[1:]) or b"Hello World!"

    # Sending the message
    await channel.basic_publish(
        body,
        routing_key='task_queue',
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1,
        )
    )

    print(f" [x] Sent {body!r}")

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import asyncio
import aiormq
import aiormq.abc


async def on_message(message: aiormq.abc.DeliveredMessage):
    print(f" [x] Received message {message!r}")
    print(f"     Message body is: {message.body!r}")


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")


    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    # Declaring queue
    declare_ok = await channel.queue_declare('task_queue', durable=True)

    # Start listening the queue with name 'task_queue'
    await channel.basic_consume(declare_ok.queue, on_message, no_ack=True)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# we enter a never-ending loop that waits for data and runs
# callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()
import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare(
        exchange='logs', exchange_type='fanout'
    )

    # sys.argv holds str values, so encode them before joining as bytes
    body = b' '.join(arg.encode() for arg in sys.argv[1:]) or b"Hello World!"

    # Sending the message
    await channel.basic_publish(
        body, routing_key='info', exchange='logs'
    )

    print(f" [x] Sent {body!r}")

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import asyncio
import aiormq
import aiormq.abc


async def on_message(message: aiormq.abc.DeliveredMessage):
    print(f"[x] {message.body!r}")

    await message.channel.basic_ack(
        message.delivery.delivery_tag
    )


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    await channel.exchange_declare(
        exchange='logs', exchange_type='fanout'
    )

    # Declaring queue
    declare_ok = await channel.queue_declare(exclusive=True)

    # Binding the queue to the exchange
    await channel.queue_bind(declare_ok.queue, 'logs')

    # Start listening to the exclusive, server-named queue
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.create_task(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(' [*] Waiting for logs. To exit press CTRL+C')
loop.run_forever()
import sys
import asyncio
import aiormq
import aiormq.abc


async def on_message(message: aiormq.abc.DeliveredMessage):
    print(f" [x] {message.delivery.routing_key!r}:{message.body!r}")
    await message.channel.basic_ack(
        message.delivery.delivery_tag
    )


async def main():
    # Perform connection
    connection = aiormq.Connection("amqp://guest:guest@localhost/")
    await connection.connect()

    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    severities = sys.argv[1:]

    if not severities:
        sys.stderr.write(f"Usage: {sys.argv[0]} [info] [warning] [error]\n")
        sys.exit(1)

    # Declare an exchange
    await channel.exchange_declare(
        exchange='logs', exchange_type='direct'
    )

    # Declaring random queue
    declare_ok = await channel.queue_declare(durable=True, auto_delete=True)

    for severity in severities:
        await channel.queue_bind(
            declare_ok.queue, 'logs', routing_key=severity
        )

    # Start listening the random queue
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()
import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare(
        exchange='logs', exchange_type='direct'
    )

    body = (
        b' '.join(arg.encode() for arg in sys.argv[2:])
        or
        b"Hello World!"
    )

    # Sending the message
    routing_key = sys.argv[1] if len(sys.argv) > 2 else 'info'

    await channel.basic_publish(
        body, exchange='logs', routing_key=routing_key,
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1
        )
    )

    print(f" [x] Sent {body!r}")

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare('topic_logs', exchange_type='topic')

    routing_key = (
        sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
    )

    body = (
        b' '.join(arg.encode() for arg in sys.argv[2:])
        or
        b"Hello World!"
    )

    # Sending the message
    await channel.basic_publish(
        body, exchange='topic_logs', routing_key=routing_key,
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1
        )
    )

    print(f" [x] Sent {body!r}")

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import asyncio
import sys
import aiormq
import aiormq.abc


async def on_message(message: aiormq.abc.DeliveredMessage):
    print(f" [x] {message.delivery.routing_key!r}:{message.body!r}")
    await message.channel.basic_ack(
        message.delivery.delivery_tag
    )


async def main():
    # Perform connection
    connection = await aiormq.connect(
        "amqp://guest:guest@localhost/", loop=loop
    )

    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    # Declare an exchange
    await channel.exchange_declare('topic_logs', exchange_type='topic')

    # Declaring queue
    declare_ok = await channel.queue_declare('task_queue', durable=True)

    binding_keys = sys.argv[1:]

    if not binding_keys:
        sys.stderr.write(
            f"Usage: {sys.argv[0]} [binding_key]...\n"
        )
        sys.exit(1)

    for binding_key in binding_keys:
        await channel.queue_bind(
            declare_ok.queue, 'topic_logs', routing_key=binding_key
        )

    # Start listening the queue with name 'task_queue'
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.create_task(main())

# we enter a never-ending loop that waits for
# data and runs callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()
import asyncio
import aiormq
import aiormq.abc


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)


async def on_message(message:aiormq.abc.DeliveredMessage):
    n = int(message.body.decode())

    print(f" [.] fib({n})")
    response = str(fib(n)).encode()

    await message.channel.basic_publish(
        response, routing_key=message.header.properties.reply_to,
        properties=aiormq.spec.Basic.Properties(
            correlation_id=message.header.properties.correlation_id
        ),

    )

    await message.channel.basic_ack(message.delivery.delivery_tag)
    print('Request complete')


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # Declaring queue
    declare_ok = await channel.queue_declare('rpc_queue')

    # Start listening to the queue named 'rpc_queue'
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.create_task(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [x] Awaiting RPC requests")
loop.run_forever()
import asyncio
import uuid
import aiormq
import aiormq.abc


class FibonacciRpcClient:
    def __init__(self):
        self.connection = None      # type: aiormq.Connection
        self.channel = None         # type: aiormq.Channel
        self.callback_queue = ''
        self.futures = {}
        self.loop = loop

    async def connect(self):
        self.connection = await aiormq.connect("amqp://guest:guest@localhost/")

        self.channel = await self.connection.channel()
        declare_ok = await self.channel.queue_declare(
            exclusive=True, auto_delete=True
        )

        await self.channel.basic_consume(declare_ok.queue, self.on_response)

        self.callback_queue = declare_ok.queue

        return self

    async def on_response(self, message: aiormq.abc.DeliveredMessage):
        future = self.futures.pop(message.header.properties.correlation_id)
        future.set_result(message.body)

    async def call(self, n):
        correlation_id = str(uuid.uuid4())
        future = loop.create_future()

        self.futures[correlation_id] = future

        await self.channel.basic_publish(
            str(n).encode(), routing_key='rpc_queue',
            properties=aiormq.spec.Basic.Properties(
                content_type='text/plain',
                correlation_id=correlation_id,
                reply_to=self.callback_queue,
            )
        )

        return int(await future)


async def main():
    fibonacci_rpc = await FibonacciRpcClient().connect()
    print(" [x] Requesting fib(30)")
    response = await fibonacci_rpc.call(30)
    print(f" [.] Got {response!r}")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

aiormq's People

Contributors

adamhooper, albireox, caiohsramos, decaz, dgarros, dxist, gilbahat, harlov, ironhacker, jkr78, justintarthur, kremius, malthe, michael-k, micheledkrantz, mikeplayle, morian, mosquito, norrius, pbelskiy, petr-k, ralbertazzi, rwilhelm, shadchin, shagaleevalexey, sobolev5, tchalupnik, thesage21, tilsche, wyn1995


aiormq's Issues

No attribute named 'spec'

I am new to this library and cut-and-pasted one of the tutorial examples. https://github.com/mosquito/aiormq#id22
When attempting to run this code I get "AttributeError: module 'aiormq' has no attribute 'spec'"
Is the documentation lagging behind the code or am I missing something else? Thanks.

error while consume messages

hi there,
thanks for the efforts.
I was doing some tests using the lib and I came across the following results when consuming messages:

[x]  (b'{"created":"2020-12-11T17:37:56.131888Z","modified":"2020-12-18T22:27:29.485259Z","userId":"d23c7872-3935-4bb0-acee-02c78602c24b","userName":"yafatek","email":"[email protected]","password":"$2a$31$jShEqa.DGEhKwnpkFTXj3eSTzeDRqIBCB7wMNu7FYQFjzUo5zA6hi","accountStatus":true,"roles":[{"created":"2020-12-11T17:37:56.164768Z","modified":null,"roleId":"7f6007ac-ae2b-4d60-b9d1-5ede3655c9a7","roleName":"CLIENT","description":"User access permissions level"}],"emailVerificationToken":{"created":"2020-12-11T17:37:56.394884Z","modified":null,"id":"1b7ff4ba-50fe-49a0-a1dc-6f79d6d877a5","token":"5c2f3aab-d27d-4556-95bc-675f0020de02","tokenStatus":"STATUS_PENDING","expiryDate":"2020-12-11T18:37:56.394869Z"},"usersInfo":{"created":"2020-12-11T17:37:56.302135Z","modified":"2020-12-18T22:32:10.525474Z","userInfoId":"3a64adef-c22c-40d8-a0d3-0dee912c864f","avatarId":"c52af0c7-ce0e-4734-b56b-4bdf1808f7cb","fullName":"Feras Alawadi","phoneNumber":"00905348809120","country":"Turkey","city":"Istanbul","age":25},"emailVerified":false,"phoneVerified":true}',)
[x]  (b'{"userId": "d23c7872-3935-4bb0-acee-02c78602c24b"}',)

while:

[x]  (b'{"userId": "d23c7872-3935-4bb0-acee-02c78602c24b"}',)

is the request itself.
How do I get only the response back?
Also, sometimes I do not get any response when I publish a message;
the response only arrives when I publish a new message and consume again.
warm regards,
Feras

DeliveredMessage might not have delivery_tag

Getting this typing error on my callback that got passed to basic_consume:

error: Item "GetEmpty" of "Union[Deliver, GetEmpty, GetOk]" has no attribute "delivery_tag"  [union-attr]
                await message.channel.basic_ack(message.delivery.delivery_tag)
                                                ^

where message is of type aiormq.types.DeliveredMessage, defined here:

aiormq/aiormq/types.py

Lines 12 to 20 in dde0ce8

DeliveredMessage = typing.NamedTuple(
    "DeliveredMessage",
    [
        ("delivery", typing.Union[Basic.Deliver, GetResultType]),
        ("header", ContentHeader),
        ("body", bytes),
        ("channel", "aiormq.Channel"),
    ],
)
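
One way to satisfy the type checker is to narrow the union before reading delivery_tag. The sketch below uses stand-in dataclasses named after the frames in the error message (Deliver, GetOk, GetEmpty); they are assumptions for illustration, not the real pamqp classes, and delivery_tag_of is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional, Union


# Stand-ins for the frame classes named in the error message;
# the real classes carry more fields.
@dataclass
class Deliver:
    delivery_tag: int


@dataclass
class GetOk:
    delivery_tag: int


@dataclass
class GetEmpty:
    pass


Delivery = Union[Deliver, GetOk, GetEmpty]


def delivery_tag_of(delivery: Delivery) -> Optional[int]:
    """Return the delivery tag, or None when the frame cannot carry one."""
    if isinstance(delivery, GetEmpty):
        return None
    # After the isinstance check the union is narrowed to Deliver or GetOk.
    return delivery.delivery_tag
```

In a consumer callback the GetEmpty branch should never be reached, so raising there instead of returning None would be an equally reasonable choice.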

Channels does not close

Here is the script which tries to declare exclusive queue:

import asyncio

import aio_pika
import aiormq


async def main():
    connection = await aio_pika.connect_robust('amqp://guest:[email protected]/')
    channel = await connection.channel()
    while True:
        try:
            await channel.declare_queue('test_lock', exclusive=True)
        except aiormq.exceptions.ChannelLockedResource:
            channel = await connection.channel()
            print('Waiting for declare')
            await asyncio.sleep(1)
            continue
        break
    print('Declared')


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()

Run:

$ python test_lock.py &
[1] 2046                  
Declared
$ python test_lock.py &
[2] 2071                            
Waiting for declare
Waiting for declare
Waiting for declare
...

The problem is that when the script is run a second time and is waiting to declare the queue, the channels it creates are never closed: according to the RabbitMQ management plugin, their number within a single connection just keeps increasing.

Environment:

  • RabbitMQ 3.7.14
  • CPython 3.6.4
  • aio-pika 5.5.1
  • aiormq 2.3.3

Suboptimal consume performance

Recently, we ran into severe performance limitations of a consumer implemented with aio-pika. I make this issue here, because I believe the performance is mostly limited by aiormq since aio-pika 4. Publish performance in aio-pika has been discussed in mosquito/aio-pika#107. Because this is mainly focused on aiormq, I am making the issue in this repository.

So I made some simple benchmarks. While aio-pika 2.8.3 was on par with pika, the consumption rate is less than half with the current master, and still only about half of pika with pure aiormq.

Looking at the profiles, it seems a main weakness of aiormq is the large number of tasks that are created while consuming data. In particular, the @task decorated Connection.__receive_frame creates three tasks for reading the frames of each message. My rough understanding is that this is supposed to allow bulk cancellation of all things asynchronous within aiormq.

For testing I made some hacky changes: moving the @task from __receive_frame to the outer __reader_ and __rpc. I don't claim to have a good understanding of the impact that such a change would have outside of error-free execution.

Another minor change was to consolidate two readexactly calls. Again, I'm not sure if that would decrease resilience.

In the benchmarks, the @task change brought a large performance increase for both pure aiormq and aio-pika. There is still a gap between aio-pika and aiormq and also between pure aiormq and pika.

[Image: aiormq-performance]

AMQPError __repr__ typo

This repr:

class AMQPError(Exception):
    reason = "An unspecified AMQP error has occurred: %s"

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self.reason % self.args)

doesn't work, because of that:

class ConnectionClosed(AMQPConnectionError):
    reason = "The AMQP connection was closed (%s) %s"
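
One reading of this report is that `self.reason % self.args` raises TypeError whenever the number of `%s` placeholders in a subclass's reason differs from the number of arguments the exception was created with. The sketch below is a tolerant variant under that assumption; the fallback format is my own choice, not the library's.

```python
class AMQPError(Exception):
    reason = "An unspecified AMQP error has occurred: %s"

    def __repr__(self):
        try:
            message = self.reason % self.args
        except TypeError:
            # Placeholder count and argument count disagree: fall back
            # to showing the raw arguments instead of raising.
            message = "%s %r" % (self.reason, self.args)
        return "<%s: %s>" % (self.__class__.__name__, message)


class ConnectionClosed(AMQPError):
    reason = "The AMQP connection was closed (%s) %s"


if __name__ == "__main__":
    print(repr(ConnectionClosed(0, "normal")))  # both placeholders filled
    print(repr(ConnectionClosed("oops")))       # falls back, does not raise
```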

Connection on_close oddities

    async def _on_close(self, ex=exc.ConnectionClosed(0, "normal closed")):
        frame = (
            spec.Connection.CloseOk()
            if isinstance(ex, exc.ConnectionClosed)
            else spec.Connection.Close()
        )

        await asyncio.gather(
            self.__rpc(frame, wait_response=False), return_exceptions=True,
        )

        writer = self.writer
        self.reader = None
        self.writer = None
        self._reader_task = None  # <------ !!!

        await asyncio.gather(
            self.__close_writer(writer), return_exceptions=True,
        )

        await asyncio.gather(self._reader_task, return_exceptions=True)  # <------ self._reader_task

SSL capath vs cafile

Hi,

I've run into a problem with the SSL setup using aio-pika and specifying the CA cert in the URL string. Disregarding inconsistent documentation (aio-pika mentions ca_certs while aiormq uses cafile), neither of these seems to represent what is actually being used in the code (capath for the SSL context).

If the intention of the URL parameter is to specify a CA path, I would suggest renaming it to capath or ca_path; if the parameter is intended to be used to load a single CA bundle file (as reflected in the documentation), the SSL context should be initialized with cafile instead of capath.

References: https://docs.python.org/3/library/ssl.html#ssl.SSLContext.load_verify_locations
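
To make the distinction concrete, here is a minimal sketch of passing each URL parameter to the matching keyword of `ssl.SSLContext.load_verify_locations`. The helper names `verify_locations_from_url` and `make_context` are hypothetical and this is not how aiormq parses its URL; it only illustrates the suggested mapping of cafile to a bundle file and capath to a directory.

```python
import ssl
from urllib.parse import parse_qs, urlsplit


def verify_locations_from_url(url: str) -> dict:
    """Collect the cafile/capath query parameters of an amqps:// URL."""
    query = parse_qs(urlsplit(url).query)
    kwargs = {}
    for name in ("cafile", "capath"):
        if name in query:
            kwargs[name] = query[name][-1]
    return kwargs


def make_context(url: str) -> ssl.SSLContext:
    """Build a client context, loading only the locations the URL names."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    kwargs = verify_locations_from_url(url)
    if kwargs:
        # cafile must be a bundle file, capath a directory of hashed certs.
        context.load_verify_locations(**kwargs)
    return context
```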

Exception from heartbeat task is never retrieved

Heartbeat task failed silently:

[2019-08-29 13:23:09,428] [asyncio] [ERROR] Task exception was never retrieved
future: <Task finished coro=<Connection.__heartbeat_task() done, defined at <...>/aiormq/aiormq/connection.py:268> exception=RuntimeError('unable to perform operation on <TCPTransport closed=True reading=False 0x564a9bc2ab58>; the handler is closed')>
Traceback (most recent call last):
  File "<...>/aiormq/aiormq/connection.py", line 281, in __heartbeat_task
    self.writer.write(self._HEARTBEAT)
  File "<...>/.pyenv/versions/3.7.3/lib/python3.7/asyncio/streams.py", line 305, in write
    self._transport.write(data)
  File "uvloop/handles/stream.pyx", line 671, in uvloop.loop.UVStream.write
  File "uvloop/handles/handle.pyx", line 159, in uvloop.loop.UVHandle._ensure_alive
RuntimeError: unable to perform operation on <TCPTransport closed=True reading=False 0x564a9bc2ab58>; the handler is closed

python 3.7.3
aiormq 2.7.3
uvloop 0.12.12
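
A general asyncio pattern for this kind of report is to attach a done callback that retrieves the exception of a background task, so the failure is logged at the point where it can be handled. The sketch below is an illustration only; `failing_heartbeat` and `log_task_failure` are hypothetical names, not aiormq code.

```python
import asyncio
import logging

log = logging.getLogger("heartbeat-sketch")


def log_task_failure(task: asyncio.Task) -> None:
    """Retrieve the task's exception so it is reported, not dropped."""
    if task.cancelled():
        return
    error = task.exception()  # marks the exception as retrieved
    if error is not None:
        log.error("background task failed: %r", error)


async def failing_heartbeat() -> None:
    # Stand-in for a heartbeat loop whose transport was already closed.
    raise RuntimeError("the handler is closed")


async def demo() -> bool:
    task = asyncio.ensure_future(failing_heartbeat())
    task.add_done_callback(log_task_failure)
    await asyncio.sleep(0.01)  # give the task and its callback time to run
    return task.done()


if __name__ == "__main__":
    asyncio.run(demo())
```

In a real connection the callback would more likely trigger a close or reconnect than just log.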

Exception within LazyCoroutine

@mosquito I've already reported about this exception at commit but just in case here is the issue when using Python 3.6:

AttributeError: 'coroutine' object has no attribute '__iter__'
  File "service.py", line 444, in ack
    await message.ack()
  File "asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "aiormq/tools.py", line 50, in __await__
    return (yield from self().__iter__())

The same with synchronous call without await.

Protect RPC from cancellation

If the Channel.rpc task is cancelled and asyncio.CancelledError is raised after the request frame was written but before the response frame was taken from the Channel.rpc_frames queue

aiormq/aiormq/channel.py

Lines 84 to 98 in 287bb06

@task
async def rpc(self, frame: spec.Frame) -> typing.Optional[spec.Frame]:
    async with self.lock:
        self.writer.write(
            pamqp.frame.marshal(frame, self.number)
        )
        if frame.synchronous or getattr(frame, 'nowait', False):
            result = await self.rpc_frames.get()
            self.rpc_frames.task_done()
            if result.name not in frame.valid_responses:  # pragma: no cover
                raise exc.InvalidFrameError(frame)
            return result

then the Channel.rpc_frames queue will be corrupted and subsequent frames will cause InvalidFrameError to be raised.

I think asyncio.Lock is not enough; asyncio.shield is also needed to protect the Channel.rpc method. Maybe it should be added not only to Channel.rpc but also in some other places. @mosquito what do you think?
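
The effect of asyncio.shield can be shown in isolation. In this sketch (all names are hypothetical stand-ins, with a plain asyncio.Queue in place of rpc_frames) the caller is cancelled after the request was "written", yet the inner task still consumes its response, so the queue is not left with a stale frame.

```python
import asyncio


async def rpc(responses: asyncio.Queue, log: list) -> str:
    log.append("request written")
    response = await responses.get()  # must not be abandoned half-way
    log.append("response consumed: " + response)
    return response


async def caller(inner: asyncio.Task) -> str:
    # Cancelling the caller cancels only the shield, not the inner task.
    return await asyncio.shield(inner)


async def demo() -> list:
    responses: asyncio.Queue = asyncio.Queue()
    log: list = []
    inner = asyncio.ensure_future(rpc(responses, log))
    outer = asyncio.ensure_future(caller(inner))
    await asyncio.sleep(0)  # let both tasks reach their first await
    outer.cancel()
    try:
        await outer
    except asyncio.CancelledError:
        log.append("caller cancelled")
    await responses.put("ok")
    await inner
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that the caller still sees CancelledError; shield only guarantees that the request/response pair completes in the background.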

Channel number too large exception

Hey, I'm running this code:

import asyncio

import aio_pika


conn = None


async def ping_rabbit():
    channel = await conn.channel()
    await channel.close()


async def main():
    global conn
    conn = await aio_pika.connect_robust('amqp://')
    req = []
    for _ in range(65535):
        req.append(ping_rabbit())
    await asyncio.gather(*req)
    print('Done')
    await ping_rabbit()

if __name__ == '__main__':
    asyncio.run(main())

and I'm getting "Channel number too large" exception right after the "Done". Probably something related to the file https://github.com/mosquito/aiormq/blob/master/aiormq/connection.py#L429. Could you guys take a look?
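
AMQP 0-9-1 channel numbers are 16-bit, so a counter that only ever increases runs out after 65535 channels even if all of them were closed. Whether that is what happens at the linked line is an assumption on my part; the sketch below only shows one way to recycle released numbers, with a hypothetical `ChannelNumbers` class.

```python
import heapq


class ChannelNumbers:
    """Hand out AMQP channel numbers and recycle the ones that are released."""

    def __init__(self, channel_max: int = 65535):
        self.channel_max = channel_max
        self._next = 1          # channel 0 is reserved for the connection
        self._free: list = []   # min-heap of released numbers

    def acquire(self) -> int:
        if self._free:
            return heapq.heappop(self._free)
        if self._next > self.channel_max:
            raise RuntimeError("no free channel numbers")
        number = self._next
        self._next += 1
        return number

    def release(self, number: int) -> None:
        heapq.heappush(self._free, number)
```

Calling release() when a channel closes keeps the script above within the limit, because each ping_rabbit() returns its number before the next one is needed.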

Heartbeats not properly recognized

The Connection only recognizes actual Heartbeat frames when updating heartbeat_last. The RabbitMQ documentation states:

Any traffic (e.g. protocol operations, published messages, acknowledgements) counts for a valid heartbeat. Clients may choose to send heartbeat frames regardless of whether there was any other traffic on the connection but some only do it when necessary.

Therefore on an actively used connection, the RabbitMQ server does not send heartbeats and aiormq runs into the heartbeat timeout.

It also seems dangerous not to initialize the heartbeat time properly (it is currently initialized with 0).

Further, currently, heartbeats to the server are only sent as a response to heartbeats received from the server. That doesn't seem particularly reliable.

P.S. Note the difference between heartbeat interval and heartbeat timeout.
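
A minimal sketch of the behaviour this report asks for: every received frame refreshes the liveness timestamp, and the timestamp starts at connection time instead of 0. `HeartbeatTracker` and its injectable `clock` parameter are hypothetical, chosen so the logic can be exercised without a broker.

```python
import time


class HeartbeatTracker:
    """Track peer liveness, counting every received frame as a sign of life."""

    def __init__(self, timeout: float, clock=time.monotonic):
        self.timeout = timeout
        self._clock = clock
        # Start from "now" rather than 0 so a fresh connection is not
        # immediately considered expired.
        self.last_received = clock()

    def frame_received(self) -> None:
        # Called for any frame, not only Heartbeat frames.
        self.last_received = self._clock()

    def expired(self) -> bool:
        return self._clock() - self.last_received > self.timeout
```

Sending heartbeats to the server would be driven by a separate timer based on the heartbeat interval, independent of what was received.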

Deadlock when connection close on app teardown.

In the channel's rpc method you acquire self.lock, and when handling asyncio.TimeoutError you call self.close(e), which calls rpc again while the lock is still held. This leads to a deadlock.

import asyncio

import aio_pika


def work_1():
    async def inner():
        await asyncio.sleep(10)

    return asyncio.create_task(inner(), name='work-1')


def work_2():
    async def inner():
        await asyncio.sleep(2)
        exit(-1)

    return asyncio.create_task(inner(), name='work-2')


async def work():
    await asyncio.wait([
        work_1(), work_2()
    ])


async def main():
    connection: aio_pika.RobustConnection = await aio_pika.connect_robust(
        "amqp://guest:guest@localhost"
    )

    async with connection:
        async with connection.channel() as chanel:
            try:
                await work()
            finally:
                await asyncio.sleep(1)


from asyncio import coroutines
from asyncio import events
from asyncio import tasks


def run(main, *, debug=False):
    """Execute the coroutine and return the result.

    This function runs the passed coroutine, taking care of
    managing the asyncio event loop and finalizing asynchronous
    generators.

    This function cannot be called when another asyncio event loop is
    running in the same thread.

    If debug is True, the event loop will be run in debug mode.

    This function always creates a new event loop and closes it at the end.
    It should be used as a main entry point for asyncio programs, and should
    ideally only be called once.

    Example:

        async def main():
            await asyncio.sleep(1)
            print('hello')

        asyncio.run(main())
    """
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()


def _cancel_all_tasks(loop):
    to_cancel = tasks.all_tasks(loop)
    if not to_cancel:
        return

    tasks1 = sorted(list(to_cancel), key=lambda t: t.get_name())

    for task in reversed(tasks1):
        task.cancel()

        try:
            print("Stop task:", task.get_name())
            res = loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass
        else:
            print(res)

        print("Complete stop task:", task.get_name())

    loop.run_until_complete(
        tasks.gather(*to_cancel, loop=loop, return_exceptions=True))

    for task in to_cancel:
        if task.cancelled():
            continue
        if task.exception() is not None:
            loop.call_exception_handler({
                'message': 'unhandled exception during asyncio.run() shutdown',
                'exception': task.exception(),
                'task': task,
            })


if __name__ == '__main__':
    run(main())
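
The re-entrant acquisition described in this report can also be reproduced without a broker. In this sketch `rpc` and `close` are simplified stand-ins for the channel methods; asyncio.Lock is not re-entrant, so the nested acquire never succeeds, and a short timeout is used only to make the hang observable.

```python
import asyncio


async def close(lock: asyncio.Lock) -> str:
    async with lock:  # needs the same lock the caller already holds
        return "closed"


async def rpc(lock: asyncio.Lock) -> str:
    async with lock:
        try:
            # Stand-in for the timeout handler calling close() while
            # the lock is still held; asyncio.Lock is not re-entrant.
            return await asyncio.wait_for(close(lock), timeout=0.1)
        except asyncio.TimeoutError:
            return "deadlock"


async def demo() -> str:
    return await rpc(asyncio.Lock())


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Releasing the lock before calling close(), or giving close() a path that does not take the lock, would avoid the nested acquire.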

Basic.Cancel sent from the broker is unactionable on user end

When the server sends a Basic.Cancel notification, aiormq simply removes the consumer tag from its consumer list.

This leaves the end-user of the lib with no way of knowing it'll not receive messages any more, although the server explicitly said so.

Shouldn't we expose this notification to the end user? That way the end user can react to this and stop listening.
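
A sketch of what exposing the notification could look like: a registry that, besides dropping the consumer tag, invokes a user-supplied callback. The `Consumers` class and its method names are entirely hypothetical and do not describe aiormq's API.

```python
from typing import Callable, Dict


class Consumers:
    """Hypothetical consumer registry that reports broker-side cancels."""

    def __init__(self) -> None:
        self._on_cancel: Dict[str, Callable[[str], None]] = {}

    def add(self, consumer_tag: str, on_cancel: Callable[[str], None]) -> None:
        self._on_cancel[consumer_tag] = on_cancel

    def broker_cancelled(self, consumer_tag: str) -> None:
        # Remove the consumer, then tell its owner instead of staying silent.
        on_cancel = self._on_cancel.pop(consumer_tag, None)
        if on_cancel is not None:
            on_cancel(consumer_tag)
```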

asyncio.gather() on None

Here https://github.com/mosquito/aiormq/blob/master/aiormq/connection.py#L439
... is this snippet of code:

        self._reader_task = None                                            # 1

        await asyncio.gather(
            self.__close_writer(writer), return_exceptions=True
        )

        await asyncio.gather(self._reader_task, return_exceptions=True)     # 2

self._reader_task is being set to None and then given to the asyncio.gather().
This obviously raises an exception:

  File "..\venv\lib\site-packages\aiormq\connection.py", line 445, in _on_close
    await asyncio.gather(self._reader_task, return_exceptions=True)
  File "..\lib\asyncio\tasks.py", line 806, in gather
    fut = ensure_future(arg, loop=loop)
  File "..\lib\asyncio\tasks.py", line 673, in ensure_future
    raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
TypeError: An asyncio.Future, a coroutine or an awaitable is required

... which is usually conveniently and silently suppressed here: https://github.com/mosquito/aiormq/blob/master/aiormq/base.py#L139

        with suppress(Exception):
            await self._on_close(exc)
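
One possible fix is to keep a local reference to the task before clearing the attribute. The sketch below uses a stand-in `Connection` class with only the reader-task handling; cancelling the task before gathering it is my assumption, not something the snippet above shows.

```python
import asyncio


class Connection:
    """Stand-in with only the reader-task handling from the snippet."""

    def __init__(self) -> None:
        self._reader_task = None

    async def start(self) -> None:
        self._reader_task = asyncio.ensure_future(asyncio.sleep(3600))

    async def close(self):
        # Keep a local reference before clearing the attribute.
        reader_task, self._reader_task = self._reader_task, None
        if reader_task is not None:
            reader_task.cancel()
            await asyncio.gather(reader_task, return_exceptions=True)
        return reader_task


async def demo() -> bool:
    connection = Connection()
    await connection.start()
    task = await connection.close()
    return task.cancelled() and connection._reader_task is None
```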

[2.7.2] Worker claims multiple messages from queue despite prefetch_count, prefetch_size = 0

Expected Behavior

Consumers are only able to claim one item from a queue at a time, when the prefetch_count and prefetch_size on the channel they consume from is zero

Actual Behavior

Consumers can claim any number of messages from the queue at a time, regardless of prefetch_count and prefetch_size

Steps to Reproduce

  1. Establish a connection, then a channel to your RabbitMQ service
  2. Call channel.basic_qos(prefetch_count=0, prefetch_size=0)
  3. Begin consuming from the channel

If you observe the unacked messages on the queue from the rabbitmq_management plugin, there will be an increasingly large number of unacked messages, even though there should only ever be a single message claimed from the queue at a time.

Example code demonstrating issue

#!/usr/bin/env python3

import asyncio
import json
import random
import aiormq


class Minimal:
    def __init__(self):
        self.queue_name = "minimal_test"

    async def start(self):
        self.connection = await aiormq.connect("amqp://guest:guest@localhost/")
        self.channel = await self.connection.channel()
        await self.channel.basic_qos(prefetch_count=0, prefetch_size=0)

        self.declare_ok = await self.channel.queue_declare(self.queue_name)
        await self.publish_junk()

    async def run(self):
        consume_ok = await self.channel.basic_consume(
            self.declare_ok.queue, self.on_message
        )

    async def on_message(self, message):
        await asyncio.sleep(5)
        await self.publish_junk()
        await message.channel.basic_ack(message.delivery.delivery_tag)

    async def publish_junk(self):
        for _ in range(10):
            await self.channel.basic_publish(
                json.dumps({"number": random.randint(0, 99999)}).encode("UTF-8"),
                routing_key=self.queue_name,
            )


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    minimal = Minimal()
    loop.run_until_complete(minimal.start())

    asyncio.ensure_future(minimal.run())
    loop.run_forever()
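
For reference, AMQP 0-9-1 defines a prefetch_count of 0 as "no limit", so the broker is allowed to deliver any number of unacknowledged messages with the settings above. Limiting a consumer to one message in flight requires prefetch_count=1. A minimal sketch; the helper name consume_one_at_a_time is hypothetical, and channel is assumed to be an open aiormq channel.

```python
async def consume_one_at_a_time(channel, queue_name, on_message):
    # prefetch_count=1: at most one unacknowledged delivery at a time.
    await channel.basic_qos(prefetch_count=1)
    # Start consuming only after the limit is in place.
    return await channel.basic_consume(queue_name, on_message)
```

In the example above this would replace the basic_qos call in start() and the basic_consume call in run().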

AssertionError when drain is called

I have the following dependencies:

  • CPython 3.6.7
  • aio-pika==5.4.0
  • aiormq==2.1.0

My service installs a loop exception handler (https://docs.python.org/3.6/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.set_exception_handler), which is how I noticed that after upgrading aio-pika from version 4 to 5, many AssertionErrors are raised inside asyncio:

AssertionError: null
  File "asyncio/streams.py", line 339, in drain
    yield from self._protocol._drain_helper()
  File "asyncio/streams.py", line 214, in _drain_helper
    assert waiter is None or waiter.cancelled()

There are several calls to asyncio.StreamWriter.drain within the aiormq.Channel class (the basic_ack, basic_nack and basic_reject methods), so I guess the problem is in aiormq. It may also be related to https://bugs.python.org/issue29930.

It is rather difficult to extract a script that reproduces the problem from the service where it is observed, so for now I am just leaving this here so that anyone experiencing the same problem can find this issue.

@mosquito FYI.
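
If the assertion does come from several coroutines awaiting drain() on the same writer at once (which is what the linked CPython issue describes), one common mitigation is to serialize those calls behind a lock. A minimal sketch under that assumption; SerializedWriter is a hypothetical wrapper, not part of aiormq or asyncio.

```python
import asyncio


class SerializedWriter:
    """Wraps a StreamWriter-like object so only one drain() runs at a time."""

    def __init__(self, writer):
        self._writer = writer
        # Create the wrapper inside a running event loop so the lock binds to it.
        self._drain_lock = asyncio.Lock()

    def write(self, data):
        self._writer.write(data)

    async def drain(self):
        # Concurrent callers queue up here instead of entering drain() together.
        async with self._drain_lock:
            await self._writer.drain()
```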

If a value passed in the queue_declare arguments is an int greater than 128, the server replies that the value was negative

How to reproduce

Code:

import asyncio

import aiormq


async def test():
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    keys = {'x-max-length', 'x-message-ttl', 'x-max-length-bytes'}
    for k in keys:
        channel = await connection.channel()
        try:
            await channel.queue_declare('test', arguments={k: 129})
        except Exception as e:
            print(e)


asyncio.run(test())

Result:

PRECONDITION_FAILED - invalid arg 'x-max-length-bytes' for queue 'test' in vhost '/': {value_negative,-127}
PRECONDITION_FAILED - invalid arg 'x-message-ttl' for queue 'test' in vhost '/': {value_negative,-127}
PRECONDITION_FAILED - invalid arg 'x-max-length' for queue 'test' in vhost '/': {value_negative,-127}
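
The -127 in the replies is what 129 becomes when its low byte is read back as a signed 8-bit integer, which suggests (but does not prove) that the value is encoded in a single signed byte somewhere on the way to the broker. The arithmetic can be checked with the standard struct module; as_signed_byte is a hypothetical helper, not the library's encoder.

```python
import struct


def as_signed_byte(value):
    # Pack the low 8 bits as an unsigned byte, then reinterpret them as signed.
    (signed,) = struct.unpack("b", struct.pack("B", value & 0xFF))
    return signed


print(as_signed_byte(129))  # -127, the value in the server reply
print(as_signed_byte(127))  # 127, still representable in a signed byte
```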

Environment

OS: Ubuntu 19.10
Python: 3.7.5
Package version: aiormq==3.2.1
RabbitMQ: 3.8.3, Erlang 22.3

Similar bug in another library:
Polyconseil/aioamqp#204

ChannelInvalidStateError

Hi!
I have a question. Is it the right behaviour that a channel is in an invalid state after an attempt to consume from a non-existent queue?
Test case

    async def test_channel(self, rabbitmq):
        connection = await aiormq.connect(config.backends["rabbitmq"].url)
        channel = await connection.channel()
        await channel.queue_declare("default", durable=True)

        def on_message(msg):
            print(msg)

        try:
            await channel.basic_consume("non_existent_queue", on_message)
        except Exception:
            LOG.exception("#1")

        try:
            await channel.basic_consume("default", on_message)
        except Exception:
            LOG.exception("#2")

Output

2020-11-07 08:03:55,679 |ERROR| #1
Traceback (most recent call last):
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/vagrant/dev/.../tests/tests_medium.py", line 142, in test_channel
    await channel.basic_consume("non_existent_queue", on_message)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/channel.py", line 422, in basic_consume
    return await self.rpc(
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py", line 692, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
aiormq.exceptions.ChannelNotFoundEntity: NOT_FOUND - no queue 'non_existent_queue' in vhost '/'
2020-11-07 08:03:55,686 |ERROR| #2
Traceback (most recent call last):
  File "/vagrant/dev/.../tests/tests_medium.py", line 147, in test_channel
    await channel.basic_consume("default", on_message)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/channel.py", line 422, in basic_consume
    return await self.rpc(
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/channel.py", line 121, in rpc
    raise ChannelInvalidStateError("writer is None")
aiormq.exceptions.ChannelInvalidStateError: writer is None
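
In AMQP 0-9-1 a channel-level error such as NOT_FOUND closes the channel, so the second call fails because the channel is already gone. A minimal sketch of recovering by opening a fresh channel on the same connection; consume_or_reopen is a hypothetical helper, and catching Exception is a simplification.

```python
async def consume_or_reopen(connection, channel, queue, on_message):
    """Try to consume; on failure return a new channel for further use."""
    try:
        await channel.basic_consume(queue, on_message)
        return channel
    except Exception:
        # The broker closed the old channel, so it must not be reused.
        return await connection.channel()
```

In the test case above, the channel returned after the failed "non_existent_queue" call would be the one used to consume from "default".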

The second case: the connection is lost. The behaviour (exception type) depends on how long the client waits before the next call.

    async def test_lost_connection(self, rabbitmq):
        connection = await aiormq.connect(config.backends["rabbitmq"].url)
        channel = await connection.channel()
        await channel.queue_declare("default", durable=True)

        def on_message(msg):
            print(msg)

        await channel.basic_consume("default", on_message)

        rabbitmq.stop()

        # await asyncio.sleep(5)

        try:
            await channel.basic_consume("other", on_message)
        except Exception:
            LOG.exception("#1")

Output

Task was destroyed but it is pending!
task: <Task pending name='Task-15' coro=<FutureStore.reject_all() done, defined at /home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py:65> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7fc613b96280>()]>>
Task exception was never retrieved
future: <Task finished name='Task-17' coro=<_wrap_awaitable() done, defined at /home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py:685> exception=ConnectionResetError(104, 'Connection reset by peer')>
Traceback (most recent call last):
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py", line 692, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/vagrant/dev/.../tests/tests_medium.py", line 148, in test_lost_connection
    await channel.basic_consume("other", on_message)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/channel.py", line 422, in basic_consume
    return await self.rpc(
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py", line 692, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py", line 692, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/connection.py", line 423, in __close_writer
    await writer.wait_closed()
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/streams.py", line 359, in wait_closed
    await self._protocol._get_close_waiter(self)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/connection.py", line 375, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/connection.py", line 327, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/streams.py", line 723, in readexactly
    await self._wait_for_data('readexactly')
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/streams.py", line 517, in _wait_for_data
    await self._waiter
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/selector_events.py", line 910, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
Task exception was never retrieved
future: <Task finished name='Task-19' coro=<_wrap_awaitable() done, defined at /home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py:685> exception=ConnectionResetError(104, 'Connection reset by peer')>
Traceback (most recent call last):
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py", line 692, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/vagrant/dev/.../tests/tests_medium.py", line 148, in test_lost_connection
    await channel.basic_consume("other", on_message)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/channel.py", line 422, in basic_consume
    return await self.rpc(
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py", line 692, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py", line 692, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/connection.py", line 423, in __close_writer
    await writer.wait_closed()
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/streams.py", line 359, in wait_closed
    await self._protocol._get_close_waiter(self)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/connection.py", line 375, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/connection.py", line 327, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/streams.py", line 723, in readexactly
    await self._wait_for_data('readexactly')
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/streams.py", line 517, in _wait_for_data
    await self._waiter
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/selector_events.py", line 910, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
Task exception was never retrieved
future: <Task finished name='Task-20' coro=<_wrap_awaitable() done, defined at /home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py:685> exception=ConnectionResetError(104, 'Connection reset by peer')>
Traceback (most recent call last):
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py", line 692, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/vagrant/dev/.../tests/tests_medium.py", line 148, in test_lost_connection
    await channel.basic_consume("other", on_message)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/channel.py", line 422, in basic_consume
    return await self.rpc(
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py", line 692, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py", line 692, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/connection.py", line 423, in __close_writer
    await writer.wait_closed()
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/streams.py", line 359, in wait_closed
    await self._protocol._get_close_waiter(self)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/connection.py", line 375, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/connection.py", line 327, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/streams.py", line 723, in readexactly
    await self._wait_for_data('readexactly')
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/streams.py", line 517, in _wait_for_data
    await self._waiter
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/selector_events.py", line 910, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<Connection.__reader() done, defined at /home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py:165> exception=ConnectionResetError(104, 'Connection reset by peer')>
Traceback (most recent call last):
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py", line 692, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/vagrant/dev/.../tests/tests_medium.py", line 148, in test_lost_connection
    await channel.basic_consume("other", on_message)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/channel.py", line 422, in basic_consume
    return await self.rpc(
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py", line 692, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py", line 692, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/connection.py", line 423, in __close_writer
    await writer.wait_closed()
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/streams.py", line 359, in wait_closed
    await self._protocol._get_close_waiter(self)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/connection.py", line 375, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/connection.py", line 327, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/streams.py", line 723, in readexactly
    await self._wait_for_data('readexactly')
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/streams.py", line 517, in _wait_for_data
    await self._waiter
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/asyncio/selector_events.py", line 910, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer

After uncommenting await asyncio.sleep(5), I get:

2020-11-07 08:23:51,085 |ERROR| #1
Traceback (most recent call last):
  File "/vagrant/dev/.../tests/tests_medium.py", line 148, in test_lost_connection
    await channel.basic_consume("other", on_message)
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/channel.py", line 422, in basic_consume
    return await self.rpc(
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
  File "/home/vagrant/.pyenv/versions/3.8.6/lib/python3.8/site-packages/aiormq/channel.py", line 121, in rpc
    raise ChannelInvalidStateError("writer is None")
aiormq.exceptions.ChannelInvalidStateError: writer is None

aiormq 3.3.1
Python 3.8.6

Thank you for help!

Trying to publish on a closed channel results in broad exception type `RuntimeError`

Hi,

we are working on an application that, through aio-pika, uses aiormq. While dealing with some issues caused by unreliable network conditions, we noticed that trying to publish on a channel that has been closed results in a rather broad RuntimeError, raised here:

raise RuntimeError('%r closed' % self)

Below is a typical stack trace for when this happens, starting at a call to aio_pika.Exchange.publish():

Traceback (most recent call last):
  <snip>
  ...
  </snip>
  File "/<our_project>/source.py", line 106, in _send
    await self.data_exchange.publish(msg, routing_key=metric, mandatory=False)
  File "/<venv>/lib/python3.7/site-packages/aio_pika/exchange.py", line 200, in publish
    ), timeout=timeout
  File "/usr/lib64/python3.7/asyncio/tasks.py", line 414, in wait_for
    return await fut
  File "/<venv>/lib/python3.7/site-packages/aiormq/channel.py", line 426, in basic_publish
    async with self.lock:
  File "/<venv>/lib/python3.7/site-packages/aiormq/channel.py", line 77, in lock
    raise RuntimeError('%r closed' % self)
RuntimeError: <Channel: "1"> closed

We are afraid that, when writing error handling code for this condition, catching RuntimeError might accidentally catch other errors, too.

Would it be possible to raise a more descriptive exception of a distinct type? The exception type might subclass RuntimeError so as not to break code that catches that.
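
A minimal sketch of what such an exception could look like. ClosedChannelError and ensure_open are hypothetical names for illustration and are not part of aiormq.

```python
class ClosedChannelError(RuntimeError):
    """Hypothetical exception for operations attempted on a closed channel."""

    def __init__(self, channel):
        # Same message format as the RuntimeError quoted above.
        super().__init__("%r closed" % (channel,))
        self.channel = channel


def ensure_open(channel, is_closed):
    # Raise the narrow type; existing `except RuntimeError` blocks still match.
    if is_closed:
        raise ClosedChannelError(channel)
```

Callers that want to handle only this condition can catch ClosedChannelError, while code that already catches RuntimeError keeps working.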

AttributeError: 'NoneType' object has no attribute 'drain'

Sometimes I get this error. The traceback is:

AttributeError: 'NoneType' object has no attribute 'drain'
  File "asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "aiormq/tools.py", line 67, in __await__
    return (yield from self().__await__())
  File "aiormq/connection.py", line 130, in drain
    return await self.writer.drain()

@mosquito I think it is because self.writer can become None here while waiting for the lock:

aiormq/aiormq/connection.py

Lines 125 to 130 in 26ffef5

async def drain(self):
    if not self.writer:
        raise RuntimeError("Writer is %r" % self.writer)
    async with self.__drain_lock:
        return await self.writer.drain()
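
A minimal sketch of one possible fix, assuming the race described above: read self.writer again after the lock has been acquired. GuardedConnection is a hypothetical stand-in for the relevant part of the connection class.

```python
import asyncio


class GuardedConnection:
    def __init__(self, writer):
        self.writer = writer
        self._drain_lock = asyncio.Lock()

    async def drain(self):
        async with self._drain_lock:
            # Take the reference only once the lock is held; the attribute
            # may have been cleared while this coroutine was waiting.
            writer = self.writer
            if writer is None:
                raise RuntimeError("Writer is %r" % writer)
            return await writer.drain()
```

With this ordering a late caller gets the intended RuntimeError instead of an AttributeError on None.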

hidden error in Connection._on_close

I tried to subclass Connection to extend the handling of network failures.

class SomeClass(aiormq.Connection):
    async def _on_close(self, exc=ConnectionClosed(0, 'normal closed')):
        await super()._on_close(exc)
        # rest of the code

The "rest of the code" does not execute. After temporarily removing the line https://github.com/mosquito/aiormq/blob/master/aiormq/base.py#L142
I figured out that aiormq.Connection._on_close always fails at the line https://github.com/mosquito/aiormq/blob/master/aiormq/connection.py#L399
with AttributeError: 'StreamWriter' object has no attribute 'wait_closed'.

Python 3.6.8
Linux
aiormq 2.6.0
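
StreamWriter.wait_closed() was added in Python 3.7, which matches the AttributeError on Python 3.6.8. A minimal sketch of a version-tolerant close; close_writer is a hypothetical helper, not the library's code.

```python
async def close_writer(writer):
    writer.close()
    # wait_closed() exists only on Python 3.7+; skip it on older interpreters.
    wait_closed = getattr(writer, "wait_closed", None)
    if wait_closed is not None:
        await wait_closed()
```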

4.x.x still Unstable?

The latest version is 5.2.2, while the latest release is 4.0.0 from a year ago, so does the README need updating?

Consumer callback should be removed from Channel.consumers on basic_cancel

When basic_consume is called, callback is saved into Channel.consumers:

self.consumers[consumer_tag] = consumer_callback

But it's never removed when basic_cancel is called or anywhere else.
This part looks like it's intended to do it, but it never actually enters this path:

aiormq/aiormq/channel.py

Lines 251 to 253 in 4fb5510

elif isinstance(frame, spec.Basic.Cancel):
    self.consumers.pop(frame.consumer_tag, None)
    continue

Adding this piece of code fixes the problem:

elif isinstance(frame, spec.Basic.CancelOk):
    self.consumers.pop(frame.consumer_tag, None)

A code example:

# aiormq==2.6.0
import asyncio
import aiormq

async def on_message(message):
    await asyncio.sleep(5)

async def main():
    connection = await aiormq.connect("amqp://guest:guest@localhost/")
    channel = await connection.channel()
    declare_ok = await channel.queue_declare('helo')

    while True:
        consume_ok = await channel.basic_consume(
            declare_ok.queue, on_message, no_ack=True
        )
        cancel_ok = await channel.basic_cancel(consume_ok.consumer_tag)
        print(len(channel.consumers), consume_ok.consumer_tag, cancel_ok.consumer_tag)
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()

Example output (note that the number grows over time):

1 ctag1.7bb39cff6fdb40e0427ab57158f2c043 ctag1.7bb39cff6fdb40e0427ab57158f2c043
2 ctag1.a2168d75988391d20d686d08982337a3 ctag1.a2168d75988391d20d686d08982337a3
3 ctag1.f5213cab45868045e9c16ccfdbe369ec ctag1.f5213cab45868045e9c16ccfdbe369ec
4 ctag1.33c584a805fb98fdcbb61e80f6c92314 ctag1.33c584a805fb98fdcbb61e80f6c92314
5 ctag1.5957e99d5daa0985002756b7fed70c0d ctag1.5957e99d5daa0985002756b7fed70c0d
6 ctag1.765566530efe09143c2331b71c3166e8 ctag1.765566530efe09143c2331b71c3166e8
7 ctag1.8312f1f6e90988330d81842ead1d03ec ctag1.8312f1f6e90988330d81842ead1d03ec
8 ctag1.2d66f81b58fc986f1decf711cb1b27f6 ctag1.2d66f81b58fc986f1decf711cb1b27f6
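
Until the library removes the entry itself, a caller-side workaround is to drop the callback after a successful cancel. A minimal sketch; cancel_and_forget is a hypothetical helper that relies on the Channel.consumers dict described above.

```python
async def cancel_and_forget(channel, consumer_tag):
    cancel_ok = await channel.basic_cancel(consumer_tag)
    # Remove the stale callback so Channel.consumers does not grow.
    channel.consumers.pop(consumer_tag, None)
    return cancel_ok
```

Using this in place of the plain basic_cancel call in the loop above should keep len(channel.consumers) from growing.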

Error handling problem.

Hello.
I accidentally ran into a hang in the basic_publish() method.
The small example below demonstrates it.

import asyncio
import aiormq
import logging


async def test():
    amqp_url = "amqp://user:[email protected]/vhost"
    connection = await aiormq.connect(amqp_url)
    channel = await connection.channel()

    # Passing the wrong parameter type to basic_publish() body parameter str instead of bytes.
    # Execution hangs on call basic_publish().
    # No exception was thrown.
    # There is no error in the log.
    await channel.basic_publish("123")

    await connection.close()


try:
    logging.basicConfig(level=logging.DEBUG)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(test())
    loop.close()
except Exception as e:
    print("Exception:", e)
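
Until the library reports the wrong type itself, the caller can validate the body before publishing. A minimal sketch; publish_checked is a hypothetical wrapper around channel.basic_publish().

```python
async def publish_checked(channel, body, **kwargs):
    # Fail fast instead of handing a non-bytes body to basic_publish().
    if not isinstance(body, (bytes, bytearray)):
        raise TypeError("body must be bytes, got %s" % type(body).__name__)
    return await channel.basic_publish(bytes(body), **kwargs)
```

With this wrapper, the "123" call above raises TypeError immediately, and "123".encode() goes through.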

Can't seem to capture the ConnectionClosed exception

I'm using the simple consumer example and I get messages from local message broker.

Then I artificially restart the broker.

The frame arrives, gets decoded and the ConnectionClosed exception is created.
However, the code tries to close the connection (and send a CloseOk frame, which won't succeed), but the exceptions are suppressed. The ConnectionClosed itself is passed on, but I don't see where it is raised again.

I'm only getting the following output and no error raised.

Unexpected connection close from remote "amqp://username:******@localhost:5672/%2F", Connection.Close(reply_code=320, reply_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
NoneType: None

Note that this is just a print statement, not the exception itself. (I find the NoneType: None suspiciously buggy). After that, no more messages are consumed.

I'd like to capture the ConnectionClosed exception (and potentially other related ones) and retry the connection (with an exponential backoff) when the broker is restarted.

Am I misusing the package, or am I missing something about how exceptions are caught in the asyncio loop?
I've tried wrapping the code in an encompassing try/except, but I can't seem to catch the ConnectionClosed exception, although I see it in the logs when I turn on the debug output. Where does it get swallowed, and why?
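
A minimal sketch of the retry part described above: reconnecting with exponential backoff. connect_with_backoff and its parameters are hypothetical; connect is any zero-argument coroutine function, for example lambda: aiormq.connect(url). That connection failures surface as OSError subclasses is an assumption to check against the installed version. Detecting the closed connection in the first place is a separate problem that this sketch does not solve.

```python
import asyncio


async def connect_with_backoff(
    connect,
    *,
    base=0.5,
    factor=2.0,
    max_delay=30.0,
    attempts=5,
    retry_on=(OSError,),
    sleep=asyncio.sleep,
):
    delay = base
    for attempt in range(1, attempts + 1):
        try:
            return await connect()
        except retry_on:
            if attempt == attempts:
                # Out of attempts: let the last error propagate.
                raise
            await sleep(delay)
            # Grow the delay geometrically, capped at max_delay.
            delay = min(delay * factor, max_delay)
```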

Unknown <pamqp.specification.Basic.Ack object at 0x1043e5a58> from broker

When I publish messages to an exchange for which there is no route, I get the following message for each publish:

Unknown <pamqp.specification.Basic.Ack object at 0x1043e5a58> from broker

What I found out from my investigation is that by default the mandatory flag is set to True, which tells the RabbitMQ server to answer with a Return frame and an Ack frame. However, the Channel clears out the Promise in the self.confirmations already in the _on_return method. Thus, the Ack will complain by printing the above message.

On a side note: I find the default settings questionable, as mandatory is set to true, but the on_return_raises flag is false. I'd either want to silently drop the message if it's not routable, or that an exception is raised.

Missing method `__aexit__` on class `Connection`

Hello and first thank you for this project and for aio-pika!

I wanted to use Connection as an async context but it turns out Connection and AbstractConnection from aiormq do not implement __aexit__ but they do have a counterpart __aenter__ method.

I managed to work around this by inheriting my own class in the following way:

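from types import TracebackType
from typing import Optional, Type

import aiormq


class Connection(aiormq.Connection):
    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> bool:
        # Close the connection on exit; returning False re-raises any exception.
        await self.close(exc_val)
        return False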

Has this method been lost or forgotten in the battle or is there some reason this was not implemented?

Thanks!

Add ExternalAuth Support

I would like to add support for External Authentication to enable SSL client certificate authentication. Looking at pika, it seems an empty binary string is all that's required at the protocol level. I'm proposing minimal changes that involve a new authentication class and an explicit external authentication option via query string. These modifications work well in my use case and all existing tests pass. I'm still working on expanding the tests to check for proper client certificate validation.

Wanting to keep the stable version, I forked and edited from tag 3.3.1. Would it be possible to merge these back in and tag a new version?
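
A minimal sketch of the difference at the protocol level: PLAIN sends the credentials separated by NUL bytes, while EXTERNAL sends an empty response and leaves authentication to the TLS layer. The class names and the marshal() method are hypothetical and are not taken from the proposed patch.

```python
class PlainAuth:
    mechanism = "PLAIN"

    def __init__(self, login, password):
        self.login = login
        self.password = password

    def marshal(self):
        # SASL PLAIN with an empty authorization id: NUL login NUL password.
        return b"\x00" + self.login.encode() + b"\x00" + self.password.encode()


class ExternalAuth:
    mechanism = "EXTERNAL"

    def marshal(self):
        # EXTERNAL carries no credentials; the client certificate identifies the user.
        return b""
```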

Using channel after CancelledError exception in basic_consume callback can cause connection to close

Python 3.8.1

aio-pika 6.4.1
aiormq 3.2.0

Consider callback example (using aio_pika):

async def callback(message: IncomingMessage):
    async with message.process():
        await asyncio.sleep(0.1)

Or plain aiormq:

async def callback(message: DeliveredMessage):
    try:
        await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        await message.channel.basic_nack(message.delivery.delivery_tag)
    else:
        await message.channel.basic_ack(message.delivery.delivery_tag)

and this section:

aiormq/aiormq/base.py

Lines 139 to 143 in 63a8b0d

with suppress(Exception):
    await self._on_close(exc)
with suppress(Exception):
    await self._cancel_tasks(exc)

Those two examples give me:

"CHANNEL_ERROR - expected 'channel.open'"

in Connection.add_close_callback (Connection.closing.add_done_callback for aiormq) after await channel.close()

I understand that the second example is far-fetched. It's not strictly necessary to send nack/reject on CancelledError. But I ran into something similar to the first example and spent a long time debugging what causes nacks to be sent after the channel has been closed. I thought that I simply can't send anything after close, so I needed to wrap the callback code in some logic to prevent sending messages after close. Then I found that channel.close waits for all subtasks (including on_message callbacks?), but for some reason only after it has already sent the Channel.Close method via AMQP.
So my question is the following: is closing the channel first and then cancelling subtasks intentional?

One full example:

import asyncio
import logging
from typing import Optional

import aio_pika
from aio_pika import IncomingMessage

import sys


logging.basicConfig(format='%(relativeCreated)8.2f - %(name)20s - %(levelname)8s - %(message)s', level=logging.DEBUG, stream=sys.stdout)
queue_name = 'test_queue'


async def callback(message: IncomingMessage):
    try:
        async with message.process():
            await asyncio.sleep(0.1)
    finally:
        print('--- Callback finished')


def close_callback(reason):
    logging.warning('CONNECTION CLOSED %s', str(reason))


async def consumer():
    connection: Optional[aio_pika.Connection] = None
    channel: Optional[aio_pika.channel.Channel] = None
    tag: Optional[str] = None
    q: Optional[aio_pika.Queue] = None

    try:
        connection = await aio_pika.connect('amqp://guest:guest@localhost')
        connection.add_close_callback(close_callback)

        channel = await connection.channel()
        q = await channel.declare_queue(queue_name, durable=True)
        await channel.set_qos(prefetch_count=3)
        tag = await q.consume(callback)

        await asyncio.sleep(1.2)
        logging.info('Starting to close')
    finally:
        if q and tag:
            await q.cancel(tag)
            logging.info('Queue consume canceled')

        if channel:
            await channel.close()
            logging.info('Channel closed')

        await asyncio.sleep(2)
        logging.info('After 2 seconds: Connection.is_closed == %s ', connection.is_closed)

        if connection:
            logging.info('Before connection.close()')
            await connection.close()
            logging.info('After connection.close()')


if __name__ == '__main__':
    asyncio.run(consumer())

Output:

   51.30 -              asyncio -    DEBUG - Using selector: EpollSelector
   56.96 -  aio_pika.connection -    DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
   57.17 -  aio_pika.connection -    DEBUG - Channel created: <Channel "None#Not initialized channel">
   58.63 -       aio_pika.queue -    DEBUG - Declaring queue: <Queue(test_queue): auto_delete=False, durable=True, exclusive=False, arguments=None>
  194.32 -       aio_pika.queue -    DEBUG - Start to consuming queue: <Queue(test_queue): auto_delete=False, durable=True, exclusive=False, arguments=None>
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
 1396.74 -                 root -     INFO - Starting to close
 1398.72 -                 root -     INFO - Queue consume canceled
--- __CLOSER <Channel: "1">
--- _on_close ISSUED <Channel: "1">
--- _on_close FINISHED <Channel: "1">
--- _cancel_tasks ISSUED <Channel: "1">
--- Callback finished
--- Callback finished
--- Callback finished
--- _cancel_tasks FINISHED <Channel: "1">
 1402.86 -                 root -     INFO - Channel closed
--- __CLOSER <Connection: "amqp://guest:******@localhost">
--- _on_close ISSUED <Connection: "amqp://guest:******@localhost">
--- _cancel_tasks ISSUED <Connection: "amqp://guest:******@localhost">
 1406.87 -                 root -  WARNING - CONNECTION CLOSED CHANNEL_ERROR - expected 'channel.open'
 1407.04 -  aio_pika.connection -    DEBUG - Closing AMQP connection None
 1407.40 -    aiormq.connection -    DEBUG - Reader task cancelled:
Traceback (most recent call last):
  File "/home/lamar/.envs/api_server3/lib/python3.8/site-packages/aiormq/connection.py", line 385, in __reader
    return await self.close(
  File "/home/lamar/.envs/api_server3/lib/python3.8/site-packages/aiormq/base.py", line 154, in close
    await self.loop.create_task(self.__closer(exc))
asyncio.exceptions.CancelledError
 3404.40 -                 root -     INFO - After 2 seconds: Connection.is_closed == False 
 3404.56 -                 root -     INFO - Before connection.close()
 3404.63 -                 root -     INFO - After connection.close()

P.S. I added custom prints to __closer in aiormq:

    async def __closer(self, exc):
        print("--- __CLOSER", repr(self))
        if self.is_closed:  # pragma: no cover
            return

        with suppress(Exception):
            print('--- _on_close ISSUED', repr(self))
            await self._on_close(exc)
            print('--- _on_close FINISHED', repr(self))

        with suppress(Exception):
            print('--- _cancel_tasks ISSUED', repr(self))
            await self._cancel_tasks(exc)
            print('--- _cancel_tasks FINISHED', repr(self))

P.S Connection.is_closed equals False in log 2 seconds after closing callback fired
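To illustrate the alternative ordering the question implies, here is a toy model (`ToyChannel` is hypothetical and is not aiormq code) in which callbacks are cancelled and drained before the close method is sent, so cleanup inside a callback can still reach the channel:

```python
import asyncio
from typing import List


class ToyChannel:
    """Records what is 'sent' so the ordering can be inspected."""

    def __init__(self) -> None:
        self.sent: List[str] = []
        self.tasks: List[asyncio.Task] = []

    async def callback(self) -> None:
        try:
            await asyncio.sleep(3600)  # stands in for message processing
        except asyncio.CancelledError:
            self.sent.append("basic.nack")  # cleanup still reaches the channel
            raise

    async def close(self) -> None:
        # Cancel and drain callbacks first, then send the close method.
        for task in self.tasks:
            task.cancel()
        await asyncio.gather(*self.tasks, return_exceptions=True)
        self.sent.append("channel.close")


async def demo() -> List[str]:
    channel = ToyChannel()
    channel.tasks.append(asyncio.ensure_future(channel.callback()))
    await asyncio.sleep(0)  # let the callback start
    await channel.close()
    return channel.sent


order = asyncio.run(demo())
```

With this ordering the nack is recorded before the close. Whether the library should behave this way is exactly the open question of this issue.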

Incomplete read error when closing connection

I'm getting the following traceback when stopping my connection:

Traceback (most recent call last):
  File "/nix/store/p8j6zgmwlr25cf4viyyqfalfnd9dh9y1-python3-3.7.2-env/lib/python3.7/site-packages/aiormq/connection.py", line 334, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/nix/store/p8j6zgmwlr25cf4viyyqfalfnd9dh9y1-python3-3.7.2-env/lib/python3.7/site-packages/aiormq/base.py", line 171, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/nix/store/p8j6zgmwlr25cf4viyyqfalfnd9dh9y1-python3-3.7.2-env/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
  File "/nix/store/p8j6zgmwlr25cf4viyyqfalfnd9dh9y1-python3-3.7.2-env/lib/python3.7/site-packages/aiormq/connection.py", line 285, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/nix/store/p8j6zgmwlr25cf4viyyqfalfnd9dh9y1-python3-3.7.2-env/lib/python3.7/asyncio/streams.py", line 677, in readexactly
    raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 0 bytes read on a total of 1 expected bytes

I'm pretty much calling channel.close() and then connection.close() before stopping and closing the loop. Shouldn't it close without an error?

Connection close

How should I handle the following error message?

DEBUG:aiormq.connection:Can not read bytes from server:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/aiormq/connection.py", line 381, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/usr/local/lib/python3.7/dist-packages/aiormq/connection.py", line 333, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/lib/python3.7/asyncio/streams.py", line 677, in readexactly
    raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 0 bytes read on a total of 1 expected bytes
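`IncompleteReadError` is what `StreamReader.readexactly()` raises when the peer closes the socket before the requested bytes arrive, so "0 bytes read" means a plain end of stream. A minimal stdlib-only sketch (no aiormq involved; `read_frame_header` is a hypothetical helper) that reproduces the condition and treats a clean EOF as a normal close:

```python
import asyncio
from typing import Optional


async def read_frame_header(reader: asyncio.StreamReader) -> Optional[bytes]:
    """Return one byte, or None if the peer closed the stream first."""
    try:
        return await reader.readexactly(1)
    except asyncio.IncompleteReadError as e:
        # e.partial holds whatever arrived before EOF (empty here).
        if not e.partial:
            return None
        raise


async def demo() -> Optional[bytes]:
    reader = asyncio.StreamReader()
    reader.feed_eof()  # simulate the broker closing the socket
    return await read_frame_header(reader)


result = asyncio.run(demo())
```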

[2.5.5] Repeated KeyError when manually acking

In channels.Channel._confirm_delivery():

        confirmation = self.confirmations.pop(delivery_tag)

        if confirmation is self.Returning:
            return
        elif confirmation.done():  # pragma: nocover
            log.warn(
                "Delivery tag %r confirmed %r was ignored",
                delivery_tag, frame
            )
            return
        elif isinstance(frame, spec.Basic.Ack):
            confirmation.set_result(frame)
            return

        confirmation.set_exception(
            exc.DeliveryError(None, frame)
        )  # pragma: nocover

I get the following errors on each attempt to ack the message (doing so manually):


[2019-06-21 13:00:31,912] [        base_events.py:1268] [   ERROR]   Exception in callback Channel._confirm_delivery(7542, <pamqp.specif...0x2480e5a49b0>)
handle: <Handle Channel._confirm_delivery(7542, <pamqp.specif...0x2480e5a49b0>)>
Traceback (most recent call last):
  File "C:\Program Files\Python36\lib\asyncio\events.py", line 145, in _run
    self._callback(*self._args)
  File "C:\Users\gregdan3\AppData\Roaming\Python\Python36\site-packages\aiormq\channel.py", line 200, in _confirm_delivery
    confirmation = self.confirmations.pop(delivery_tag)
KeyError: 7542

[2019-06-21 13:00:31,912] [        base_events.py:1268] [   ERROR]   Exception in callback Channel._confirm_delivery(7543, <pamqp.specif...0x2480e5a49b0>)
handle: <Handle Channel._confirm_delivery(7543, <pamqp.specif...0x2480e5a49b0>)>
Traceback (most recent call last):
  File "C:\Program Files\Python36\lib\asyncio\events.py", line 145, in _run
    self._callback(*self._args)
  File "C:\Users\gregdan3\AppData\Roaming\Python\Python36\site-packages\aiormq\channel.py", line 200, in _confirm_delivery
    confirmation = self.confirmations.pop(delivery_tag)
KeyError: 7543

This does not occur in 2.5.3, which is my band-aid solution for the time being.

This is also being run on python 3.6.6, in Windows, if this is relevant. I have demonstrated the problem occurring on other systems (Linux running 3.7.3), as well.
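One way to make such a lookup tolerant of unknown delivery tags is `dict.pop` with a default. This is a toy model and not aiormq's actual fix; `confirm_delivery` and the `confirmations` dict below are simplified stand-ins:

```python
import asyncio
from typing import Dict


def confirm_delivery(
    confirmations: Dict[int, asyncio.Future], delivery_tag: int
) -> bool:
    """Resolve the future for delivery_tag; return False if it is unknown."""
    confirmation = confirmations.pop(delivery_tag, None)
    if confirmation is None or confirmation.done():
        return False
    confirmation.set_result("ack")
    return True


async def demo():
    loop = asyncio.get_running_loop()
    confirmations = {7542: loop.create_future()}
    first = confirm_delivery(confirmations, 7542)   # known tag
    second = confirm_delivery(confirmations, 7543)  # unknown tag, no KeyError
    return first, second


results = asyncio.run(demo())
```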

DeliveredMessage.reply_to doesn't exist

In an RPC framework, the server cannot access the reply_to variable of the message. It doesn't seem to exist, or I can't find it.

Server code:

# Setup/connect
await self.chan.queue_declare(self.requests_queue)
await self.chan.basic_consume(self.requests_queue, self._process_request)

async def _process_request(self, request):
    # Do stuff

    # Respond
    await request.channel.basic_publish(
        body=response_bin, 
        routing_key=request.reply_to,
        properties=aiormq.spec.Basic.Properties(
            correlation_id=request.correlation_id
        )
    )

Error:

AttributeError: 'DeliveredMessage' object has no attribute 'reply_to'
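To the best of my knowledge, aiormq carries the AMQP properties of a delivered message on its content header, so the values would be reached as `message.header.properties.reply_to` and `message.header.properties.correlation_id` rather than on the message object itself. The sketch below uses `SimpleNamespace` stand-ins (hypothetical, so it runs without a broker) to show that attribute path:

```python
from types import SimpleNamespace

# Stand-in shaped like a delivered message: body plus a header that
# carries the AMQP basic properties. Not a real aiormq object.
request = SimpleNamespace(
    body=b"ping",
    header=SimpleNamespace(
        properties=SimpleNamespace(
            reply_to="amq.gen-reply-queue",
            correlation_id="42",
        )
    ),
)


def reply_args(message) -> dict:
    """Build keyword arguments for a reply publish from message properties."""
    props = message.header.properties
    return {
        "routing_key": props.reply_to,
        "correlation_id": props.correlation_id,
    }


args = reply_args(request)
```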

Repeated TypeError in exceptions.py while attempting to cancel event loop tasks

Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/base_events.py", line 1634, in call_exception_handler
    self.default_exception_handler(context)
  File "/usr/lib/python3.7/asyncio/base_events.py", line 1605, in default_exception_handler
    value = repr(value)
  File "/usr/lib/python3.7/asyncio/base_tasks.py", line 9, in _task_repr_info
    info = base_futures._future_repr_info(task)
  File "/usr/lib/python3.7/asyncio/base_futures.py", line 60, in _future_repr_info
    info.append(f'exception={future._exception!r}')
  File "/home/gregdan3/.local/lib/python3.7/site-packages/aiormq/exceptions.py", line 6, in __repr__
    return "<%s: %s>" % (self.__class__.__name__, self.message % self.args)
TypeError: not all arguments converted during string formatting

If I attempt to cancel the publish tasks after they have begun, this occurs inconsistently.
I have yet to find a specific cause of the issue.
This occurs in aio_pika 5.5.3 and aiormq 2.5.3

[2019-06-03 16:57:04,123] [         connection.py:380 ] [   DEBUG]   Reader task cancelled:
Traceback (most recent call last):
  File "./scrape.py", line 312, in main
    loop.run_forever()
  File "/usr/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/usr/lib/python3.7/asyncio/base_events.py", line 1775, in _run_once
    handle._run()
  File "/usr/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/home/gregdan3/.local/lib/python3.7/site-packages/aiormq/channel.py", line 245, in _reader
    await self._on_deliver(frame)
  File "/home/gregdan3/.local/lib/python3.7/site-packages/aiormq/channel.py", line 146, in _on_deliver
    header = await self._get_frame()    # type: ContentHeader
  File "/home/gregdan3/.local/lib/python3.7/site-packages/aiormq/channel.py", line 79, in _get_frame
    self.frames.task_done()
  File "/usr/lib/python3.7/asyncio/queues.py", line 205, in task_done
    self._finished.set()
  File "/usr/lib/python3.7/asyncio/locks.py", line 270, in set
    for fut in self._waiters:
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/gregdan3/.local/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/gregdan3/.local/lib/python3.7/site-packages/aiormq/connection.py", line 349, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/home/gregdan3/.local/lib/python3.7/site-packages/aiormq/base.py", line 171, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/gregdan3/.local/lib/python3.7/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
concurrent.futures._base.CancelledError

This is the process that normally occurs, but again, this does not always occur as intended.

asyncio.ensure_future(publisher.run(), loop=loop)
try:
    loop.run_forever()
except KeyboardInterrupt:
    _log.info("Shutting down Asyncio loop")
    all_tasks = asyncio.Task.all_tasks(loop=loop)
    for task in all_tasks:
        task.cancel()
    kill_task = asyncio.ensure_future(publisher.stop(), loop=loop)
    loop.run_until_complete(kill_task)
finally:
    loop.close()

This is the specific code where I kill the tasks on the event loop.
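A common variant of this shutdown pattern cancels the outstanding tasks and then waits for them with `asyncio.gather(..., return_exceptions=True)`, so the cancellations are actually delivered before the loop is closed. A minimal sketch, assuming Python 3.7+ (`worker` and `shutdown` are hypothetical names, and nothing here is specific to aiormq):

```python
import asyncio


async def worker(n: int) -> None:
    await asyncio.sleep(3600)  # stands in for a long-running publish task


async def shutdown() -> int:
    """Cancel every task except the current one and wait for them to finish."""
    current = asyncio.current_task()
    pending = [t for t in asyncio.all_tasks() if t is not current]
    for task in pending:
        task.cancel()
    # return_exceptions=True collects CancelledError instead of raising it.
    results = await asyncio.gather(*pending, return_exceptions=True)
    return sum(isinstance(r, asyncio.CancelledError) for r in results)


async def main() -> int:
    for n in range(3):
        asyncio.ensure_future(worker(n))
    await asyncio.sleep(0)  # let the workers start
    return await shutdown()


cancelled = asyncio.run(main())
```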

Acknowledgements for multiple discontinuous tags

We ran into an issue of unconfirmed publishes when dispatching multiple publishes simultaneously. I can reproduce the issue with the following example with rabbitmq:

import asyncio
import aiormq
from datetime import datetime


async def main():
    connection = await aiormq.connect("amqp://guest:guest@localhost/")
    channel = await connection.channel()
    declare_ok = await channel.queue_declare(exclusive=True)
    await channel.queue_bind(declare_ok.queue, 'amq.topic', routing_key='test.3')

    while True:
        print("send ", datetime.now())
        messages = [channel.basic_publish(b'test', exchange='amq.topic', routing_key='test.{}'.format(i))
                    for i in range(6)]
        await asyncio.wait(messages)
        print("done ", datetime.now())
        await asyncio.sleep(2)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

It appears that rabbitmq delays the ack for messages that are routed to a queue compared to messages that are not routed, which results in a somewhat messy ack order. Looking at wireshark, one of the ack sequences is the following:

Basic.Ack(7, False)
Basic.Ack(8, False)
Basic.Ack(9, False)
Basic.Ack(11, False)
Basic.Ack(12, True)

The current code does not confirm message 10. I'm pretty sure it should do so. I'm drafting an example fix PR.

I do not think this is related to #9.
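For context, an ack with `multiple=True` confirms every outstanding delivery tag up to and including the given one, whether or not other tags in that range were already acked individually. A minimal sketch of that rule (a toy model with a plain set, not the aiormq implementation; `apply_ack` is a hypothetical helper), replaying the sequence from the capture:

```python
from typing import List, Set


def apply_ack(outstanding: Set[int], delivery_tag: int, multiple: bool) -> List[int]:
    """Remove confirmed tags from `outstanding` and return them sorted."""
    if multiple:
        confirmed = sorted(t for t in outstanding if t <= delivery_tag)
    else:
        confirmed = [delivery_tag] if delivery_tag in outstanding else []
    outstanding.difference_update(confirmed)
    return confirmed


outstanding = set(range(7, 13))  # tags 7..12 awaiting confirmation
acks = [(7, False), (8, False), (9, False), (11, False), (12, True)]
log = [apply_ack(outstanding, tag, multiple) for tag, multiple in acks]
# The final multiple-ack picks up tag 10 as well as tag 12.
```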

Connection._on_close() not being called if Connection.closing is externally cancelled

  • Create a consumer connection
  • Create a task T1 with await connection.closing
  • When T1 gets externally cancelled (not by close() method, but e.g. when Ctrl+C is pressed during asyncio.run), the _on_close() method is never called.

I can call this method manually, like this:

async def _worker():
    try:
        await connection.closing
    except asyncio.CancelledError:
        connection._on_close()

But I don't think calling private methods should be "the best solution".

I thought I could get around this by calling await connection.close() (which results in calling _on_close()) in the CancelledError handler. However, there's a check for is_closed right at the top (and this check is also present in the __closer() callback).

    async def close(self, exc=asyncio.CancelledError()):
        if self.is_closed:
            return

... because is_closed returns true, the _on_close() will never get called.

My proposed solution would be to call add_done_callback on the closing future with a callback that calls _on_close, but this would also require a better check in the callback methods (instead of just is_closed).

I hope my explanation wasn't too confusing; let me know if you'd like me to clarify something.
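The proposal relies on the fact that done callbacks of an asyncio future also fire when the future is cancelled. A minimal stdlib sketch of just that mechanism (the `closing` future and the `events` list are stand-ins, not aiormq internals):

```python
import asyncio
from typing import List


async def demo() -> List[str]:
    loop = asyncio.get_running_loop()
    closing = loop.create_future()
    events: List[str] = []

    def on_done(fut: asyncio.Future) -> None:
        # Runs for results, exceptions and cancellation alike.
        events.append("cancelled" if fut.cancelled() else "finished")

    closing.add_done_callback(on_done)
    closing.cancel()        # external cancellation, e.g. on Ctrl+C
    await asyncio.sleep(0)  # done callbacks are scheduled, not run inline
    return events


events = asyncio.run(demo())
```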

Queue declare fails for specific values of `x-expires`

Consider the following simple script:

#!/usr/bin/env python
import asyncio
import aiormq

X_EXPIRES = 32767

async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost:5672/")

    # Creating a channel
    channel = await connection.channel()
    queue = await channel.queue_declare('test2', arguments={'x-expires': X_EXPIRES})

    # Sending the message
    await channel.basic_publish(b'Hello World!', routing_key='hello')
    print(" [x] Sent 'Hello World!'")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

With the value X_EXPIRES = 32767 this works just fine. However, when increasing it by one to X_EXPIRES = 32768, the code raises an exception with the following stack trace:

Traceback (most recent call last):
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "./test_rmq.py", line 20, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.7/asyncio/base_events.py", line 587, in run_until_complete
    return future.result()
  File "./test_rmq.py", line 12, in main
    queue = await channel.queue_declare('test2', arguments={'x-expires': 32768})
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/channel.py", line 713, in queue_declare
    timeout=timeout,
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/usr/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/usr/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/connection.py", line 423, in __close_writer
    await writer.wait_closed()
  File "/usr/lib/python3.7/asyncio/streams.py", line 323, in wait_closed
    await self._protocol._closed
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/connection.py", line 375, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/connection.py", line 327, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/lib/python3.7/asyncio/streams.py", line 679, in readexactly
    await self._wait_for_data('readexactly')
  File "/usr/lib/python3.7/asyncio/streams.py", line 473, in _wait_for_data
    await self._waiter
  File "/usr/lib/python3.7/asyncio/selector_events.py", line 814, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer

It seems like it may have something to do with an incorrect interpretation or casting of the integer value, because the magical limit corresponds to 2^15 == 32768, so maybe it is defined as a signed two-byte int. The same behavior shows up at 2^16 == 65536, where X_EXPIRES = 65535 fails but X_EXPIRES = 65536 works again. So it seems the range [32768, 65535], i.e. 2^15 <= X_EXPIRES < 2^16, will cause the exception.

Note that this only seems to happen with RabbitMQ 3.5. I cannot reproduce this with RabbitMQ 3.6.
Have you got an idea why this might be happening, and would it be possible to create a patch for this?
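The arithmetic behind that guess can be checked with `struct`: the failing range is exactly the set of values that fit an unsigned 16-bit integer but not a signed one. This only illustrates the ranges. Which AMQP field type the encoder actually picks for these values is an assumption that is not verified here.

```python
import struct


def fits(fmt: str, value: int) -> bool:
    """Return True if `value` can be packed with the given struct format."""
    try:
        struct.pack(fmt, value)
        return True
    except struct.error:
        return False


for value in (32767, 32768, 65535, 65536):
    signed16 = fits(">h", value)    # signed 16-bit: -32768..32767
    unsigned16 = fits(">H", value)  # unsigned 16-bit: 0..65535
    print(value, signed16, unsigned16)

# Values that need an unsigned 16-bit field: the reported failing range.
failing = [v for v in (32767, 32768, 65535, 65536)
           if fits(">H", v) and not fits(">h", v)]
```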

how to set a timeout param when connection is creating

Hi,
I have a question: how can I set a timeout when creating a connection? Otherwise it takes about two heartbeat intervals (130s) to report ConnectionError: [Errno 110]. I read the source of aiormq's connection.py but still cannot find a way.

2019-11-08 15:12:27,242 consumer INFO /root/workspace/rabbitmq_connector.py start connect_to_rabbitmq
INFO:consumer:start connect to rabbitmq
Traceback (most recent call last):
  File "/root/workspace/python3.6/site-packages/aiormq/connection.py", line 227, in connect
    loop=self.loop
  File "/usr/lib/python3.6/asyncio/streams.py", line 81, in open_connection
    lambda: protocol, host, port, **kwds)
  File "/usr/lib/python3.6/asyncio/base_events.py", line 794, in create_connection
    raise exceptions[0]
  File "/usr/lib/python3.6/asyncio/base_events.py", line 781, in create_connection
    yield from self.sock_connect(sock, address)
  File "/usr/lib/python3.6/asyncio/selector_events.py", line 439, in sock_connect
    return (yield from fut)
  File "/usr/lib/python3.6/asyncio/selector_events.py", line 469, in _sock_connect_cb
    raise OSError(err, 'Connect call failed %s' % (address,))
TimeoutError: [Errno 110] Connect call failed ('192.168.20.251', 5672)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/root/workspace/python3.6/site-packagesrabbitmq_connector.py", line 140, in get_connection
    self.connection = await aiormq.connect(url)
  File "/root/workspace/python3.6/site-packages/aiormq/connection.py", line 521, in connect
    await connection.connect(client_properties or {})
  File "/root/workspace/python3.6/site-packages/aiormq/base.py", line 171, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/root/workspace/python3.6/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
  File "/root/workspace/python3.6/site-packages/aiormq/connection.py", line 230, in connect
    raise ConnectionError(*e.args) from e
ConnectionError: [Errno 110] Connect call failed ('192.168.20.251', 5672)
2019-11-08 15:14:37,132 5f18209e-fac5-11e9-a5f4-000c293c5e0b consumer INFO /root/workspace/rabbitmq_connector.py get_connection 149 : connect rabbitmq failed:[Errno 110] Connect call failed ('192.168.20.251', 5672)
INFO:consumer:connect rabbitmq failed:[Errno 110] Connect call failed ('192.168.20.251', 5672)
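One general asyncio approach is to wrap the connect coroutine in `asyncio.wait_for`, which cancels it and raises `asyncio.TimeoutError` once the deadline passes. A minimal sketch with a stand-in coroutine (`slow_connect` and `connect_with_timeout` are hypothetical; in real code the awaited call would be something like `aiormq.connect(url)`):

```python
import asyncio


async def slow_connect(url: str) -> str:
    """Stand-in for a connect call that hangs on an unreachable host."""
    await asyncio.sleep(60)
    return "connected to " + url


async def connect_with_timeout(url: str, timeout: float) -> str:
    try:
        return await asyncio.wait_for(slow_connect(url), timeout=timeout)
    except asyncio.TimeoutError:
        # Surface the timeout as a connection failure for the caller.
        raise ConnectionError("connect to %s timed out after %ss" % (url, timeout))


async def demo() -> str:
    try:
        return await connect_with_timeout("amqp://192.168.20.251/", 0.05)
    except ConnectionError as e:
        return str(e)


outcome = asyncio.run(demo())
```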

aiorun closing issue

Hello, after bumping the version of aiormq in our application we ran into a problem with shutting down.
aiorun shuts down gracefully: it waits until all pending tasks are closed.
When some exception occurs we expect the application to close.
Now we sometimes get a zombie program, because aiorun is waiting for a heartbeat task to close.

Invalid logging on DeliveryError

If aiormq.exceptions.DeliveryError is raised during publish, correct logging of the error fails because its message field is an instance of DeliveredMessage.

Traceback (most recent call last):
  File "/usr/share/python3/app/lib/python3.7/site-packages/raven/utils/serializer/manager.py", line 76, in transform
    return repr(value)
  File "/usr/lib/python3.7/asyncio/base_futures.py", line 60, in _future_repr_info
    info.append(f'exception={future._exception!r}')
  File "/usr/share/python3/app/lib/python3.7/site-packages/aiormq/exceptions.py", line 6, in __repr__
    return "<%s: %s>" % (self.__class__.__name__, self.message % self.args)
TypeError: unsupported operand type(s) for %: 'DeliveredMessage' and 'tuple'
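The failure comes from applying `%` formatting to a `message` that is not a format string matching `args`. A minimal sketch of a `__repr__` that falls back instead of raising `TypeError`, assuming a simplified exception class (`SafeError` and its subclasses are hypothetical and not aiormq's class hierarchy):

```python
class SafeError(Exception):
    """Hypothetical exception whose repr tolerates mismatched messages."""

    message = "Error"

    def __repr__(self) -> str:
        try:
            text = self.message % self.args
        except TypeError:
            # message is not a matching format string: show the raw pieces.
            text = "%r %r" % (self.message, self.args)
        return "<%s: %s>" % (self.__class__.__name__, text)


class Formatted(SafeError):
    message = "code %s: %s"


class Plain(SafeError):
    message = "no placeholders"


ok = repr(Formatted(404, "NOT_FOUND"))
fallback = repr(Plain("unexpected", "args"))
```

The same fallback path is taken when `message` is an object that does not support `%` at all, which is the case reported in this issue.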

yarl 1.5.0 causes ConnectionError

Since yarl 1.5.0 (using yarl-1.5.0-cp37-cp37m-manylinux1_x86_64.whl) the code snippet

loop = asyncio.get_running_loop()
connection = await aio_pika.connect_robust(
    "amqp://guest:guest@127.0.0.1/", loop=loop
)

will cause a ConnectionError raised from connection.py:231

@task
async def connect(self, client_properties: dict = None):
    if self.writer is not None:
        raise RuntimeError("Already connected")
    
    ssl_context = None
    
    if self.url.scheme == "amqps":
        ssl_context = await self.loop.run_in_executor(
            None, self._get_ssl_context,
        )
    
    try:
        self.reader, self.writer = await asyncio.open_connection(
            self.url.host, self.url.port, ssl=ssl_context,
        )
    except OSError as e:
>     raise ConnectionError(*e.args) from e
E     ConnectionError: [Errno 111] Connect call failed ('127.0.0.1', 0)

I suggest pinning the version to yarl == 1.4.2. I haven't had time to investigate the root cause yet.
