aredis's Issues

Connection's recv buffer should be emptied before sending new command

In some cases, if a command is interrupted by an outer exception (such as a timeout), the connection is released back to the pool with unconsumed data still in its receive buffer. That leftover data is then read as the reply to the next command, which produces wrong results.

The following code can reproduce this situation (it uses the async_timeout library from aio-libs):

# coding:utf-8

from aredis import StrictRedisCluster
import asyncio
import random
import async_timeout

redis_cluster_client = StrictRedisCluster(
    startup_nodes=[{"host": "127.0.0.1", "port": 7001}],
    max_connections=40, max_connections_per_node=True)


async def worker():
    while True:
        try:
            with async_timeout.timeout(0.05):
                for _ in range(2):
                    await redis_cluster_client.exists("test_key")
                    await asyncio.sleep(random.random() / 100)
                await redis_cluster_client.set("test_key", "1")
                await asyncio.sleep(random.random() / 100)
        except Exception as e:
            print(e)


def main():
    loop = asyncio.get_event_loop()
    asyncio.gather(*[worker() for _ in range(30)], loop=loop)
    loop.run_forever()


if __name__ == "__main__":
    main()

And you will get output like this after running for some time:

'int' object has no attribute 'decode'
'int' object has no attribute 'decode'
'int' object has no attribute 'decode'
'int' object has no attribute 'decode'

In the code above, a very small timeout (0.05 seconds) is set; if you can't reproduce the error, try reducing it to an even smaller value.

The exists command returns 0 or 1, and the set command then receives the int reply that was meant for exists, which leads to an error.

In my production code, a worker executes a job under a timeout limit; if the job times out, it may trigger this error.
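A minimal client-side mitigation sketch, assuming the root cause is exactly what the title says: a connection returned to the pool with an unread reply. The wrapper name is hypothetical, and connection_pool.disconnect() is assumed to exist as it does in redis-py.

import asyncio

async def run_with_timeout(coro, timeout, client):
    # If the command is interrupted by a timeout, drop the pooled
    # connections instead of letting one with an unread reply be
    # handed to the next command.
    try:
        return await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        client.connection_pool.disconnect()  # assumed pool API, as in redis-py
        raise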

maybe it should wait instead of raising an error when the max connection limit is reached

I'm now running aredis against a cluster with 12 nodes, and the client side has more than 50 machines and about 200 client processes, so the total number of connections is very large.

And because we use asyncio, many tasks run at the same time. When a request peak arrives, a Too Many Connections error often occurs.

I don't want to increase the connection pool size. Since we use asyncio, we could make a task sleep and wait for a connection instead.

It's only a suggestion, because the retry could also be done by the user instead of the aredis library.
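A user-side sketch of the suggested behaviour in the meantime: sleep and retry when the pool is exhausted. The helper name and backoff values are made up for illustration; RedisClusterException is the exception shown in the XREADGROUP traceback elsewhere on this page.

import asyncio
from aredis.exceptions import RedisClusterException

async def execute_with_backoff(command, *args, retries=5, delay=0.05, **kwargs):
    # Retry with a growing pause instead of failing outright when the
    # pool reports "Too many connections".
    for attempt in range(retries):
        try:
            return await command(*args, **kwargs)
        except RedisClusterException as e:
            if 'Too many connections' not in str(e) or attempt == retries - 1:
                raise
            await asyncio.sleep(delay * (attempt + 1))

Usage would be e.g. await execute_with_backoff(client.get, 'some_key').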

Error when using subscribe

When I test the subscribe functionality, I get an error.
This is the code sample:

        client = StrictRedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": 7002}])
        sub = client.pubsub()
        await sub.subscribe("boom")

This is the traceback


ERROR:root:  File "/home/melhin/virtualenvs/aiotest/lib/python3.5/site-packages/aredis/pool.py", line 466, in get_master_node_by_slot
    return self.nodes.slots[slot][0]

On further debugging I found that
self.nodes.slots[slot][0]
is the line causing the problem, mostly because self.nodes.slots is empty. Is there a configuration issue with my Redis setup?
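One way to rule out a cluster-side problem is to ask a node for its slot table directly; an empty reply would explain why self.nodes.slots has nothing to look up. A sketch, using the generic execute_command API that appears elsewhere on this page:

import asyncio
from aredis import StrictRedis

async def check_slots():
    # Ask one node directly; an empty list means no hash slots are
    # assigned yet, so the cluster client cannot route any command.
    node = StrictRedis(host='127.0.0.1', port=7002)
    print(await node.execute_command('CLUSTER SLOTS'))

asyncio.get_event_loop().run_until_complete(check_slots())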

pipeline fails when transaction=True but passes when transaction=False

Traceback (most recent call last):
  File "/home/chaos_session/chaos_session/route.py", line 65, in exec_action
    result = await class_obj.exec_action()  # run the object's exec_action method
  File "/home/chaos_session/chaos_session/tool/action_base.py", line 136, in exec_action
    await self.take_action()
  File "/home/chaos_session/chaos_session/tool/action_base.py", line 118, in take_action
    await self.do_action()
  File "/home/chaos_session/chaos_session/v1_2/game_1010/action20000.py", line 35, in do_action
    c_pub=self.c_pub)
  File "/home/chaos_session/chaos_session/db_manager/user_info_manager.py", line 50, in add_user
    await self.db.insert_one(JWT, user)
  File "/home/chaos_session/chaos_session/db_manager/redis_manage.py", line 228, in insert_one
    await self._insert_one(wrapper_jwt, data)
  File "/home/chaos_session/chaos_session/db_manager/redis_manage.py", line 218, in _insert_one
    r = await self.pipe.execute()
  File "/home/py3-env/lib/python3.5/site-packages/aredis/pipeline.py", line 286, in execute
    return await exec(conn, stack, raise_on_error)
  File "/home/py3-env/lib/python3.5/site-packages/aredis/pipeline.py", line 203, in _execute_transaction
    if isinstance(response, typing.Awaitable):
AttributeError: module 'typing' has no attribute 'Awaitable'

redis.pubsub().listen() does not work properly

Checklist

  • Python 3.5.1/Python 3.6
  • Python parser
  • event loop
  • 1.1.2

Steps to reproduce

from aredis import StrictRedis
import asyncio


def on_message(message):
    print(message)


async def sub_redis():
    redis = StrictRedis('127.0.0.1', 6379)
    redis_sub = redis.pubsub()
    await redis_sub.subscribe('notify')
    for message in await redis_sub.listen():
        print(message)


if __name__ == '__main__':
    task = sub_redis()
    asyncio.ensure_future(task)
    asyncio.get_event_loop().run_forever()

redis-cli publish notify bye

Expected behavior

With redis-py, the output is as below:
{'type': 'subscribe', 'pattern': None, 'channel': b'notify', 'data': 1}
{'type': 'message', 'pattern': None, 'channel': b'notify', 'data': b'bye'}

Actual behavior

With aredis, the output is:

type
pattern
channel
data

and it cannot receive follow-up messages.
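The key-only output above suggests listen() hands back a single parsed message (a dict), so the for loop iterates over its keys. A workaround sketch that polls with get_message() instead, the pattern used in other pubsub examples on this page:

from aredis import StrictRedis
import asyncio

async def sub_redis():
    redis = StrictRedis('127.0.0.1', 6379)
    redis_sub = redis.pubsub()
    await redis_sub.subscribe('notify')
    while True:
        # Poll one parsed message at a time; None means nothing arrived.
        message = await redis_sub.get_message()
        if message:
            print(message)
        await asyncio.sleep(0.01)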

couldn't run aredis with tornado's multi-process pattern

Checklist

  • Python version
    3.6
  • Using hiredis or just Python parser
    hiredis
  • Using uvloop or just asyncio event loop
    asyncio
  • Does the issue exist against the master branch of aredis?
    no

Steps to reproduce

 Run Tornado in multi-process mode.

Expected behavior

 Set and get a value from Redis by key with await.

Actual behavior

  • It is appreciated if an error log can be provided
    No log is produced; there is simply no response at all.
class MainHandler(tornado.web.RequestHandler):

    def __init__(self, application, request, **kwargs):
        super().__init__(application, request, **kwargs)
        self.redis_client = StrictRedis(host='127.0.0.1', port=6379, db=0)

    async def get(self):
        print("begin")
        # it doesn't get any further; is this an event loop issue?
        await self.redis_client.set('abc1', 'abcqwewq')
        abc_result = await self.redis_client.get('abc1')
        print(abc_result)
        self.write("Hello, world")


app = tornado.web.Application([(r"/", MainHandler)])
server =tornado.httpserver.HTTPServer(app)
server.bind(9999)
server.start(4)  # Forks multiple sub-processes
tornado.ioloop.IOLoop.current().start()

But it works fine when aredis runs with Tornado in single-process mode.
BTW: does aredis have a default connection pool? And can aredis automatically reconnect to the Redis server?


class MainHandler(tornado.web.RequestHandler):

    def __init__(self, application, request, **kwargs):
        super().__init__(application, request, **kwargs)
        print(self.application.pool)
        self.redis_client = self.application.pool["redis_pool"]
    async def get(self):
        print("begin")
        await self.redis_client.set('abc1', 'abcqwewq')
        abc_result = await self.redis_client.get('abc1')
        print(abc_result)
        self.write("Hello, world")

class Application(tornado.web.Application):
    def __init__(self, pool):
        self._pool = pool
        setting = dict(
            debug = False,
            autoreload = False
        )
        super(Application, self).__init__([(r"/", MainHandler)], **setting)
    @property
    def pool(self):
        return self._pool


async def init_pool():
    db_pool = await asyncpg.create_pool(host="127.0.0.1", database="test1", user="test2", password="test3")
    redis_pool = StrictRedis(host='127.0.0.1', port=6379, db=0)
    return {'db_pool':db_pool, 'redis_pool':redis_pool}

if __name__ == "__main__":
    #asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    AsyncIOMainLoop().install()
    loop = asyncio.get_event_loop()
    pool = loop.run_until_complete(init_pool())
    app = Application(pool)
    server = tornado.httpserver.HTTPServer(app)
    server.listen(9999)
    loop.run_forever()
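On the BTW questions: aredis, like redis-py, keeps an internal connection pool per client, and the execute_command snippet quoted under "Is StrictRedis retry behaviour as expected?" below disconnects and resends on ConnectionError, i.e. it reconnects lazily. For the multi-process case, one thing worth trying is creating the client inside each forked child rather than sharing any loop-bound state; a sketch under that assumption (hypothetical, not a confirmed fix):

import tornado.web
from aredis import StrictRedis

class MainHandler(tornado.web.RequestHandler):

    def initialize(self):
        # Create the client lazily inside the child process, after the
        # fork, and cache it on the application so each worker process
        # reuses a single pool.
        if not hasattr(self.application, 'redis_client'):
            self.application.redis_client = StrictRedis(
                host='127.0.0.1', port=6379, db=0)
        self.redis_client = self.application.redis_client

    async def get(self):
        await self.redis_client.set('abc1', 'abcqwewq')
        self.write(str(await self.redis_client.get('abc1')))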

AWS RedisCluster not working / get master node failed

We are trying to use aredis with an ElastiCache Redis cluster, but get the following error.

[2018-01-08 10:21:28 +0000] [9] [ERROR] Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/sanic/app.py", line 556, in handle_request
    response = await response
  File "/usr/local/lib/python3.6/site-packages/prometheus_async/aio/_decorators.py", line 42, in decorator
    rv = yield from wrapped(*args, **kw)
  File "/menu_service/menu_response.py", line 75, in get_menu_endpoint
    menu = await get_menu_cache(id)
  File "/menu_service/cache.py", line 21, in get_menu_cache
    result = await redis.get(_cache_key(id))
  File "/usr/local/lib/python3.6/site-packages/aredis/commands/strings.py", line 147, in get
    return await self.execute_command('GET', name)
  File "/usr/local/lib/python3.6/site-packages/aredis/utils.py", line 167, in inner
    return await func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/aredis/client.py", line 405, in execute_command
    node = self.connection_pool.get_node_by_slot(slot)
  File "/usr/local/lib/python3.6/site-packages/aredis/pool.py", line 480, in get_node_by_slot
    return self.get_master_node_by_slot(slot)
  File "/usr/local/lib/python3.6/site-packages/aredis/pool.py", line 475, in get_master_node_by_slot
    return self.nodes.slots[slot][0]
KeyError: 232 

Our redis client creation is the following:

startup_nodes = [{"host": REDIS_HOST, "port": REDIS_PORT }]
redis = StrictRedisCluster(startup_nodes=startup_nodes, decode_responses=False, skip_full_coverage_check=True)

We are using Python 3.6 with Sanic 0.7.0.
The same setup works with redis-py-cluster==1.3.4.

Connection problems with XREADGROUP

Checklist

  • Python version: 3.6
  • Using hiredis or just Python parser: python parser
  • Using uvloop or just asyncio event loop: asyncio
  • Does the issue exist against the master branch of aredis? Yes

Steps to reproduce

Connect to a Redis cluster and try to make XREADGROUP commands

Expected behavior

Returns messages from the stream

Actual behavior

First, I was bothered by "TTL exhausted" exceptions, as if aredis wasn't following redirections upon MovedErrors. I came up with a simple fix (which may not be appropriate for the general case): commenting out that line.

Then the problem became that after a finite number of XREADGROUP commands, aredis raised a "Too many connections" exception. For a given max_connections, the number of commands issued before the exception is always the same. Also, this behaviour is specific to XREADGROUP; I can XREAD without problems.

This makes me think that connections are not released properly for XREADGROUP commands, but I can't see why. What would be your diagnosis?

Here is the traceback I get:

Traceback (most recent call last):
  File "/var/www/metrics-doctor-forwarder/source/forwarder/.venv/lib/python3.6/site-packages/aredis/pool.py", line 501, in get_connection_by_node
    connection = self._available_connections.get(node["name"], []).pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "forwarder/test.py", line 47, in process
    async for msg in stream:
  File "forwarder/test.py", line 35, in stream_listener
    items = await redis_conn.xreadgroup("metrics-doctor-forwarder", "forwarder-single", count=20, status=">")
  File "/var/www/metrics-doctor-forwarder/source/forwarder/.venv/lib/python3.6/site-packages/aredis/commands/streams.py", line 238, in xreadgroup
    return await self.execute_command('XREADGROUP', *pieces)
  File "/var/www/metrics-doctor-forwarder/source/forwarder/.venv/lib/python3.6/site-packages/aredis/utils.py", line 167, in inner
    return await func(*args, **kwargs)
  File "/var/www/metrics-doctor-forwarder/source/forwarder/.venv/lib/python3.6/site-packages/aredis/client.py", line 399, in execute_command
    r = self.connection_pool.get_connection_by_node(node)
  File "/var/www/metrics-doctor-forwarder/source/forwarder/.venv/lib/python3.6/site-packages/aredis/pool.py", line 503, in get_connection_by_node
    connection = self.make_connection(node)
  File "/var/www/metrics-doctor-forwarder/source/forwarder/.venv/lib/python3.6/site-packages/aredis/pool.py", line 394, in make_connection
    raise RedisClusterException("Too many connections")
aredis.exceptions.RedisClusterException: Too many connections

UnicodeDecodeError when receiving pickled response from pubsub

Hello. Thanks for the excellent library. The current pubsub implementation decodes the whole response object from bytes if it is a bytes object. This differs from redis-py, which converts only the response type and passes the rest of the response through to the client. This poses a problem for objects that are pickled (or otherwise serialized) before being published to Redis pubsub.

Relevant code from aredis:

def handle_message(self, response, ignore_subscribe_messages=False):
    """
    Parses a pub/sub message. If the channel or pattern was subscribed to
    with a message handler, the handler is invoked instead of a parsed
    message being returned.
    """
    response = [x.decode() if isinstance(x, bytes) else x for x in response]

Relevant code from redis-py:

def handle_message(self, response, ignore_subscribe_messages=False):
    """
    Parses a pub/sub message. If the channel or pattern was subscribed to
    with a message handler, the handler is invoked instead of a parsed
    message being returned.
    """
    message_type = nativestr(response[0])

The nativestr implementation is:

def nativestr(x):
    return x if isinstance(x, str) else x.decode('utf-8', 'replace')

Is this different from the redis-py implementation for a specific reason? It causes a UnicodeDecodeError when an aredis pubsub client instance tries to decode a pickled object I've published. Is there another way you'd recommend I handle this? Thanks!
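For reference, a self-contained sketch of the selective decoding redis-py does, which is what is being asked for here: decode only the envelope fields (type, pattern, channel) and pass the payload through untouched so a pickled object survives. An illustration, not aredis's actual code:

def parse_pubsub_response(response):
    # Decode only the message envelope; the data payload stays as-is
    # (bytes for a pickled object, an int for a subscribe count, etc.).
    def nativestr(x):
        return x if isinstance(x, str) else x.decode('utf-8', 'replace')

    message_type = nativestr(response[0])
    if message_type == 'pmessage':
        return {'type': message_type,
                'pattern': nativestr(response[1]),
                'channel': nativestr(response[2]),
                'data': response[3]}
    return {'type': message_type,
            'pattern': None,
            'channel': nativestr(response[1]),
            'data': response[2]}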

PubSubWorkerThread._run() fails if stream_timeout is used

If you use stream_timeout=1, the PubSubWorkerThread fails with a TimeoutError, because pubsub.get_message() internally uses parse_response(block=False, timeout=0) (0 is the default timeout value and we can't explicitly pass another one), which ends up using the stream_timeout=1.

I see two ways of solving this:

  1. try-except TimeoutError: pass around pubsub.get_message() in PubSubWorkerThread._run().
  2. pass pubsub.get_message(..., timeout=1), so we use the wait_for() path in parse_response().

I tried both and they work. Let me know your preference and I'll submit the patch (or please fix it yourself; it should be a 1-2 line patch :-D)
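A sketch of option 1, assuming the TimeoutError that escapes is the one asyncio.wait_for raises when stream_timeout expires:

import asyncio

async def get_message_safely(pubsub):
    # Treat a stream_timeout expiry as "no message yet" rather than
    # letting the exception kill the worker loop.
    try:
        return await pubsub.get_message()
    except asyncio.TimeoutError:
        return None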

Connecting with Redis URL?

How can I connect to a database with a string such as redis://h:p5ef9a0cfbef0io972d83933b10d99bc7d5d4a1ec777db341c6b12310f47e2cea@ec2-52-212-239-249.kabab.com:12345?
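aredis mirrors redis-py here, so from_url should do this; StrictRedisCluster.from_url appears in another issue on this page, and the plain client presumably exposes the same classmethod. A sketch:

from aredis import StrictRedis

# from_url parses the scheme, password, host and port out of the URL.
client = StrictRedis.from_url(
    'redis://h:p5ef9a0cfbef0io972d83933b10d99bc7d5d4a1ec777db341c6b12310f47e2cea'
    '@ec2-52-212-239-249.kabab.com:12345')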

ERROR sending "cluster slots"

I'm getting

ERROR:root:ERROR sending "cluster slots" command to redis server: {'port': 7002, 'host': '127.0.0.1'}

when I try to use the StrictRedisCluster connection.

On further debugging I found that
parse_cluster_slots in commands/cluster.py looks for node[2], which for some reason is not present in the cluster slots output my server returns via redis-cli.

Running cluster slots:
127.0.0.1:7001> cluster slots

1) 1) (integer) 5461
   2) (integer) 10922
   3) 1) "127.0.0.1"
      2) (integer) 7004
   4) 1) "127.0.0.1"
      2) (integer) 7003
2) 1) (integer) 10923
   2) (integer) 16383
   3) 1) "127.0.0.1"
      2) (integer) 7006
   4) 1) "127.0.0.1"
      2) (integer) 7005
3) 1) (integer) 0
   2) (integer) 5460
   3) 1) "127.0.0.1"
      2) (integer) 7001
   4) 1) "127.0.0.1"
      2) (integer) 7002

and cluster nodes:
7a3710105e20a9b30d81f636b2b30d27218dc0ac 127.0.0.1:7003 slave 3ee92494550369062d5ff5cbd9b3b5efd60f6c01 0 1494940935358 24 connected
4ace8f1ceb4be131884fe0df0ca50ec576f2909a 127.0.0.1:7002 slave aa10e2d760478d6e7e5a1627379acd508726bf67 0 1494940934858 22 connected
3ee92494550369062d5ff5cbd9b3b5efd60f6c01 127.0.0.1:7004 master - 0 1494940936361 24 connected 5461-10922
41a63b873a2257af9866db482dd35f016f204f93 127.0.0.1:7006 master - 0 1494940936361 21 connected 10923-16383
aa10e2d760478d6e7e5a1627379acd508726bf67 127.0.0.1:7001 myself,master - 0 0 22 connected 0-5460
b00c4d9e1d404b5e35b58a664284fe274e111523 127.0.0.1:7005 slave 41a63b873a2257af9866db482dd35f016f204f93 0 1494940935859 21 connected

Is StrictRedis retry behaviour as expected?

Checklist

  • Python version
    3.6
  • Using hiredis or just Python parser
    Python parser
  • Using uvloop or just asyncio event loop
    Asyncio
  • Does the issue exist against the master branch of aredis?
    Yes

Steps to reproduce

aredis/aredis/client.py

Lines 156 to 161 in 395ce42

except (ConnectionError, TimeoutError) as e:
    connection.disconnect()
    if not connection.retry_on_timeout and isinstance(e, TimeoutError):
        raise
    await connection.send_command(*args)
    return await self.parse_response(connection, command_name, **options)

While debugging another issue, I found that the logic here is a bit strange.
The scenario:
Two different types of errors, ConnectionError and TimeoutError, are caught here, and the following if clause adds isinstance(e, TimeoutError) as a condition, so I assume the expected behavior for a retry is:

  1. The user has set retry_on_timeout to True.
  2. It's a TimeoutError.

Expected behavior

The expected behavior can be interpreted with a truth table (T means retry, F means don't):

                 retry_on_timeout=true   retry_on_timeout=false
TimeoutError     T                       F
other error      F                       F

Actual behavior

The actual behavior can be interpreted with a truth table (T means retry, F means don't):

                 retry_on_timeout=true   retry_on_timeout=false
TimeoutError     T                       F
other error      T                       T

Conclusion

If my understanding of the expected behavior is correct, then we need to fix the retrying on connection errors. (I already created a PR for it.)

Here is a script for validating the current behavior and behavior after the fix:

# script for validating the current and fixed behavior truth table

do_retry =          [True, True, False, False]
is_timeout_error =  [False,True, True,  False]
zipped = list(zip(do_retry, is_timeout_error))

# current behaviour:
print("----------------------------------------")
print("Current behaviour: ")
print("----------------------------------------")
for do_retry, is_timeout_error in zipped:
    print("Condition: retry-{}, timeout-{}".format(do_retry, is_timeout_error))
    if not do_retry and is_timeout_error:
        print("Result: not retry")
    else:
        print("Result: retry")

# behaviour after fix:
print("----------------------------------------")
print("Fixed behaviour: ")
print("----------------------------------------")
for do_retry, is_timeout_error in zipped:
    print("Condition: retry: {}, timeout: {}".format(do_retry, is_timeout_error))
    if not (do_retry and is_timeout_error):
        print("Result: not retry")
    else:
        print("Result: retry")
print("----------------------------------------")

Script output:

----------------------------------------
Current behaviour:
----------------------------------------
Condition: retry-True, timeout-False
Result: retry
Condition: retry-True, timeout-True
Result: retry
Condition: retry-False, timeout-True
Result: not retry
Condition: retry-False, timeout-False
Result: retry
----------------------------------------
Fixed behaviour:
----------------------------------------
Condition: retry: True, timeout: False
Result: not retry
Condition: retry: True, timeout: True
Result: retry
Condition: retry: False, timeout: True
Result: not retry
Condition: retry: False, timeout: False
Result: not retry
----------------------------------------

UnicodeEncodeError with char '\u0361'

Here is the setup of the main client

self.redis_client = aredis.StrictRedis(host='localhost', decode_responses=True)

This is the statement which the error surrounds.

await self.redis_client.set(f'memecache:{f_search}', f'{link}', ex=86400)

The URL in question is:
http://knowyourmeme.com/memes/hentai-woody-%E5%A4%89%E6%85%8B%E3%82%A6%E3%83%83%E3%83%87%E3%82%A3%E3%83%BC
(the URL is the result of searching the site for the Buzz Lightyear meme and accidentally coming across this one, which happened to expose the bug).

Full traceback:

Ignoring exception in command meme:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/discord/ext/commands/core.py", line 62, in wrapped
    ret = yield from coro(*args, **kwargs)
  File "/root/qtbot/cogs/meme.py", line 42, in get_meme_info
    await self.redis_client.set(f'memecache:{f_search}', f'{link}', ex=86400)
  File "/usr/local/lib/python3.6/dist-packages/aredis/commands/strings.py", line 270, in set
    return await self.execute_command('SET', *pieces)
  File "/usr/local/lib/python3.6/dist-packages/aredis/client.py", line 163, in execute_command
    await connection.send_command(*args)
  File "/usr/local/lib/python3.6/dist-packages/aredis/connection.py", line 463, in send_command
    await self.send_packed_command(self.pack_command(*args))
  File "/usr/local/lib/python3.6/dist-packages/aredis/connection.py", line 514, in pack_command
    SYM_CRLF, b(arg), SYM_CRLF))
  File "/usr/local/lib/python3.6/dist-packages/aredis/utils.py", line 8, in b
    return x.encode('latin-1') if not isinstance(x, bytes) else x
UnicodeEncodeError: 'latin-1' codec can't encode character '\u0361' in position 30: ordinal not in range(256)

tmux is weird about scrolling in my TTY, so hopefully that traceback is coherent enough.
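One possible workaround, assuming the failure is the latin-1 fallback in utils.b() shown at the bottom of the traceback: hand aredis bytes instead of str so that fallback is never reached. A sketch:

async def cache_link(redis_client, f_search, link):
    # Pre-encode key and value to UTF-8 bytes yourself, so aredis's
    # latin-1 encoding of str arguments never sees '\u0361'.
    await redis_client.set(
        f'memecache:{f_search}'.encode('utf-8'),
        f'{link}'.encode('utf-8'),
        ex=86400)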

Expected release

Hi,

We are planning to use scan_iter from ClusterIterCommandMixin in production.
Any chance of getting the master branch released soon?

Thanks for your work.

Cheers,

Assertion Error Asyncio Socket

I'm getting these errors on Python 3.6:

Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
    self._callback(*self._args)
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
    self._protocol.data_received(data)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
    self._stream_reader.feed_data(data)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof

(the same traceback repeats many times in the console)

My code

async def pubsub_loop(self):
	self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
	await self.pubsub.subscribe('notsobot.rpc')
	while True:
		message = await self.pubsub.get_message()
		if message:
			load = json.loads(message['data'])
			self.bot.dispatch('rpc_receive', load)
		await asyncio.sleep(0.01)

async def publish(self, payload, snowflake):
	await self.redis.publish('notsobot.rpc', json.dumps({'shard_id': self.bot.shard_id, 'snowflake': snowflake, 'payload': payload}))

Not sure right now whether it's my code or the library; the error is vague.
Also, everything is working fine otherwise; it's just that these errors spam my console.

StrictRedisCluster.from_url ignores skip_full_coverage_check flag

Checklist

  • Python version
    3.x
  • Using hiredis or just Python parser
    hiredis - but doesn't matter.
  • Using uvloop or just asyncio event loop
    asyncio
  • Does the issue exist against the master branch of aredis?
    Yes.

Steps to reproduce

client = StrictRedisCluster.from_url(
            "redis://localhost:6379",
            skip_full_coverage_check=True,
        )
await client.get("somekey")

Expected behavior

Should not call CONFIG GET

Actual behavior

  File "/usr/local/lib/python3.7/site-packages/aredis/commands/server.py", line 198, in config_get
    return await self.execute_command('CONFIG GET', pattern)
  File "/usr/local/lib/python3.7/site-packages/aredis/client.py", line 156, in execute_command
    return await self.parse_response(connection, command_name, **options)
  File "/usr/local/lib/python3.7/site-packages/aredis/client.py", line 171, in parse_response
    response = await connection.read_response()
  File "/usr/local/lib/python3.7/site-packages/aredis/connection.py", line 447, in read_response
    raise response
aredis.exceptions.ResponseError: unknown command `CONFIG`, with args beginning with: `GET`, `cluster-require-full-coverage`,

calling redis.pubsub().get_message() with different timeouts crashes

Checklist

  • Python 3.6
  • Python parser
  • asyncio event loop
  • 1.1.2

Steps to reproduce

from aredis import StrictRedis
import asyncio

async def sub_redis():
    redis = StrictRedis('172.16.179.111', 6379)
    redis_sub = redis.pubsub()
    await redis_sub.subscribe('test')
    while True:
        message = await redis_sub.get_message(False, 5)
        print(message)
        message = await redis_sub.get_message(False)
        print(message)

async def pub_msg():
    redis = StrictRedis('172.16.179.111', 6379)
    await redis.execute_command('publish', 'test', 'bye')
    await asyncio.sleep(10)
    await redis.execute_command('publish', 'test', 'bye')


if __name__ == '__main__':
    asyncio.ensure_future(sub_redis())
    asyncio.ensure_future(pub_msg())
    asyncio.get_event_loop().run_forever()

The behavior of get_message() also differs from redis-py: with timeout set to 0, redis-py returns immediately, but aredis never returns until it has received a message.
I don't know whether that is a bug.

Actual behavior

{'type': 'subscribe', 'pattern': None, 'channel': b'test', 'data': 1}
{'type': 'message', 'pattern': None, 'channel': b'test', 'data': b'bye'}
None
Task exception was never retrieved
future: <Task finished coro=<sub_redis() done, defined at G:/svn/PlatformDaily/80-moservice/apipush/aredis_test.py:4> exception=RuntimeError('read() called while another coroutine is already waiting for incoming data',)>
Traceback (most recent call last):
  File "G:/svn/PlatformDaily/80-moservice/apipush/aredis_test.py", line 11, in sub_redis
    message = await redis_sub.get_message(False)
  File "D:\Python\Python36\lib\site-packages\aredis\pubsub.py", line 216, in get_message
    response = await self.parse_response(block=False, timeout=timeout)
  File "D:\Python\Python36\lib\site-packages\aredis\pubsub.py", line 141, in parse_response
    return await coro
  File "D:\Python\Python36\lib\site-packages\aredis\pubsub.py", line 111, in _execute
    return await command(*args)
  File "D:\Python\Python36\lib\site-packages\aredis\connection.py", line 435, in read_response
    response = await exec_with_timeout(self._parser.read_response(), self._stream_timeout, loop=self.loop)
  File "D:\Python\Python36\lib\site-packages\aredis\connection.py", line 36, in exec_with_timeout
    return await asyncio.wait_for(coroutine, timeout, loop=loop)
  File "D:\Python\Python36\lib\asyncio\tasks.py", line 339, in wait_for
    return (yield from fut)
  File "D:\Python\Python36\lib\site-packages\aredis\connection.py", line 201, in read_response
    response = await self._buffer.readline()
  File "D:\Python\Python36\lib\site-packages\aredis\connection.py", line 102, in readline
    await self._read_from_socket()
  File "D:\Python\Python36\lib\site-packages\aredis\connection.py", line 62, in _read_from_socket
    data = await self._stream.read(self.read_size)
  File "D:\Python\Python36\lib\asyncio\streams.py", line 634, in read
    yield from self._wait_for_data('read')
  File "D:\Python\Python36\lib\asyncio\streams.py", line 452, in _wait_for_data
    'already waiting for incoming data' % func_name)
RuntimeError: read() called while another coroutine is already waiting for incoming data

xinfo stream breaks on empty stream

Checklist

  • 3.6.7
  • python parser
  • asyncio loop
  • yes, against the master branch of aredis

Steps to reproduce

import asyncio

import aredis

rc = aredis.StrictRedis('localhost',
                        db=0,
                        decode_responses=True)

STREAM_ID = 'mystream'


async def handle_message(res):
    _id, message = res
    await rc.xdel(STREAM_ID, _id)
    print(_id, message)


async def check():
    await rc.xadd(STREAM_ID,
                  {'data': 'testdata'})

    messages = await rc.xrange(STREAM_ID, '-', '+')
    await asyncio.gather(*[handle_message(res) for res in messages])

    stream_exists = await rc.exists(STREAM_ID)
    if stream_exists:
        streaminfo = await rc.xinfo_stream(STREAM_ID)
        print(streaminfo)


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(check(), return_exceptions=False))

Expected behavior

info returned

Actual behavior

Traceback:
aredis/commands/streams.py", line 32, in parse_xinfo_stream
    res['first-entry'][1] = pairs_to_dict(res['first-entry'][1])
TypeError: 'NoneType' object is not subscriptable
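A guard sketch for the failing conversion: an emptied stream reports its first and last entries as nil, so the pair conversion should be skipped in that case. A hypothetical helper, not the library's code:

def convert_entry(entry):
    # XINFO STREAM returns nil for first-entry/last-entry once every
    # message has been XDEL'd; only convert a field list that exists.
    if entry is None:
        return None
    entry_id, fields = entry
    return [entry_id, dict(zip(fields[::2], fields[1::2]))]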

redis.pub_sub().get_message() subscribe channel duplicated

Checklist

  • Python 3.6
  • Python parser
  • asyncio event loop
  • 1.1.2

Steps to reproduce

from aredis import StrictRedis
import asyncio

async def sub_redis():
    redis = StrictRedis('172.16.179.100', 6379)
    redis_sub = redis.pubsub()
    await redis_sub.subscribe('notify')
    while True:
        message = await redis_sub.get_message(False, 5)
        print(message)


if __name__ == '__main__':
    asyncio.ensure_future(sub_redis())
    asyncio.get_event_loop().run_forever()

Expected behavior

With redis-py, subscribe is only called once, and the subscribe notification is received once:

{'type': 'subscribe', 'pattern': None, 'channel': b'notify', 'data': 1}
None
None
None

Actual behavior

{'type': 'subscribe', 'pattern': None, 'channel': b'notify', 'data': 1}
None
{'type': 'subscribe', 'pattern': None, 'channel': b'notify', 'data': 1}
None
{'type': 'subscribe', 'pattern': None, 'channel': b'notify', 'data': 1}

With aredis, a subscribe notification is received every time get_message() is called.

parsing empty payload of stream message

I've come across similar problems occasionally:

ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<Subscriber.run() done, defined at /opt/app-root/src/lib/redis/async_subscriber.py:41> exception=TypeError("object of type 'NoneType' has no len()")>
Traceback (most recent call last):
  File "/opt/app-root/src/lib/redis/async_subscriber.py", line 64, in run
    last_id = await self.process_stream(last_id=last_id)
  File "/opt/app-root/src/lib/redis/async_subscriber.py", line 160, in process_stream
    **{self.stream_key: last_id},
  File "/usr/local/lib/python3.7/site-packages/aredis/commands/streams.py", line 238, in xreadgroup
    return await self.execute_command('XREADGROUP', *pieces)
  File "/usr/local/lib/python3.7/site-packages/aredis/client.py", line 156, in execute_command
    return await self.parse_response(connection, command_name, **options)
  File "/usr/local/lib/python3.7/site-packages/aredis/client.py", line 174, in parse_response
    return callback(response, **options)
  File "/usr/local/lib/python3.7/site-packages/aredis/commands/streams.py", line 22, in multi_stream_list
    result[r[0]] = stream_list(r[1])
  File "/usr/local/lib/python3.7/site-packages/aredis/commands/streams.py", line 12, in stream_list
    while len(kv_pairs) > 1:
TypeError: object of type 'NoneType' has no len()

It seems to happen when there's a "(nil)" payload for a given event:

127.0.0.1:6379> XREADGROUP GROUP group_name consumer_name STREAMS stream_name 0
1) 1) "stream_name"
   2) 1) 1) "1561724821836-0"
         2) (nil)
      2) 1) "1562178600447-0"
         2)  1) "stream"
             2) "stream_name"
             3) "topic"
             ...

This seems to happen when an XDEL occurs on a message that was never XACKed. It'd be nice to return an empty dict() in those cases, so the client can clear it from its list but still process the other pending events.

Originally posted by @szelenka in #113 (comment)
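A guard sketch matching that suggestion: treat a nil payload as an empty dict so the remaining pending entries still get processed. A hypothetical helper mirroring what stream_list does with each entry's field list:

def fields_to_dict(kv_pairs):
    # An entry that was XDEL'd but never XACK'd comes back with a nil
    # payload; return an empty dict instead of calling len() on None.
    if kv_pairs is None:
        return {}
    return dict(zip(kv_pairs[::2], kv_pairs[1::2]))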

aredis pipeline does not work with Python 3.5.1

Checklist

  • Python version 3.5.1
  • Does the issue exist against the master branch of aredis? Yes

Steps to reproduce

Use aredis and execute a pipeline with Python 3.5.1.

Expected behavior

No error.

Actual behavior

Traceback (most recent call last):
  File "c:\Users\2\AppData\Local\Programs\Python\Python35-32\Lib\asyncio\tasks.py", line 239, in _step
    result = coro.send(None)
  File "C:\prj\paintman\paintman\socket\async_socket_server.py", line 63, in process_queue
    commands = await async_read_server_command_chunk()
  File "C:\prj\paintman\paintman\socket\redis_protocol.py", line 89, in async_read_server_command_chunk
    res = await p.execute()
  File "C:\prj\paintman\env\lib\site-packages\aredis\pipeline.py", line 286, in execute
    return await exec(conn, stack, raise_on_error)
  File "C:\prj\paintman\env\lib\site-packages\aredis\pipeline.py", line 203, in _execute_transaction
    if isinstance(response, typing.Awaitable):
AttributeError: module 'typing' has no attribute 'Awaitable'

aredis checks whether the response is awaitable in this line of code:

if isinstance(response, typing.Awaitable):

and from what I understand, typing.Awaitable is not available in all Python 3.5 releases (source: https://docs.python.org/3.5/whatsnew/changelog.html#id22). You could use the inspect module instead to check whether an object is awaitable:

import inspect
if inspect.isawaitable(response):
    ...

a broken connection will always be returned to pool

If the server side sets a timeout value and closes a connection because it has gone idle, that broken connection will stay in the pool forever.

And because the pool is implemented with pop and append, the released broken connection will be popped again for the next request, and so on.
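A mitigation sketch: validate a pooled connection with PING before trusting it, and rebuild it if the server already closed the socket while it sat idle. The pool method names here are assumptions modelled on redis-py; send_command/read_response appear in tracebacks elsewhere on this page.

from aredis.exceptions import ConnectionError

async def checked_connection(pool):
    # Probe the connection before handing it out; if the server closed
    # it during the idle period, discard it and build a fresh one.
    connection = pool.get_connection()       # assumed pool API
    try:
        await connection.send_command('PING')
        await connection.read_response()
    except ConnectionError:
        connection.disconnect()
        connection = pool.make_connection()  # assumed pool API
    return connection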

Sentinel not working

Sentinel does not work, while redis-cli and the blocking redis-py do. Maybe I'm doing something wrong; I noticed that the Sentinel object differs from the redis-py one in that it does not accept the keyword argument socket_timeout (even though it is documented as a valid parameter).

  • redis-py (1.0.9)
  • aredis (2.10.6)
  • redis (3.10)

aredis

import asyncio
from aredis import StrictRedis
REDIS_SERVICE_NAME = 'rsmaster'
REDIS_SENTINELS = [
   ('commons-sentinel1', 26379),
   ('commons-sentinel2', 26379),
   ('commons-sentinel3', 26379)
]
# First just do master connection
s = StrictRedis('commons-redis-master', 6379)
l = asyncio.get_event_loop()
l.run_until_complete(s.keys('*'))
>>> [b'test2', b'test1', b'test3', b'test']
# Now do it through Sentinel object
from aredis.sentinel import Sentinel
s = Sentinel(REDIS_SENTINELS)
l.run_until_complete(s.discover_master('rsmaster'))
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-12-93da80304d53> in <module>()
----> 1 l.run_until_complete(s.discover_master('rsmaster')
      2 )

/usr/lib/python3.5/asyncio/base_events.py in run_until_complete(self, future)
    385             raise RuntimeError('Event loop stopped before Future completed.')
    386
--> 387         return future.result()
    388
    389     def stop(self):

/usr/lib/python3.5/asyncio/futures.py in result(self)
    272             self._tb_logger = None
    273         if self._exception is not None:
--> 274             raise self._exception
    275         return self._result
    276

/usr/lib/python3.5/asyncio/tasks.py in _step(***failed resolving arguments***)
    237                 # We use the `send` method directly, because coroutines
    238                 # don't have `__iter__` and `__next__` methods.
--> 239                 result = coro.send(None)
    240             else:
    241                 result = coro.throw(exc)

/usr/lib/python3.5/site-packages/aredis/sentinel.py in discover_master(self, service_name)
    215         for sentinel_no, sentinel in enumerate(self.sentinels):
    216             try:
--> 217                 masters = await sentinel.sentinel_masters()
    218             except (ConnectionError, TimeoutError):
    219                 continue

/usr/lib/python3.5/site-packages/aredis/commands/sentinel.py in sentinel_masters(self)
    107     async def sentinel_masters(self):
    108         "Returns a list of dictionaries containing each master's state."
--> 109         return await self.execute_command('SENTINEL MASTERS')
    110
    111     async def sentinel_monitor(self, name, ip, port, quorum):

/usr/lib/python3.5/site-packages/aredis/client.py in execute_command(self, *args, **options)
    162         try:
    163             await connection.send_command(*args)
--> 164             return await self.parse_response(connection, command_name, **options)
    165         except (ConnectionError, TimeoutError) as e:
    166             connection.disconnect()

/usr/lib/python3.5/site-packages/aredis/client.py in parse_response(self, connection, command_name, **options)
    177         if command_name in self.response_callbacks:
    178             callback = self.response_callbacks[command_name]
--> 179             return callback(response, **options)
    180         return response
    181

/usr/lib/python3.5/site-packages/aredis/commands/sentinel.py in parse_sentinel_masters(response)
     66     result = {}
     67     for item in response:
---> 68         state = parse_sentinel_state(map(str, item))
     69         result[state['name']] = state
     70     return result

/usr/lib/python3.5/site-packages/aredis/commands/sentinel.py in parse_sentinel_state(item)
     49 def parse_sentinel_state(item):
     50     result = pairs_to_dict_typed(item, SENTINEL_STATE_TYPES)
---> 51     flags = set(result['flags'].split(','))
     52     for name, flag in (('is_master', 'master'), ('is_slave', 'slave'),
     53                        ('is_sdown', 's_down'), ('is_odown', 'o_down'),

KeyError: 'flags'

redis-py

from redis.sentinel import Sentinel
REDIS_SERVICE_NAME = 'rsmaster'
REDIS_SENTINELS = [
   ('commons-sentinel1', 26379),
   ('commons-sentinel2', 26379),
   ('commons-sentinel3', 26379)
]
s = Sentinel(REDIS_SENTINELS, socket_timeout=0.1)
s.discover_master(REDIS_SERVICE_NAME)
>>> ('172.18.0.2', 6379)
m = s.master_for(REDIS_SERVICE_NAME)
m.keys('*')
>>>  [b'test2', b'test1', b'test3', b'test']

redis-cli

bash-4.3# redis-cli -h commons-sentinel1 -p 26379
commons-sentinel1:26379> SENTINEL get-master-addr-by-name rsmaster
1) "172.18.0.2"
2) "6379"
commons-sentinel1:26379> SENTINEL slaves rsmaster
1)  1) "name"
    2) "172.18.0.4:6379"
    3) "ip"
    4) "172.18.0.4"
    5) "port"
    6) "6379"
    7) "runid"
    8) "3b57a02fdf6be66085b82226a041a7040cc46ca8"
    9) "flags"
   10) "slave"
   11) "link-pending-commands"
   12) "0"
   13) "link-refcount"
   14) "1"
   15) "last-ping-sent"
   16) "0"
   17) "last-ok-ping-reply"
   18) "699"
   19) "last-ping-reply"
   20) "699"
   21) "down-after-milliseconds"
   22) "5000"
   23) "info-refresh"
   24) "6663"
   25) "role-reported"
   26) "slave"
   27) "role-reported-time"
   28) "1211479"
   29) "master-link-down-time"
   30) "0"
   31) "master-link-status"
   32) "ok"
   33) "master-host"
   34) "172.18.0.2"
   35) "master-port"
   36) "6379"
   37) "slave-priority"
   38) "100"
   39) "slave-repl-offset"
   40) "237851"
2)  1) "name"
    2) "172.18.0.5:6379"
    3) "ip"
    4) "172.18.0.5"
    5) "port"
    6) "6379"
    7) "runid"
    8) "462a4a072ba7e4ded60f65a9b2b520d03c3e5d45"
    9) "flags"
   10) "slave"
   11) "link-pending-commands"
   12) "0"
   13) "link-refcount"
   14) "1"
   15) "last-ping-sent"
   16) "0"
   17) "last-ok-ping-reply"
   18) "699"
   19) "last-ping-reply"
   20) "698"
   21) "down-after-milliseconds"
   22) "5000"
   23) "info-refresh"
   24) "6663"
   25) "role-reported"
   26) "slave"
   27) "role-reported-time"
   28) "1211469"
   29) "master-link-down-time"
   30) "0"
   31) "master-link-status"
   32) "ok"
   33) "master-host"
   34) "172.18.0.2"
   35) "master-port"
   36) "6379"
   37) "slave-priority"
   38) "100"
   39) "slave-repl-offset"
   40) "237851"
commons-sentinel1:26379> SENTINEL sentinels rsmaster
1)  1) "name"
    2) "bcf7e6582e10f9cc70360348c1a529a6a6ad3a10"
    3) "ip"
    4) "172.18.0.8"
    5) "port"
    6) "26379"
    7) "runid"
    8) "bcf7e6582e10f9cc70360348c1a529a6a6ad3a10"
    9) "flags"
   10) "sentinel"
   11) "link-pending-commands"
   12) "0"
   13) "link-refcount"
   14) "1"
   15) "last-ping-sent"
   16) "0"
   17) "last-ok-ping-reply"
   18) "133"
   19) "last-ping-reply"
   20) "133"
   21) "down-after-milliseconds"
   22) "5000"
   23) "last-hello-message"
   24) "1378"
   25) "voted-leader"
   26) "?"
   27) "voted-leader-epoch"
   28) "0"
2)  1) "name"
    2) "985e7c3435b9c1593fdb241d668b123f55481ac8"
    3) "ip"
    4) "172.18.0.7"
    5) "port"
    6) "26379"
    7) "runid"
    8) "985e7c3435b9c1593fdb241d668b123f55481ac8"
    9) "flags"
   10) "sentinel"
   11) "link-pending-commands"
   12) "0"
   13) "link-refcount"
   14) "1"
   15) "last-ping-sent"
   16) "0"
   17) "last-ok-ping-reply"
   18) "133"
   19) "last-ping-reply"
   20) "133"
   21) "down-after-milliseconds"
   22) "5000"
   23) "last-hello-message"
   24) "800"
   25) "voted-leader"
   26) "?"
   27) "voted-leader-epoch"
   28) "0"
commons-sentinel1:26379> SENTINEL sentinels rsmaster
1)  1) "name"
    2) "bcf7e6582e10f9cc70360348c1a529a6a6ad3a10"
    3) "ip"
    4) "172.18.0.8"
    5) "port"
    6) "26379"
    7) "runid"
    8) "bcf7e6582e10f9cc70360348c1a529a6a6ad3a10"
    9) "flags"
   10) "sentinel"
   11) "link-pending-commands"
   12) "0"
   13) "link-refcount"
   14) "1"
   15) "last-ping-sent"
   16) "0"
   17) "last-ok-ping-reply"
   18) "972"
   19) "last-ping-reply"
   20) "972"
   21) "down-after-milliseconds"
   22) "5000"
   23) "last-hello-message"
   24) "1371"
   25) "voted-leader"
   26) "?"
   27) "voted-leader-epoch"
   28) "0"
2)  1) "name"
    2) "985e7c3435b9c1593fdb241d668b123f55481ac8"
    3) "ip"
    4) "172.18.0.7"
    5) "port"
    6) "26379"
    7) "runid"
    8) "985e7c3435b9c1593fdb241d668b123f55481ac8"
    9) "flags"
   10) "sentinel"
   11) "link-pending-commands"
   12) "0"
   13) "link-refcount"
   14) "1"
   15) "last-ping-sent"
   16) "0"
   17) "last-ok-ping-reply"
   18) "972"
   19) "last-ping-reply"
   20) "972"
   21) "down-after-milliseconds"
   22) "5000"
   23) "last-hello-message"
   24) "573"
   25) "voted-leader"
   26) "?"
   27) "voted-leader-epoch"
   28) "0"

PubSub get_message() does not always return None when there is no data available

Hello Jason,

It looks like the PubSub get_message() function is not working as expected on a freshly created channel with no data. I assume a call to this method should return None in that case; however, None is returned only once, and then execution "hangs", awaiting the coroutine's completion (i.e. actual data appearing in the channel) before it continues.

Here are the steps to reproduce this issue:

# 1. Create some subscription (PubSub obj)

# redis — initialized connection to some Redis instance
pbsb = redis.pubsub(ignore_subscribe_messages=True)
await pbsb.subscribe("some-channel")

# 2. Query message in the loop
timeout = 5
message = None
end_time = time.time() + timeout
while timeout == 0 or time.time() < end_time:
    print(1)
    message = await pbsb.get_message(timeout=1)
    print(2)

# 3. results
>>> 1
>>> 2
>>> 1
# there should be a few more 1s and 2s here :)

# 4. Nothing more happened until some data arrives, then
>>> 2

I've attached a patch intended to resolve this issue. It removes the connection.can_read() check in aredis/pubsub.py on line 132, because in this scenario it always returns False, and it adds several imports:

from aredis.exceptions import ConnectionError
from aredis.exceptions import TimeoutError

Without them, the except clause on line 112 does not work as expected.

Also, in aredis/connection.py on line 307 I've added the check

if not self._reader:
    break

Because when the coroutine is actually cancelled, self._reader becomes None, which leads to an unwanted exception.

Best,
Jack
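
Until a patched release is available, one workaround is to enforce the timeout outside the library. A minimal sketch, assuming plain asyncio and that cancelling a pending get_message() is acceptable here (the helper name is hypothetical):

import asyncio

async def get_message_with_timeout(pbsb, timeout=1.0):
    # wait_for cancels the underlying read if nothing arrives in time,
    # emulating the documented get_message(timeout=...) semantics
    try:
        return await asyncio.wait_for(pbsb.get_message(), timeout)
    except asyncio.TimeoutError:
        return None  # mirror the "no data available" result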

Distributed lock for Redis Cluster

Hi!

It would be very useful to have distributed lock support for the cluster in your awesome library. Running a Redis cluster in production is very common, and Redis-related logic simply breaks once there is more than one consumer instance, for example. None of the current drivers have such a feature.

Thank you.
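
For what it's worth, the primitive such support would likely wrap is the usual single-key lock pattern. A rough sketch (not the Redlock algorithm and not existing aredis API, just an illustration of the building block), using SET NX PX plus a compare-and-delete Lua script:

import uuid

UNLOCK_LUA = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
end
return 0
"""

async def acquire_lock(client, name, ttl_ms=10000):
    token = uuid.uuid4().hex
    # NX: only set if absent; PX: auto-expire so a dead holder cannot deadlock
    if await client.set(name, token, px=ttl_ms, nx=True):
        return token
    return None

async def release_lock(client, name, token):
    # delete only if we still own the lock (the token matches)
    return await client.eval(UNLOCK_LUA, 1, name, token)

Because a single key hashes to one slot, this much works unchanged against a cluster client; a fault-tolerant, multi-node Redlock would need additional machinery.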

TLS connection to AWS redis cluster

Checklist

  • Python version
    3.6.7
  • Using hiredis or just Python parser
    Both
  • Using uvloop or just asyncio event loop
    Asyncio
  • Does the issue exist against the master branch of aredis?
    Yes

Steps to reproduce

import asyncio
from aredis import StrictRedisCluster, StrictRedis
from aredis import ClusterConnectionPool

import settings
async def test_connection():
  pool = ClusterConnectionPool.from_url('rediss://xxx.cache.amazonaws.com:6379', decode_components=False)
  redis = StrictRedisCluster(connection_pool=pool, decode_responses=False, skip_full_coverage_check=True)
  await redis.get('a')

loop = asyncio.get_event_loop()
loop.run_until_complete(test_connection())

Expected behavior

Connection goes through

Actual behavior

Traceback (most recent call last):
  File "test.py", line 12, in <module>
    loop.run_until_complete(try_hard())
  File "/usr/local/lib/python3.6/asyncio/base_events.py", line 473, in run_until_complete
    return future.result()
  File "test.py", line 9, in try_hard
    await redis.get('a')
  File "/usr/local/lib/python3.6/site-packages/aredis/commands/strings.py", line 147, in get
    return await self.execute_command('GET', name)
  File "/usr/local/lib/python3.6/site-packages/aredis/utils.py", line 167, in inner
    return await func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/aredis/client.py", line 356, in execute_command
    await self.connection_pool.initialize()
  File "/usr/local/lib/python3.6/site-packages/aredis/pool.py", line 299, in initialize
    await self.nodes.initialize()
  File "/usr/local/lib/python3.6/site-packages/aredis/nodemanager.py", line 189, in initialize
    raise RedisClusterException('Redis Cluster cannot be connected. '
aredis.exceptions.RedisClusterException: Redis Cluster cannot be connected. Please provide at least one reachable node.
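
A connectivity check that may help narrow this down, sketched under the assumption that your aredis version's StrictRedis accepts ssl_context (the host is a placeholder): if a single-node TLS client cannot reach the endpoint either, the problem is the network/TLS path rather than the cluster bootstrap.

import asyncio
import ssl
from aredis import StrictRedis

async def check():
    ctx = ssl.create_default_context()
    client = StrictRedis(host='xxx.cache.amazonaws.com', port=6379,
                         ssl_context=ctx)
    print(await client.ping())  # True if the TLS path works

asyncio.get_event_loop().run_until_complete(check())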

Moved error

I am using StrictRedisCluster.

I am getting a bunch of MovedErrors. Is there some setting I need to set to allow for redirections upon cluster movements?
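
For context, the cluster client is expected to follow MOVED redirects and refresh its slot table on its own. If the errors keep surfacing (e.g. during a resharding), a hedged workaround is to force a slot-table refresh and retry; a sketch (the wrapper is hypothetical, while nodes.initialize() is the same call visible in the tracebacks elsewhere in this tracker):

from aredis.exceptions import MovedError

async def get_with_refresh(client, key):
    try:
        return await client.get(key)
    except MovedError:
        # rebuild the slot -> node mapping, then retry once
        await client.connection_pool.nodes.initialize()
        return await client.get(key)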

aredis fails to install using pip

aredis is failing to install in a new environment. Trying to install it results in the following error:

    Complete output from command python setup.py egg_info:
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-build-ztxqbwm1/aredis/setup.py", line 5, in <module>
        from aredis import __version__
      File "/tmp/pip-build-ztxqbwm1/aredis/aredis/__init__.py", line 1, in <module>
        from aredis.client import StrictRedis
      File "/tmp/pip-build-ztxqbwm1/aredis/aredis/client.py", line 10, in <module>
        from aredis.commands.cluster import ClusterCommandMixin
    ImportError: No module named 'aredis.commands'
    
    ----------------------------------------

The reason seems to be that setup.py loads the version from __init__.py; this tries to import the whole package, which fails because the package is not installed yet.

Pip version 9.0.1, aredis version 1.0.4
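
The standard fix sketch for this class of problem (an assumption about what could be done, not necessarily what was merged) is for setup.py to read __version__ with a regular expression instead of importing the not-yet-installed package:

import re

# pull __version__ = '...' out of the file without importing the package
with open('aredis/__init__.py') as f:
    version = re.search(r"__version__\s*=\s*['\"]([^'\"]+)['\"]",
                        f.read()).group(1)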

feature request

Checklist

  • Python version
    3.6
  • Using hiredis or just Python parser
    python parser only
  • Using uvloop or just asyncio event loop
    asyncio event loop
  • Does the issue exist against the master branch of aredis?

Steps to reproduce

Expected behavior

Get a result that is not None via a callback.

Actual behavior

  • It is appreciated if an error log can be provided
    Is it possible to add a feature that fetches the data for a specific key from Redis via a callback, once the data is not None? My situation: in Tornado I use Celery to run background jobs, but I have to rely on the Celery result; right now, if the result is None, I use Tornado's sleep to wait.
# the demo code is something like this 
        while not result:
            await tornado.gen.sleep(0.2)
            result = await self.application.redis_client.get(prefix + task.id)

But this decreases the performance of my web app. Is there any possible way to get the result with a callback?
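
One polling-free alternative, sketched under the assumption that the producer side can be changed (wait_for_result is a hypothetical helper; prefix and the task id come from the snippet above): have the Celery task LPUSH its result to a per-task list, then block on BLPOP instead of sleeping in a loop.

async def wait_for_result(redis_client, task_id, timeout=30):
    # BLPOP blocks server-side until a value is pushed or the timeout
    # expires, so no client-side sleep loop is needed
    reply = await redis_client.blpop(prefix + task_id, timeout=timeout)
    return reply[1] if reply else None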

TLS connection to AWS redis cluster

Checklist

  • Python version
    3.6.3
  • Using hiredis or just Python parser
    hiredis
  • Using uvloop or just asyncio event loop
    asyncio
  • Does the issue exist against the master branch of aredis?
    both

Steps to reproduce

#!/usr/bin/python

import asyncio
import logging
import os
from aredis import StrictRedisCluster
from aredis import ClusterConnectionPool

async def example():
    pool = ClusterConnectionPool.from_url(os.getenv('REDIS_CLUSTER_URL'))
    redis = StrictRedisCluster(connection_pool=pool, decode_responses=False, skip_full_coverage_check=True)
    logging.info(redis.connection_pool.initialized)
    logging.info(redis.connection_pool.nodes.slots.keys())
    await redis.connection_pool.initialize()
    logging.info(redis.connection_pool.initialized)
    logging.info(redis.connection_pool.nodes.slots[15495])
    logging.info(await redis.get('a'))

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(example())
export REDIS_CLUSTER_URL=rediss://:###@###.cache.amazonaws.com:7000/0

Expected behavior

Connects and does not raise exception

Actual behavior

INFO:root:False
INFO:root:dict_keys([])
Traceback (most recent call last):
  File "/usr/lib/python3.6/site-packages/aredis/nodemanager.py", line 123, in initialize
    r = self.get_redis_link(host=node['host'], port=node['port'])
  File "/usr/lib/python3.6/site-packages/aredis/nodemanager.py", line 98, in get_redis_link
    return StrictRedis(host=host, port=port, decode_responses=True, **connection_kwargs)
TypeError: __init__() got an unexpected keyword argument 'ssl_context'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "debug_redis.py", line 24, in <module>
    loop.run_until_complete(example())
  File "/usr/lib/python3.6/asyncio/base_events.py", line 467, in run_until_complete
    return future.result()
  File "debug_redis.py", line 15, in example
    await redis.connection_pool.initialize()
  File "/usr/lib/python3.6/site-packages/aredis/pool.py", line 299, in initialize
    await self.nodes.initialize()
  File "/usr/lib/python3.6/site-packages/aredis/nodemanager.py", line 129, in initialize
    raise RedisClusterException('ERROR sending "cluster slots" command to redis server: {0}'.format(node))
aredis.exceptions.RedisClusterException: ERROR sending "cluster slots" command to redis server: {'host': '###.cache.amazonaws.com', 'port': '###'}
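
The traceback pinpoints the problem: from_url() turns rediss:// into an ssl_context connection kwarg, and get_redis_link forwards every connection kwarg to a StrictRedis whose __init__ (in this version) does not accept it. A hypothetical local-patch sketch, assuming self.connection_kwargs is where those kwargs live (per the call in the traceback), filters them down to what StrictRedis actually accepts:

import inspect
from aredis import StrictRedis

def get_redis_link(self, host, port):
    # keep only the kwargs this StrictRedis version understands
    accepted = inspect.signature(StrictRedis.__init__).parameters
    kwargs = {k: v for k, v in self.connection_kwargs.items()
              if k in accepted}
    return StrictRedis(host=host, port=port, decode_responses=True, **kwargs)

The caveat: dropping ssl_context means the discovery link itself is made without TLS, which an endpoint that requires TLS will refuse, so the cleaner path is upgrading to an aredis version whose StrictRedis accepts ssl_context.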

How to use as few connections as possible in redis cluster

Checklist

  • Python version python3.7
  • Using hiredis or just Python parser
  • Using uvloop or just asyncio event loop
  • Does the issue exist against the master branch of aredis?

Steps to reproduce

Expected behavior

Actual behavior

  • It is appreciated if an error log can be provided

Too many connections: this question has really been bothering me. I tried tuning the connection settings of the redis-cluster server and setting max_connections on ClusterConnectionPool, but under high concurrency the number of sockets keeps increasing and they are never released. Can I configure connections to be reused as much as possible, and to be recovered and released automatically when idle?
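
A sketch of the knobs that may help (whether your aredis version supports the idle-reaping parameters is an assumption to verify; the startup node is a placeholder): cap the pool per node and let idle sockets be reaped.

from aredis import StrictRedisCluster

client = StrictRedisCluster(
    startup_nodes=[{'host': '127.0.0.1', 'port': 7001}],
    max_connections=16,             # hard cap...
    max_connections_per_node=True,  # ...applied per node, not per cluster
    max_idle_time=60,               # release sockets idle for more than 60s
    idle_check_interval=10,         # how often the idle reaper runs
    skip_full_coverage_check=True)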

Support for "CLIENT REPLY ON | OFF | SKIP"

command
I am not sure whether i should add support for the new command CLIENT REPLY(which need to share one connection between different commands execution, may lead change on execute_command) introduced since 3.2 release.

How do you think about that?

tornado with aredis issue

Checklist

  • Python version
    3.6
  • Using hiredis or just Python parser
    Python parser
  • Using uvloop or just asyncio event loop
    asyncio event loop
  • Does the issue exist against the master branch of aredis?
    no

Steps to reproduce

Run Tornado with RouterConfig.

Expected behavior

Get the value from Redis by key with await.

Actual behavior

  • It is appreciated if an error log can be provided
    My Tornado app (set up with RouterConfig) initializes your async Redis instance, but it cannot get the value of the key, and that single request just hangs.
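
For comparison, a minimal setup that should work, sketched under the assumption of Tornado 5+ (which runs on the asyncio event loop, so aredis coroutines can be awaited directly in handlers); all names and the port are placeholders:

import tornado.ioloop
import tornado.web
from aredis import StrictRedis

redis_client = StrictRedis(host='127.0.0.1', port=6379)

class KeyHandler(tornado.web.RequestHandler):
    async def get(self, key):
        value = await redis_client.get(key)  # None if the key is absent
        self.write(value or b'')

app = tornado.web.Application([(r'/key/(.*)', KeyHandler)])
app.listen(8888)
tornado.ioloop.IOLoop.current().start()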

Exception when scan_iter() with StrictRedisCluster

Checklist

  • Python version
    3.6
  • Using hiredis or just Python parser
    Python parser
  • Using uvloop or just asyncio event loop
    Asyncio
  • Does the issue exist against the master branch of aredis?
    Yes

Steps to reproduce

In this sample, scan_iter with StrictRedisCluster would raise exception

import asyncio
import settings  # assumed local module holding the REDIS_CLUSTER startup nodes
from aredis import StrictRedisCluster

async def example():
    client = StrictRedisCluster(startup_nodes=settings.REDIS_CLUSTER, skip_full_coverage_check=True)
    keys = client.scan_iter()
    async for key in keys:
        print(key)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(example())

Expected behavior

Redis keys are iterated and returned

Actual behavior

Traceback (most recent call last):
  File "sample.py", line 24, in <module>
    loop.run_until_complete(example())
  File "/usr/local/lib/python3.6/asyncio/base_events.py", line 468, in run_until_complete
    return future.result()
  File "sample.py", line 16, in example
    async for key in keys:
  File "/usr/local/lib/python3.6/site-packages/aredis/commands/iter.py", line 23, in scan_iter
    cursor, data = await self.scan(cursor=cursor, match=match, count=count)
ValueError: not enough values to unpack (expected 2, got 1)
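
The unpack failure suggests the cluster client returns SCAN results in a per-node shape rather than the single-node (cursor, keys) pair that scan_iter expects. Until that is fixed, a hedged workaround for small keyspaces is KEYS, which the cluster client fans out to every master; note that KEYS blocks the servers on large datasets, so this is a stopgap, not a fix:

import asyncio
import settings  # assumed local module holding the REDIS_CLUSTER startup nodes
from aredis import StrictRedisCluster

async def list_keys():
    client = StrictRedisCluster(startup_nodes=settings.REDIS_CLUSTER,
                                skip_full_coverage_check=True)
    for key in await client.keys('*'):
        print(key)

asyncio.get_event_loop().run_until_complete(list_keys())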

RuntimeWarning: coroutine 'PubSub.on_connect' was never awaited

Hello! I'm having trouble implementing pub/sub with server-sent events, and I got:

/home/vagrant/.pyenv/versions/3.6.1/lib/python3.6/asyncio/coroutines.py:109: RuntimeWarning: coroutine 'PubSub.on_connect' was never awaited
  return self.gen.send(None)

My stream view:

@app.route("/notifications")
async def notification(request):
    async def _stream(res):
        redis = aredis.StrictRedis()
        pub = redis.pubsub()
        await pub.subscribe('test')
        while True:
            message = await pub.get_message()
            if message:
                res.write(message)
            await asyncio.sleep(0.001)  # must be awaited, otherwise the sleep never runs
    return stream(_stream)

Any thoughts?

Do I have to call flushdb before using?

I have noticed that await client.flushdb() is called after the client object is created. Do I have to call this function each time before using the client?

async def example():
    client = StrictRedis(host='127.0.0.1', port=6379, db=0)
    await client.flushdb()
    await client.set('foo', 1)
    assert await client.exists('foo') is True
    await client.incr('foo', 100)
