nonegg / aredis Goto Github PK
View Code? Open in Web Editor NEWredis client for Python asyncio (has support for redis server, sentinel and cluster)
Home Page: http://aredis.readthedocs.io/en/latest/
License: MIT License
redis client for Python asyncio (has support for redis server, sentinel and cluster)
Home Page: http://aredis.readthedocs.io/en/latest/
License: MIT License
In some case, if a command was interrupted by outer exceptions(such as timeout), the connection will be released to the pool containing unconsumed data. The unconsumed data will be read by the next command and thus lead to mistake.
The following code can reproduce this situation (it use async_timeout library from aio-libs):
# coding:utf-8
from aredis import StrictRedisCluster
import asyncio
import random
import async_timeout
import time
redis_cluster_client = StrictRedisCluster(
startup_nodes=[{"host": "127.0.0.1", "port": 7001}],
max_connections=40, max_connections_per_node=True)
async def worker():
while(1):
try:
with async_timeout.timeout(0.05):
for _ in range(2):
await redis_cluster_client.exists("test_key")
await asyncio.sleep(random.random() / 100)
await redis_cluster_client.set("test_key", "1")
await asyncio.sleep(random.random() / 100)
except Exception as e:
print(e)
def main():
loop = asyncio.get_event_loop()
asyncio.gather(*[worker() for _ in range(30)], loop=loop)
loop.run_forever()
if __name__ == "__main__":
main()
And you will get output like this after running for sometime:
'int' object has no attribute 'decode'
'int' object has no attribute 'decode'
'int' object has no attribute 'decode'
'int' object has no attribute 'decode'
In the code above, a very small timeout value (0.05 second) is set, if you can't reproduce the error, you can try to reduce it to a even smaller one.
The exists
command will return 0 or 1, and the set
command will get the int
type value returned for exists
command and lead to a error.
In my production code, a worker will execute a job with a timeout limit, if the job timeout, it may cause this error.
I find a interesting java package called redisson
It provide distributed data structure for users.
Should i add a similar module in aredis like that?
Please tell me your opinion about the idea 0- 0
I'm now running aredis connecting to a cluster with 12 node, and the client side has more than 50 machines, and about 200 client process, So, the total connection number is very large.
And because we use asyncio, a lot of tasks are running at the same time. When a request peek comes, Too Many Connections
error often occurs.
I don't want to increase the connection pool size. We use asyncio so we can make a task sleep and waiting for connections.
It's only a suggestion because a retry can also be done by user insted of aredis library.
When i check the subscribe functionality i get an error
This is the code sample
client = StrictRedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": 7002}])
sub = client.pubsub()
await sub.subscribe("boom")
This is the traceback
ERROR:root: File "/home/melhin/virtualenvs/aiotest/lib/python3.5/site-packages/aredis/pool.py", line 466, in get_master_node_by_slot
return self.nodes.slots[slot][0]
On further debugging i found out that
self.nodes.slots[slot][0]
is the on causing the problem , mostly because
self.nodes.slots is empty . Is there a configuration issue with my redis setting ?
Traceback (most recent call last):
File "/home/chaos_session/chaos_session/route.py", line 65, in exec_action
result = await class_obj.exec_action() # 执行对象的exec_action方法d
File "/home/chaos_session/chaos_session/tool/action_base.py", line 136, in exec_action
await self.take_action()
File "/home/chaos_session/chaos_session/tool/action_base.py", line 118, in take_action
await self.do_action()
File "/home/chaos_session/chaos_session/v1_2/game_1010/action20000.py", line 35, in do_action
c_pub=self.c_pub)
File "/home/chaos_session/chaos_session/db_manager/user_info_manager.py", line 50, in add_user
await self.db.insert_one(JWT, user)
File "/home/chaos_session/chaos_session/db_manager/redis_manage.py", line 228, in insert_one
await self._insert_one(wrapper_jwt, data)
File "/home/chaos_session/chaos_session/db_manager/redis_manage.py", line 218, in _insert_one
r = await self.pipe.execute()
File "/home/py3-env/lib/python3.5/site-packages/aredis/pipeline.py", line 286, in execute
return await exec(conn, stack, raise_on_error)
File "/home/py3-env/lib/python3.5/site-packages/aredis/pipeline.py", line 203, in _execute_transaction
if isinstance(response, typing.Awaitable):
AttributeError: module 'typing' has no attribute 'Awaitable'
from aredis import StrictRedis
import asyncio
def on_message(message):
print(message)
async def sub_redis():
redis = StrictRedis('127.0.0.1', 6379)
redis_sub = redis.pubsub()
await redis_sub.subscribe('notify')
for message in await redis_sub.listen():
print(message)
if __name__ == '__main__':
task = sub_redis()
asyncio.ensure_future(task)
asyncio.get_event_loop().run_forever()
redis-cli publish notify bye
with redis-py, it's output is as blow:
{'type': 'subscribe', 'pattern': None, 'channel': b'notify', 'data': 1}
{'type': 'message', 'pattern': None, 'channel': b'notify', 'data': b'bye'}
with aredis it's output is:
type
pattern
channel
data
and it can't receive follow-up messages
master
branch of aredis? run tornado with multi-process patterns
to set and get the value from redis by key with await
class MainHandler(tornado.web.RequestHandler):
def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs)
self.redis_client = StrictRedis(host='127.0.0.1', port=6379, db=0)
async def get(self):
print("begin")
## it didn't run more, is my event loop issue?
await self. redis_client.set('abc1', 'abcqwewq')
abc_result = await self.redis_client.get('abc1')
print(abc_result)
self.write("Hello, world")
app = tornado.web.Application([(r"/", MainHandler)])
server =tornado.httpserver.HTTPServer(app)
server.bind(9999)
server.start(4) # Forks multiple sub-processes
tornado.ioloop.IOLoop.current().start()
but .... it's fun ....when aredis run with tornado single patterns
BTW: did aredis has a default connect pool? And does aredis could auto reconnect redis server?
class MainHandler(tornado.web.RequestHandler):
def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs)
print(self.application.pool)
self.redis_client = self.application.pool["redis_pool"]
async def get(self):
print("begin")
await self. redis_client.set('abc1', 'abcqwewq')
abc_result = await self.redis_client.get('abc1')
print(abc_result)
self.write("Hello, world")
class Application(tornado.web.Application):
def __init__(self, pool):
self._pool = pool
setting = dict(
debug = False,
autoreload = False
)
super(Application, self).__init__([(r"/", MainHandler)], **setting)
@property
def pool(self):
return self._pool
async def init_pool():
db_pool = await asyncpg.create_pool(host="127.0.0.1", database="test1", user="test2", password="test3")
redis_pool = StrictRedis(host='127.0.0.1', port=6379, db=0)
return {'db_pool':db_pool, 'redis_pool':redis_pool}
if __name__ == "__main__":
#asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
AsyncIOMainLoop().install()
loop = asyncio.get_event_loop()
pool = loop.run_until_complete(init_pool())
app = Application(pool)
server = tornado.httpserver.HTTPServer(app)
server.listen(9999)
loop.run_forever()
We trying to use aredis with a ElasticCache Redis Cluster, but getting the following error.
[2018-01-08 10:21:28 +0000] [9] [ERROR] Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/sanic/app.py", line 556, in handle_request
response = await response
File "/usr/local/lib/python3.6/site-packages/prometheus_async/aio/_decorators.py", line 42, in decorator
rv = yield from wrapped(*args, **kw)
File "/menu_service/menu_response.py", line 75, in get_menu_endpoint
menu = await get_menu_cache(id)
File "/menu_service/cache.py", line 21, in get_menu_cache
result = await redis.get(_cache_key(id))
File "/usr/local/lib/python3.6/site-packages/aredis/commands/strings.py", line 147, in get
return await self.execute_command('GET', name)
File "/usr/local/lib/python3.6/site-packages/aredis/utils.py", line 167, in inner
return await func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/aredis/client.py", line 405, in execute_command
node = self.connection_pool.get_node_by_slot(slot)
File "/usr/local/lib/python3.6/site-packages/aredis/pool.py", line 480, in get_node_by_slot
return self.get_master_node_by_slot(slot)
File "/usr/local/lib/python3.6/site-packages/aredis/pool.py", line 475, in get_master_node_by_slot
return self.nodes.slots[slot][0]
KeyError: 232
Our redis client creation is the following:
startup_nodes = [{"host": REDIS_HOST, "port": REDIS_PORT }]
redis = StrictRedisCluster(startup_nodes=startup_nodes, decode_responses=False, skip_full_coverage_check=True)
We using python 3.6 with sanic 0.7.0.
The setup is working with using the redis-py-cluster==1.3.4.
in pool.py, get_random_connection() try to use random.choice() on a dict. It is wrong.
Not sure if this already exists but I could not find support for cluster connections where I can enable requests to go to the replica nodes. This is available in the synchronous client as documented here: https://redis-py-cluster.readthedocs.io/en/master/readonly-mode.html
Wondering if there are plans for adding support for a readonly mode.
i use xreadgroup
get a dict
type
but when i use xclaim
i get a type same as redis-native --get a list
type,
so do you solve it ?
master
branch of aredis? yesConnect to a Redis cluster and try to make XREADGROUP commands
Returns messages from the stream
First, I was bothered by "TTL exhausted" exceptions, as if aredis wasn't following redirections upon MovedErrors. I came up with a simple fix (which may not be appropried for general case): commenting that line.
Then, the problem became that after a finite number of XREADGROUP commands, aredis raised an exception "Too many connections". For a given max_connections, the number of commands issued before the exception is always the same. Also, this behaviour is specific to xreadgroup, I can xread without problems.
This makes me think that connections are not released properly with XREADGROUP commands, but I can't see why. What would be your diagnosis ?
Here is the traceback I get:
Traceback (most recent call last):
File "/var/www/metrics-doctor-forwarder/source/forwarder/.venv/lib/python3.6/site-packages/aredis/pool.py", line 501, in get_connection_by_node
connection = self._available_connections.get(node["name"], []).pop()
IndexError: pop from empty list
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "forwarder/test.py", line 47, in process
async for msg in stream:
File "forwarder/test.py", line 35, in stream_listener
items = await redis_conn.xreadgroup("metrics-doctor-forwarder", "forwarder-single", count=20, status=">")
File "/var/www/metrics-doctor-forwarder/source/forwarder/.venv/lib/python3.6/site-packages/aredis/commands/streams.py", line 238, in xreadgroup
return await self.execute_command('XREADGROUP', *pieces)
File "/var/www/metrics-doctor-forwarder/source/forwarder/.venv/lib/python3.6/site-packages/aredis/utils.py", line 167, in inner
return await func(*args, **kwargs)
File "/var/www/metrics-doctor-forwarder/source/forwarder/.venv/lib/python3.6/site-packages/aredis/client.py", line 399, in execute_command
r = self.connection_pool.get_connection_by_node(node)
File "/var/www/metrics-doctor-forwarder/source/forwarder/.venv/lib/python3.6/site-packages/aredis/pool.py", line 503, in get_connection_by_node
connection = self.make_connection(node)
File "/var/www/metrics-doctor-forwarder/source/forwarder/.venv/lib/python3.6/site-packages/aredis/pool.py", line 394, in make_connection
raise RedisClusterException("Too many connections")
aredis.exceptions.RedisClusterException: Too many connections
Hello. Thanks for the excellent library. The current Pubsub implementation decodes the whole response object from bytes, if it is a bytes object. This is different from redis-py, which just converts the response type only and then passes the rest of the response through to the client. This poses a problem with objects that are pickle (or any serialization) before they are published to the redis pubsub.
relevant code from areds:
def handle_message(self, response, ignore_subscribe_messages=False): """ Parses a pub/sub message. If the channel or pattern was subscribed to with a message handler, the handler is invoked instead of a parsed message being returned. """ response = [x.decode() if isinstance(x, bytes) else x for x in response]
relevant code from redis-py:
def handle_message(self, response, ignore_subscribe_messages=False): """ Parses a pub/sub message. If the channel or pattern was subscribed to with a message handler, the handler is invoked instead of a parsed message being returned. """ message_type = nativestr(response[0])
the natrivestr implementation is:
def nativestr(x): return x if isinstance(x, str) else x.decode('utf-8', 'replace')
Is this different than the redis-py implementation for a specific reason? It causes a UnitcodeDecodeErrror when an aredis pubsub client instance tries to decode a pickled object I've published. Is their another way you'd recommend I handle this? Thanks!
If you use stream_timeout=1
, the PubSubWorkerThread
will fail with TimeoutError
exception, since the pubsub.get_message()
internally uses parse_response(block=False, timeout=0)
(since 0 is the default timeout value, and we can't explicitly pass one), which ends using the stream_timeout=1
.
I see two ways of solving this:
try-except TimeoutError: pass
around pubsub.get_message()
at PubSubWorkerThread._run()
.pubsub.get_message(..., timeout=1)
, so we use the wait_for()
path in parse_response()
.I tried both and they work. Let me know your preference and I'll submit the patch (or please fix it, should be 1-2 lines of patch :-D)
How can i connect to databse with such string as redis://h:p5ef9a0cfbef0io972d83933b10d99bc7d5d4a1ec777db341c6b12310f47e2cea@ec2-52-212-239-249.kabab.com:12345
?
Im getting
ERROR:root:ERROR sending "cluster slots" command to redis server: {'port': 7002, 'host': '127.0.0.1'}
When i try to use the StrictRedisCluster Connection
On further debugging i found that
parse_cluster_slots in commands/cluster is looking for node[2] which somehow doesnt come in the cluster slots command run from the redis-cli
I run cluster slots
127.0.0.1:7001> cluster slots
and run cluster nodes
7a3710105e20a9b30d81f636b2b30d27218dc0ac 127.0.0.1:7003 slave 3ee92494550369062d5ff5cbd9b3b5efd60f6c01 0 1494940935358 24 connected
4ace8f1ceb4be131884fe0df0ca50ec576f2909a 127.0.0.1:7002 slave aa10e2d760478d6e7e5a1627379acd508726bf67 0 1494940934858 22 connected
3ee92494550369062d5ff5cbd9b3b5efd60f6c01 127.0.0.1:7004 master - 0 1494940936361 24 connected 5461-10922
41a63b873a2257af9866db482dd35f016f204f93 127.0.0.1:7006 master - 0 1494940936361 21 connected 10923-16383
aa10e2d760478d6e7e5a1627379acd508726bf67 127.0.0.1:7001 myself,master - 0 0 22 connected 0-5460
b00c4d9e1d404b5e35b58a664284fe274e111523 127.0.0.1:7005 slave 41a63b873a2257af9866db482dd35f016f204f93 0 1494940935859 21 connected
Hello everyone!
Could the API reference page on readthedocs include the real description the API? At the moment it has no content - only modules hedlines.
Sincerely
Arek
Hello, we encountered an issue similar to #110, and would like to use the max_idle_time
parameter. I was wondering when we could cut a release with this feature?
master
branch of aredis?Lines 156 to 161 in 395ce42
The expect behavior can be interpreted with a truth table(T means do retry, F means not):
TimeoutError\retry | true | false |
---|---|---|
true | T | F |
false | F | F |
The actual behavior can be interpreted with a truth table(T means do retry, F means not):
TimeoutError\retry | true | false |
---|---|---|
true | T | F |
false | T | T |
If my understanding of the expected behavior is correct, then we need to fix the problem of retrying on encountering connection error.(I already created a PR for it)
Here is a script for validating the current behavior and behavior after the fix:
# script for validating the current and fixed behavior truth table
do_retry = [True, True, False, False]
is_timeout_error = [False,True, True, False]
zipped = list(zip(do_retry, is_timeout_error))
# current behaviour:
print("----------------------------------------")
print("Current behaviour: ")
print("----------------------------------------")
for do_retry, is_timeout_error in zipped:
print("Condition: retry-{}, timeout-{}".format(do_retry, is_timeout_error))
if not do_retry and is_timeout_error:
print("Result: not retry")
else:
print("Result: retry")
# behaviour after fix:
print("----------------------------------------")
print("Fixed behaviour: ")
print("----------------------------------------")
for do_retry, is_timeout_error in zipped:
print("Condition: retry: {}, timeout: {}".format(do_retry, is_timeout_error))
if not (do_retry and is_timeout_error):
print("Result: not retry")
else:
print("Result: retry")
print("----------------------------------------")
Script output:
----------------------------------------
Current behaviour:
----------------------------------------
Condition: retry-True, timeout-False
Result: retry
Condition: retry-True, timeout-True
Result: retry
Condition: retry-False, timeout-True
Result: not retry
Condition: retry-False, timeout-False
Result: retry
----------------------------------------
Fixed behaviour:
----------------------------------------
Condition: retry: True, timeout: False
Result: not retry
Condition: retry: True, timeout: True
Result: retry
Condition: retry: False, timeout: True
Result: not retry
Condition: retry: False, timeout: False
Result: not retry
----------------------------------------
Here is the setup of the main client
self.redis_client = aredis.StrictRedis(host='localhost', decode_responses=True)
This is the statement which the error surrounds.
await self.redis_client.set(f'memecache:{f_search}', f'{link}', ex=86400)
The URL in question is:
http://knowyourmeme.com/memes/hentai-woody-%E5%A4%89%E6%85%8B%E3%82%A6%E3%83%83%E3%83%87%E3%82%A3%E3%83%BC
(the URL is a result of searching the site for the Buzz Lightyear meme and accidentally coming across this which happened to introduce a bug).
Full traceback:
Ignoring exception in command meme:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/discord/ext/commands/core.py", line 62, in wrapped
ret = yield from coro(*args, **kwargs)
File "/root/qtbot/cogs/meme.py", line 42, in get_meme_info
await self.redis_client.set(f'memecache:{f_search}', f'{link}', ex=86400)
File "/usr/local/lib/python3.6/dist-packages/aredis/commands/strings.py", line 270, in set
return await self.execute_command('SET', *pieces)
File "/usr/local/lib/python3.6/dist-packages/aredis/client.py", line 163, in execute_command
await connection.send_command(*args)
File "/usr/local/lib/python3.6/dist-packages/aredis/connection.py", line 463, in send_command
await self.send_packed_command(self.pack_command(*args))
File "/usr/local/lib/python3.6/dist-packages/aredis/connection.py", line 514, in pack_command
SYM_CRLF, b(arg), SYM_CRLF))
File "/usr/local/lib/python3.6/dist-packages/aredis/utils.py", line 8, in b
return x.encode('latin-1') if not isinstance(x, bytes) else x
UnicodeEncodeError: 'latin-1' codec can't encode character '\u0361' in position 30: ordinal not in range(256)
tmux is weird about scrolling in my TTY, so hopefully that traceback is coherent enough.
Hi,
We are planning to use scan_iter
from ClusterIterCommandMixin
in production.
Any chance to get the master branche released sooner or later ?
Thanks for your work.
Cheers,
Getting these errors on Python 3.6
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 731, in _read_ready
self._protocol.data_received(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 253, in data_received
self._stream_reader.feed_data(data)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 413, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
My code
async def pubsub_loop(self):
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
await self.pubsub.subscribe('notsobot.rpc')
while True:
message = await self.pubsub.get_message()
if message:
load = json.loads(message['data'])
self.bot.dispatch('rpc_receive', load)
await asyncio.sleep(0.01)
async def publish(self, payload, snowflake):
await self.redis.publish('notsobot.rpc', json.dumps({'shard_id': self.bot.shard_id, 'snowflake': snowflake, 'payload': payload}))
Not sure right now if it's my code or the library, the error is vague.
Also, everything is working fine, it's just these errors spam my console.
master
branch of aredis?client = StrictRedisCluster.from_url(
"redis://localhost:6379",
skip_full_coverage_check=True,
)
await client.get("somekey")
Should not call CONFIG GET
File "/usr/local/lib/python3.7/site-packages/aredis/commands/server.py", line 198, in config_get
return await self.execute_command('CONFIG GET', pattern)
File "/usr/local/lib/python3.7/site-packages/aredis/client.py", line 156, in execute_command
return await self.parse_response(connection, command_name, **options)
File "/usr/local/lib/python3.7/site-packages/aredis/client.py", line 171, in parse_response
response = await connection.read_response()
File "/usr/local/lib/python3.7/site-packages/aredis/connection.py", line 447, in read_response
raise response
aredis.exceptions.ResponseError: unknown command `CONFIG`, with args beginning with: `GET`, `cluster-require-full-coverage`, ```
^ Title.
from aredis import StrictRedis
import asyncio
async def sub_redis():
redis = StrictRedis('172.16.179.111', 6379)
redis_sub = redis.pubsub()
await redis_sub.subscribe('test')
while True:
message = await redis_sub.get_message(False, 5)
print(message)
message = await redis_sub.get_message(False)
print(message)
async def pub_msg():
redis = StrictRedis('172.16.179.111', 6379)
await redis.execute_command('publish', 'test', 'bye')
await asyncio.sleep(10)
await redis.execute_command('publish', 'test', 'bye')
if __name__ == '__main__':
asyncio.ensure_future(sub_redis())
asyncio.ensure_future(pub_msg())
asyncio.get_event_loop().run_forever()
the behavior of get_message() is also different form redis-py
when set time_out = 0, redis-py will return immediately, but aredis will never return until it received a message
i don't know whether it is a bug.
{'type': 'subscribe', 'pattern': None, 'channel': b'test', 'data': 1}
{'type': 'message', 'pattern': None, 'channel': b'test', 'data': b'bye'}
None
Task exception was never retrieved
future: <Task finished coro=<sub_redis() done, defined at G:/svn/PlatformDaily/80-moservice/apipush/aredis_test.py:4> exception=RuntimeError('read() called while another coroutine is already waiting for incoming data',)>
Traceback (most recent call last):
File "G:/svn/PlatformDaily/80-moservice/apipush/aredis_test.py", line 11, in sub_redis
message = await redis_sub.get_message(False)
File "D:\Python\Python36\lib\site-packages\aredis\pubsub.py", line 216, in get_message
response = await self.parse_response(block=False, timeout=timeout)
File "D:\Python\Python36\lib\site-packages\aredis\pubsub.py", line 141, in parse_response
return await coro
File "D:\Python\Python36\lib\site-packages\aredis\pubsub.py", line 111, in _execute
return await command(*args)
File "D:\Python\Python36\lib\site-packages\aredis\connection.py", line 435, in read_response
response = await exec_with_timeout(self._parser.read_response(), self._stream_timeout, loop=self.loop)
File "D:\Python\Python36\lib\site-packages\aredis\connection.py", line 36, in exec_with_timeout
return await asyncio.wait_for(coroutine, timeout, loop=loop)
File "D:\Python\Python36\lib\asyncio\tasks.py", line 339, in wait_for
return (yield from fut)
File "D:\Python\Python36\lib\site-packages\aredis\connection.py", line 201, in read_response
response = await self._buffer.readline()
File "D:\Python\Python36\lib\site-packages\aredis\connection.py", line 102, in readline
await self._read_from_socket()
File "D:\Python\Python36\lib\site-packages\aredis\connection.py", line 62, in _read_from_socket
data = await self._stream.read(self.read_size)
File "D:\Python\Python36\lib\asyncio\streams.py", line 634, in read
yield from self._wait_for_data('read')
File "D:\Python\Python36\lib\asyncio\streams.py", line 452, in _wait_for_data
'already waiting for incoming data' % func_name)
master
branch of aredis?import asyncio
import aredis
rc = aredis.StrictRedis('localhost',
db=0,
decode_responses=True)
STREAM_ID = 'mystream'
async def handle_message(res):
_id, message = res
await rc.xdel(STREAM_ID, _id)
print(_id, message)
async def check():
await rc.xadd(STREAM_ID,
{'data': 'testdata'})
messages = await rc.xrange(STREAM_ID, '-', '+')
await asyncio.gather(*[handle_message(res) for res in messages])
stream_exists = await rc.exists(STREAM_ID)
if stream_exists:
streaminfo = await rc.xinfo_stream(STREAM_ID)
print(streaminfo)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(check(), return_exceptions=False))
info returned
Traceback:
aredis/commands/streams.py", line 32, in parse_xinfo_stream
res['first-entry'][1] = pairs_to_dict(res['first-entry'][1])
TypeError: 'NoneType' object is not subscriptable
from aredis import StrictRedis
import asyncio
async def sub_redis():
redis = StrictRedis('172.16.179.100', 6379)
redis_sub = redis.pubsub()
await redis_sub.subscribe('notify')
while True:
message = await redis_sub.get_message(False, 5)
print(message)
if __name__ == '__main__':
asyncio.ensure_future(sub_redis())
asyncio.get_event_loop().run_forever()
with redis-py, it only call subscribe once, and receive subscribe notify once
{'type': 'subscribe', 'pattern': None, 'channel': b'notify', 'data': 1}
None
None
None
{'type': 'subscribe', 'pattern': None, 'channel': b'notify', 'data': 1}
None
{'type': 'subscribe', 'pattern': None, 'channel': b'notify', 'data': 1}
None
{'type': 'subscribe', 'pattern': None, 'channel': b'notify', 'data': 1}
with aredis, it will receive subscribe notify every time when get_message() is called
I've come across similar problems occasionallyL
ERROR:asyncio:Task exception was never retrieved
--
| future: <Task finished coro=<Subscriber.run() done, defined at /opt/app-root/src/lib/redis/async_subscriber.py:41> exception=TypeError("object of type 'NoneType' has no len()")>
| Traceback (most recent call last):
| File "/opt/app-root/src/lib/redis/async_subscriber.py", line 64, in run
| last_id = await self.process_stream(last_id=last_id)
| File "/opt/app-root/src/lib/redis/async_subscriber.py", line 160, in process_stream
| **{self.stream_key: last_id},
| File "/usr/local/lib/python3.7/site-packages/aredis/commands/streams.py", line 238, in xreadgroup
| return await self.execute_command('XREADGROUP', *pieces)
| File "/usr/local/lib/python3.7/site-packages/aredis/client.py", line 156, in execute_command
| return await self.parse_response(connection, command_name, **options)
| File "/usr/local/lib/python3.7/site-packages/aredis/client.py", line 174, in parse_response
| return callback(response, **options)
| File "/usr/local/lib/python3.7/site-packages/aredis/commands/streams.py", line 22, in multi_stream_list
| result[r[0]] = stream_list(r[1])
| File "/usr/local/lib/python3.7/site-packages/aredis/commands/streams.py", line 12, in stream_list
| while len(kv_pairs) > 1:
| TypeError: object of type 'NoneType' has no len()
It seems to happen when there's a "(nil)" payload for a given event:
127.0.0.1:6379> XREADGROUP GROUP group_name consumer_name STREAMS stream_name 0
1) 1) "stream_name"
2) 1) 1) "1561724821836-0"
2) (nil)
2) 1) "1562178600447-0"
2) 1) "stream"
2) "stream_name"
3) "topic"
...
This seems to happen when an XDEL occurs on a message, but that same message was never XACK. It'd be nice to return an empty dict() in those cases, so the client can clear it from the list but still process the other events pending.
Originally posted by @szelenka in #113 (comment)
master
branch of aredis? YesUse aredis and execute a pipeline with Python 3.5.1.
No error.
Traceback (most recent call last):
File "c:\Users\2\AppData\Local\Programs\Python\Python35-32\Lib\asyncio\tasks.py", line 239, in _step
result = coro.send(None)
File "C:\prj\paintman\paintman\socket\async_socket_server.py", line 63, in process_queue
commands = await async_read_server_command_chunk()
File "C:\prj\paintman\paintman\socket\redis_protocol.py", line 89, in async_read_server_command_chunk
res = await p.execute()
File "C:\prj\paintman\env\lib\site-packages\aredis\pipeline.py", line 286, in execute
return await exec(conn, stack, raise_on_error)
File "C:\prj\paintman\env\lib\site-packages\aredis\pipeline.py", line 203, in _execute_transaction
if isinstance(response, typing.Awaitable):
AttributeError: module 'typing' has no attribute 'Awaitable'
aredis will check being awaitable in this line of code:
Line 203 in f662f33
and from what I understand, typing.Awaitable
is not available in all Python 3.5 versions (source: https://docs.python.org/3.5/whatsnew/changelog.html#id22). You could use inspect
package to check an object being awaitable or not:
import inspect
if inspect.isawaitable(response):
...
aredis/aredis/commands/sorted_set.py
Line 89 in bc75f74
If the server side set a timeout value, and the server close a connection because of idle, then ,this broken connection will stay in the pool forever.
and because pop and append is used to implement the pool, the released broken connection will be poped for next request and so on.
Sentinel does not work, whilst using redis-cli
or blocking redis-py
do. Maybe I'm doing something wrong, I noticed that the Sentinel
object differs from redis-py
one as it does not allow the keyword argument socket_timeout
(even being documented as a valid parameter).
import asyncio
from aredis import StrictRedis
REDIS_SERVICE_NAME = 'rsmaster'
REDIS_SENTINELS = [
('commons-sentinel1', 26379),
('commons-sentinel2', 26379),
('commons-sentinel3', 26379)
]
# First just do master connection
s = StrictRedis('commons-redis-master', 6379)
l = asyncio.get_event_loop()
l.run_until_complete(s.keys('*'))
>>> [b'test2', b'test1', b'test3', b'test']
# Now do it through Sentinel object
from aredis.sentinel import Sentinel
s = Sentinel(REDIS_SENTINELS)
l.run_until_complete(s.discover_master('rsmaster'))
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
<ipython-input-12-93da80304d53> in <module>()
----> 1 l.run_until_complete(s.discover_master('rsmaster')
2 )
/usr/lib/python3.5/asyncio/base_events.py in run_until_complete(self, future)
385 raise RuntimeError('Event loop stopped before Future completed.')
386
--> 387 return future.result()
388
389 def stop(self):
/usr/lib/python3.5/asyncio/futures.py in result(self)
272 self._tb_logger = None
273 if self._exception is not None:
--> 274 raise self._exception
275 return self._result
276
/usr/lib/python3.5/asyncio/tasks.py in _step(***failed resolving arguments***)
237 # We use the `send` method directly, because coroutines
238 # don't have `__iter__` and `__next__` methods.
--> 239 result = coro.send(None)
240 else:
241 result = coro.throw(exc)
/usr/lib/python3.5/site-packages/aredis/sentinel.py in discover_master(self, service_name)
215 for sentinel_no, sentinel in enumerate(self.sentinels):
216 try:
--> 217 masters = await sentinel.sentinel_masters()
218 except (ConnectionError, TimeoutError):
219 continue
/usr/lib/python3.5/site-packages/aredis/commands/sentinel.py in sentinel_masters(self)
107 async def sentinel_masters(self):
108 "Returns a list of dictionaries containing each master's state."
--> 109 return await self.execute_command('SENTINEL MASTERS')
110
111 async def sentinel_monitor(self, name, ip, port, quorum):
/usr/lib/python3.5/site-packages/aredis/client.py in execute_command(self, *args, **options)
162 try:
163 await connection.send_command(*args)
--> 164 return await self.parse_response(connection, command_name, **options)
165 except (ConnectionError, TimeoutError) as e:
166 connection.disconnect()
/usr/lib/python3.5/site-packages/aredis/client.py in parse_response(self, connection, command_name, **options)
177 if command_name in self.response_callbacks:
178 callback = self.response_callbacks[command_name]
--> 179 return callback(response, **options)
180 return response
181
/usr/lib/python3.5/site-packages/aredis/commands/sentinel.py in parse_sentinel_masters(response)
66 result = {}
67 for item in response:
---> 68 state = parse_sentinel_state(map(str, item))
69 result[state['name']] = state
70 return result
/usr/lib/python3.5/site-packages/aredis/commands/sentinel.py in parse_sentinel_state(item)
49 def parse_sentinel_state(item):
50 result = pairs_to_dict_typed(item, SENTINEL_STATE_TYPES)
---> 51 flags = set(result['flags'].split(','))
52 for name, flag in (('is_master', 'master'), ('is_slave', 'slave'),
53 ('is_sdown', 's_down'), ('is_odown', 'o_down'),
KeyError: 'flags'
from redis.sentinel import Sentinel
REDIS_SERVICE_NAME = 'rsmaster'
REDIS_SENTINELS = [
('commons-sentinel1', 26379),
('commons-sentinel2', 26379),
('commons-sentinel3', 26379)
]
s = Sentinel(REDIS_SENTINELS, socket_timeout=0.1)
s.discover_master(REDIS_SERVICE_NAME)
>>> ('172.18.0.2', 6379)
m = s.master_for(REDIS_SERVICE_NAME)
m.keys('*')
>>> [b'test2', b'test1', b'test3', b'test']
bash-4.3# redis-cli -h commons-sentinel1 -p 26379
commons-sentinel1:26379> SENTINEL get-master-addr-by-name rsmaster
1) "172.18.0.2"
2) "6379"
commons-sentinel1:26379> SENTINEL slaves rsmaster
1) 1) "name"
2) "172.18.0.4:6379"
3) "ip"
4) "172.18.0.4"
5) "port"
6) "6379"
7) "runid"
8) "3b57a02fdf6be66085b82226a041a7040cc46ca8"
9) "flags"
10) "slave"
11) "link-pending-commands"
12) "0"
13) "link-refcount"
14) "1"
15) "last-ping-sent"
16) "0"
17) "last-ok-ping-reply"
18) "699"
19) "last-ping-reply"
20) "699"
21) "down-after-milliseconds"
22) "5000"
23) "info-refresh"
24) "6663"
25) "role-reported"
26) "slave"
27) "role-reported-time"
28) "1211479"
29) "master-link-down-time"
30) "0"
31) "master-link-status"
32) "ok"
33) "master-host"
34) "172.18.0.2"
35) "master-port"
36) "6379"
37) "slave-priority"
38) "100"
39) "slave-repl-offset"
40) "237851"
2) 1) "name"
2) "172.18.0.5:6379"
3) "ip"
4) "172.18.0.5"
5) "port"
6) "6379"
7) "runid"
8) "462a4a072ba7e4ded60f65a9b2b520d03c3e5d45"
9) "flags"
10) "slave"
11) "link-pending-commands"
12) "0"
13) "link-refcount"
14) "1"
15) "last-ping-sent"
16) "0"
17) "last-ok-ping-reply"
18) "699"
19) "last-ping-reply"
20) "698"
21) "down-after-milliseconds"
22) "5000"
23) "info-refresh"
24) "6663"
25) "role-reported"
26) "slave"
27) "role-reported-time"
28) "1211469"
29) "master-link-down-time"
30) "0"
31) "master-link-status"
32) "ok"
33) "master-host"
34) "172.18.0.2"
35) "master-port"
36) "6379"
37) "slave-priority"
38) "100"
39) "slave-repl-offset"
40) "237851"
commons-sentinel1:26379> SENTINEL sentinels rsmaster
1) 1) "name"
2) "bcf7e6582e10f9cc70360348c1a529a6a6ad3a10"
3) "ip"
4) "172.18.0.8"
5) "port"
6) "26379"
7) "runid"
8) "bcf7e6582e10f9cc70360348c1a529a6a6ad3a10"
9) "flags"
10) "sentinel"
11) "link-pending-commands"
12) "0"
13) "link-refcount"
14) "1"
15) "last-ping-sent"
16) "0"
17) "last-ok-ping-reply"
18) "133"
19) "last-ping-reply"
20) "133"
21) "down-after-milliseconds"
22) "5000"
23) "last-hello-message"
24) "1378"
25) "voted-leader"
26) "?"
27) "voted-leader-epoch"
28) "0"
2) 1) "name"
2) "985e7c3435b9c1593fdb241d668b123f55481ac8"
3) "ip"
4) "172.18.0.7"
5) "port"
6) "26379"
7) "runid"
8) "985e7c3435b9c1593fdb241d668b123f55481ac8"
9) "flags"
10) "sentinel"
11) "link-pending-commands"
12) "0"
13) "link-refcount"
14) "1"
15) "last-ping-sent"
16) "0"
17) "last-ok-ping-reply"
18) "133"
19) "last-ping-reply"
20) "133"
21) "down-after-milliseconds"
22) "5000"
23) "last-hello-message"
24) "800"
25) "voted-leader"
26) "?"
27) "voted-leader-epoch"
28) "0"
commons-sentinel1:26379> SENTINEL sentinels rsmaster
1) 1) "name"
2) "bcf7e6582e10f9cc70360348c1a529a6a6ad3a10"
3) "ip"
4) "172.18.0.8"
5) "port"
6) "26379"
7) "runid"
8) "bcf7e6582e10f9cc70360348c1a529a6a6ad3a10"
9) "flags"
10) "sentinel"
11) "link-pending-commands"
12) "0"
13) "link-refcount"
14) "1"
15) "last-ping-sent"
16) "0"
17) "last-ok-ping-reply"
18) "972"
19) "last-ping-reply"
20) "972"
21) "down-after-milliseconds"
22) "5000"
23) "last-hello-message"
24) "1371"
25) "voted-leader"
26) "?"
27) "voted-leader-epoch"
28) "0"
2) 1) "name"
2) "985e7c3435b9c1593fdb241d668b123f55481ac8"
3) "ip"
4) "172.18.0.7"
5) "port"
6) "26379"
7) "runid"
8) "985e7c3435b9c1593fdb241d668b123f55481ac8"
9) "flags"
10) "sentinel"
11) "link-pending-commands"
12) "0"
13) "link-refcount"
14) "1"
15) "last-ping-sent"
16) "0"
17) "last-ok-ping-reply"
18) "972"
19) "last-ping-reply"
20) "972"
21) "down-after-milliseconds"
22) "5000"
23) "last-hello-message"
24) "573"
25) "voted-leader"
26) "?"
27) "voted-leader-epoch"
28) "0"
To enable user to write code easily~
Want to be able to use ZADD options.
Hello Jason,
Looks like PubSub get_message() function is not working as expected, on just created channel without no data. I assume that call of this method should return None in that case, however None returns only once and then execution "hangs" and awaits for coroutine to be completed (actual data appears in the channel) and then execution continues.
Here are the steps to reproduce this issue:
# 1. Create some subscription (PubSub obj)
# redis — initialized connection to some Redis instance
pbsb = redis.pubsub(ignore_subscribe_messages=True)
await pbsb.subscribe("some-channel")
# 2. Query message in the loop
timeout = 5
message = None
end_time = time.time() + timeout
while timeout == 0 or time.time() < end_time:
print(1)
message = await pbsb.get_message(timeout=1)
print(2)
# 3. results
>>> 1
>>> 2
>>> 1
# these should be a little more 1 and 2 :)
# 4. Nothing more happened until some data arrives, then
>>> 2
I've attached a patch
addressed to resolve this issue, it removes connection.can_read()
check in aredis/pubsub.py on line 132, because in that case it always returns False
and adds several imports:
from aredis.exceptions import ConnectionError
from aredis.exceptions import TimeoutError
Looks like without them except
on line 112 is not working as expected.
Also in aredis/connection.py on line 307 I've added check
if not self._reader:
break
Because when coroutine actually cancelled, self._reader
is getting None and this is leads to unwanted exception.
Best,
Jack
Hi!
It would be very useful to have distributed lock support for the cluster in your awesome library. It's very common to have Redis cluster on production and Redis-related logic is just broken with >1 instance of consumer, for example. None of current drivers have such a feature.
Thank you.
master
branch of aredis?import asyncio
from aredis import StrictRedisCluster, StrictRedis
from aredis import ClusterConnectionPool
import settings
async def test_connection():
pool = ClusterConnectionPool.from_url('rediss://xxx.cache.amazonaws.com:6379', decode_components=False)
redis = StrictRedisCluster(connection_pool=pool, decode_responses=False, skip_full_coverage_check=True)
await redis.get('a')
loop = asyncio.get_event_loop()
loop.run_until_complete(test_connection())
Connection goes through
Traceback (most recent call last):
File "test.py", line 12, in <module>
loop.run_until_complete(try_hard())
File "/usr/local/lib/python3.6/asyncio/base_events.py", line 473, in run_until_complete
return future.result()
File "test.py", line 9, in try_hard
await redis.get('a')
File "/usr/local/lib/python3.6/site-packages/aredis/commands/strings.py", line 147, in get
return await self.execute_command('GET', name)
File "/usr/local/lib/python3.6/site-packages/aredis/utils.py", line 167, in inner
return await func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/aredis/client.py", line 356, in execute_command
await self.connection_pool.initialize()
File "/usr/local/lib/python3.6/site-packages/aredis/pool.py", line 299, in initialize
await self.nodes.initialize()
File "/usr/local/lib/python3.6/site-packages/aredis/nodemanager.py", line 189, in initialize
raise RedisClusterException('Redis Cluster cannot be connected. '
aredis.exceptions.RedisClusterException: Redis Cluster cannot be connected. Please provide at least one reachable node.
I am using StrictRedisCluster.
I am getting a bunch of MovedErrors. Is there some setting I need to set to allow for redirections upon cluster movements?
aredis is failing to install in a new environment. Trying to install it results in the following error:
Complete output from command python setup.py egg_info:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/tmp/pip-build-ztxqbwm1/aredis/setup.py", line 5, in <module>
from aredis import __version__
File "/tmp/pip-build-ztxqbwm1/aredis/aredis/__init__.py", line 1, in <module>
from aredis.client import StrictRedis
File "/tmp/pip-build-ztxqbwm1/aredis/aredis/client.py", line 10, in <module>
from aredis.commands.cluster import ClusterCommandMixin
ImportError: No module named 'aredis.commands'
----------------------------------------
The reason seems to be that setup.py is loading the version from init, this results in trying to load the whole package, which fails as the package is not installed yet.
Pip version 9.0.1, aredis version 1.0.4
master
branch of aredis?to get a result is not None with callback
# the demo code is something like this
while not result:
await tornado.gen.sleep(0.2)
result = await self.application.redis_client.get(prefix + task.id)
but it do decrese the performance of my web app? so is that any possible way to get the result with callback ?
master
branch of aredis?#!/usr/bin/python
import asyncio
import logging
import os
from aredis import StrictRedisCluster
from aredis import ClusterConnectionPool
async def example():
pool = ClusterConnectionPool.from_url(os.getenv('REDIS_CLUSTER_URL'))
redis = StrictRedisCluster(connection_pool=pool, decode_responses=False, skip_full_coverage_check=True)
logging.info(redis.connection_pool.initialized)
logging.info(redis.connection_pool.nodes.slots.keys())
await redis.connection_pool.initialize()
logging.info(redis.connection_pool.initialized)
logging.info(redis.connection_pool.nodes.slots[15495])
logging.info(await redis.get('a'))
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
export REDIS_CLUSTER_URL=rediss://:###@###.cache.amazonaws.com:7000/0
Connects and does not raise exception
INFO:root:False
INFO:root:dict_keys([])
Traceback (most recent call last):
File "/usr/lib/python3.6/site-packages/aredis/nodemanager.py", line 123, in initialize
r = self.get_redis_link(host=node['host'], port=node['port'])
File "/usr/lib/python3.6/site-packages/aredis/nodemanager.py", line 98, in get_redis_link
return StrictRedis(host=host, port=port, decode_responses=True, **connection_kwargs)
TypeError: __init__() got an unexpected keyword argument 'ssl_context'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "debug_redis.py", line 24, in <module>
loop.run_until_complete(example())
File "/usr/lib/python3.6/asyncio/base_events.py", line 467, in run_until_complete
return future.result()
File "debug_redis.py", line 15, in example
await redis.connection_pool.initialize()
File "/usr/lib/python3.6/site-packages/aredis/pool.py", line 299, in initialize
await self.nodes.initialize()
File "/usr/lib/python3.6/site-packages/aredis/nodemanager.py", line 129, in initialize
raise RedisClusterException('ERROR sending "cluster slots" command to redis server: {0}'.format(node))
aredis.exceptions.RedisClusterException: ERROR sending "cluster slots" command to redis server: {'host': '###.cache.amazonaws.com', 'port': '###'}
master
branch of aredis?Too Many connections
This question has really been bothering me
And I tried to set the connections-config of redis-cluster-server
And set ClusterConnectionPool max_connections.
With high concurrency, the number of sockets continues to increase, and sockets will not be released.
Can I configure connections to reuse as much as possible?
And Automatic recovery and release when idle
command
I am not sure whether i should add support for the new command CLIENT REPLY
(which need to share one connection between different commands execution, may lead change on execute_command
) introduced since 3.2 release.
How do you think about that?
master
branch of aredis?run tornado with routerconfig
to get the value from redis by key with await
When will the streams commands be added?
Looking forward for that~
master
branch of aredis?In this sample, scan_iter with StrictRedisCluster would raise exception
import asyncio
from aredis import StrictRedisCluster
async def example():
client = StrictRedisCluster(startup_nodes=settings.REDIS_CLUSTER, skip_full_coverage_check=True)
keys = client.scan_iter()
async for key in keys:
print(key)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(example())
Redis keys are iterated and returned
Traceback (most recent call last):
File "sample.py", line 24, in <module>
loop.run_until_complete(example())
File "/usr/local/lib/python3.6/asyncio/base_events.py", line 468, in run_until_complete
return future.result()
File "sample.py", line 16, in example
async for key in keys:
File "/usr/local/lib/python3.6/site-packages/aredis/commands/iter.py", line 23, in scan_iter
cursor, data = await self.scan(cursor=cursor, match=match, count=count)
ValueError: not enough values to unpack (expected 2, got 1)
5 years ago redis-py added support for keepalive options on the socket connection to redis [1].
I like to use those in aredis as well. If an interest exist to add those to aredis I am willing to write a PR for it.
Hello! i'm having a trouble implementing pub/sub with server sent events.
and i got
/home/vagrant/.pyenv/versions/3.6.1/lib/python3.6/asyncio/coroutines.py:109: RuntimeWarning: coroutine 'PubSub.on_connect' was never awaited
return self.gen.send(None)
my stream view
@app.route("/notifications")
async def notification(request):
async def _stream(res):
redis = aredis.StrictRedis()
pub = redis.pubsub()
await pub.subscribe('test')
while True:
message = await pub.get_message()
if message:
res.write(message)
asyncio.sleep(0.001)
return stream(_stream)
Any thoughts?
I have noticed that await client.flushdb()
is called after creating the client object, do I have to call this function each time before using?
>>> async def example():
>>> client = StrictRedis(host='127.0.0.1', port=6379, db=0)
>>> await client.flushdb()
>>> await client.set('foo', 1)
>>> assert await client.exists('foo') is True
>>> await client.incr('foo', 100)
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.