asyncio-redis's Introduction

Redis client for Python asyncio.

Redis client for the PEP 3156 Python event loop.

This Redis library is a completely asynchronous, non-blocking client for a Redis server. It depends on asyncio (PEP 3156) and requires Python 3.6 or greater. If you're new to asyncio, it can be helpful to check out the asyncio documentation first.

Maintainers needed!

Right now, this library is working fine, but it is not actively maintained, due to lack of time and a shift of priorities on my side (Jonathan). Most of my open source time goes to the prompt_toolkit community.

I still merge pull requests when they are fine, especially bug/security fixes. But for a while now there have been no new features. If you are already using it, there's no real need to worry: asyncio-redis will keep working fine and we fix bugs, but it's not really evolving.

If anyone is interested in seriously taking over development, please let me know. Also keep in mind that there is a competing library called aioredis, which has a lot of activity.

See issue #134 to discuss.

Features

  • Works with the asyncio (PEP 3156) event loop
  • No dependencies except asyncio
  • Connection pooling
  • Automatic conversion from unicode (Python) to bytes (inside Redis)
  • Bytes and str protocols
  • Completely tested
  • Blocking calls and transactions supported
  • Streaming of some multi-bulk replies
  • Pub/sub support

Trollius support: There is a fork by Ben Jolitz that has the necessary changes for using this asyncio-redis library with Trollius.

Installation

pip install asyncio_redis

Documentation

View the documentation at Read the Docs.

The connection class

An asyncio_redis.Connection instance will take care of the connection and will automatically reconnect, using a new transport, when the connection drops. This connection class also acts as a proxy to an asyncio_redis.RedisProtocol instance; any Redis command of the protocol can be called directly on the connection.

import asyncio
import asyncio_redis


async def example():
    # Create Redis connection
    connection = await asyncio_redis.Connection.create(host='localhost', port=6379)

    # Set a key
    await connection.set('my_key', 'my_value')

    # When finished, close the connection.
    connection.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(example())

Connection pooling

Requests will automatically be distributed among all connections in a pool. If a connection is blocking because of, for instance, a blocking rpop, another connection will be used for new commands. (A sketch of this behaviour follows the example below.)

import asyncio
import asyncio_redis


async def example():
    # Create Redis connection
    connection = await asyncio_redis.Pool.create(host='localhost', port=6379, poolsize=10)

    # Set a key
    await connection.set('my_key', 'my_value')

    # When finished, close the connection pool.
    connection.close()
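
Here is that blocking behaviour as a hedged sketch, assuming the documented blpop and rpush commands: while the blocking pop holds one pool connection, other commands proceed on the rest.

import asyncio
import asyncio_redis


async def example():
    pool = await asyncio_redis.Pool.create(
        host='localhost', port=6379, poolsize=10)

    # This blocking pop occupies one connection until an item arrives.
    blocking = asyncio.ensure_future(pool.blpop(['queue']))

    # Meanwhile, other commands are served by the remaining connections.
    await pool.set('other_key', 'value')

    # Push an item so the blocking pop can complete.
    await pool.rpush('queue', ['an-item'])

    reply = await blocking
    print(reply.value)

    pool.close()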

Transactions example

import asyncio
import asyncio_redis


async def example():
    # Create Redis connection
    connection = await asyncio_redis.Pool.create(host='localhost', port=6379, poolsize=10)

    # Create transaction
    transaction = await connection.multi()

    # Run commands in transaction (they return future objects)
    f1 = await transaction.set('key', 'value')
    f2 = await transaction.set('another_key', 'another_value')

    # Commit transaction
    await transaction.exec()

    # Retrieve results
    result1 = await f1
    result2 = await f2

    # When finished, close the connection pool.
    connection.close()

It's recommended to use a large enough poolsize. A connection will be occupied as long as there's a transaction running on it.
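
To illustrate (a hedged sketch, assuming the Pool and transaction APIs shown above): ten concurrent transactions will occupy all ten connections of a poolsize=10 pool until their exec() calls complete.

import asyncio
import asyncio_redis


async def worker(pool, n):
    # Each transaction holds one pool connection from multi()
    # until exec() completes.
    transaction = await pool.multi()
    f = await transaction.set('key_%i' % n, 'value')
    await transaction.exec()
    return await f


async def example():
    # Ten concurrent transactions need a poolsize of at least ten.
    pool = await asyncio_redis.Pool.create(
        host='localhost', port=6379, poolsize=10)
    await asyncio.gather(*(worker(pool, n) for n in range(10)))
    pool.close()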

Pubsub example

import asyncio
import asyncio_redis

async def example():
    # Create connection
    connection = await asyncio_redis.Connection.create(host='localhost', port=6379)

    # Create subscriber.
    subscriber = await connection.start_subscribe()

    # Subscribe to channel.
    await subscriber.subscribe([ 'our-channel' ])

    # Inside a while loop, wait for incoming events.
    while True:
        reply = await subscriber.next_published()
        print('Received: ', repr(reply.value), 'on channel', reply.channel)

    # When finished, close the connection.
    connection.close()
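
For completeness, a hedged sketch of the publishing side; PUBLISH is exposed on the connection like any other command:

import asyncio
import asyncio_redis


async def publisher():
    connection = await asyncio_redis.Connection.create(host='localhost', port=6379)

    # Deliver a message to everyone subscribed to 'our-channel'.
    await connection.publish('our-channel', 'hello')

    connection.close()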

Lua scripting example

import asyncio
import asyncio_redis

code = \
"""
local value = redis.call('GET', KEYS[1])
value = tonumber(value)
return value * ARGV[1]
"""


async def example():
    connection = await asyncio_redis.Connection.create(host='localhost', port=6379)

    # Set a key
    await connection.set('my_key', '2')

    # Register script
    multiply = await connection.register_script(code)

    # Run script
    script_reply = await multiply.run(keys=['my_key'], args=['5'])
    result = await script_reply.return_value()
    print(result)  # prints 10 (2 * 5)

    # When finished, close the connection.
    connection.close()

Example using the Protocol class

import asyncio
import asyncio_redis


async def example():
    loop = asyncio.get_event_loop()

    # Create Redis connection
    transport, protocol = await loop.create_connection(
                asyncio_redis.RedisProtocol, 'localhost', 6379)

    # Set a key
    await protocol.set('my_key', 'my_value')

    # Get a key
    result = await protocol.get('my_key')
    print(result)

    # Close transport when finished.
    transport.close()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(example())

asyncio-redis's Issues

SPOP on non-existing key should be None or an explicit exception

#! /usr/bin/env python3
import asyncio, asyncio_redis as aioredis

@asyncio.coroutine
def go():
    conn = yield from aioredis.Pool.create(host='127.0.0.1', port=6379)
    v = yield from conn.spop('some-non-existent-key')
    print(v)
    conn.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    loop.close()

The above code gives an AssertionError, which is somewhat unexpected:

Traceback (most recent call last):
  File "spop_test.py", line 14, in <module>
    loop.run_until_complete(go())
  File "/Users/joongi/homebrew/Cellar/python3/3.4.3_2/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/base_events.py", line 316, in run_until_complete
    return future.result()
  File "/Users/joongi/homebrew/Cellar/python3/3.4.3_2/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/futures.py", line 275, in result
    raise self._exception
  File "/Users/joongi/homebrew/Cellar/python3/3.4.3_2/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py", line 238, in _step
    result = next(coro)
  File "spop_test.py", line 8, in go
    v = yield from conn.spop('some-non-existent-key')
  File "/Users/joongi/homebrew/lib/python3.4/site-packages/asyncio_redis/protocol.py", line 657, in wrapper
    result = yield from post_process(protocol_self, result)
  File "/Users/joongi/homebrew/Cellar/python3/3.4.3_2/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/coroutines.py", line 141, in coro
    res = func(*args, **kw)
  File "/Users/joongi/homebrew/lib/python3.4/site-packages/asyncio_redis/protocol.py", line 378, in bytes_to_native
    assert isinstance(result, bytes)
AssertionError
Exception ignored in: Task was destroyed but it is pending!
task: <Task pending coro=<_reader_coroutine() running at /Users/joongi/homebrew/lib/python3.4/site-packages/asyncio_redis/protocol.py:919> wait_for=<Future pending cb=[Task._wakeup()]>>

I think the result should be None, or an explicit exception such as "key does not exist".
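
Until that changes, a hedged interim workaround might be a hypothetical safe_spop helper (not atomic, and assuming exists() behaves as documented):

async def safe_spop(conn, key):
    # Not atomic: the key could vanish between the two calls, but this
    # avoids the AssertionError until the library returns None itself.
    if await conn.exists(key):
        return await conn.spop(key)
    return None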

what about timeouts?

Hi!
As of now, yield from redis.get(...) blocks forever if we shut down the redis server after a successful connect. What about respecting a socket read timeout and removing timed-out connections from the pool after a TimeoutError?

Sergey
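
A hypothetical client-side mitigation, sketched under the assumption that plain asyncio.wait_for semantics apply (it cancels the pending command, so the safest follow-up is to close and reconnect; it does not by itself evict the connection from a pool):

import asyncio


async def get_with_timeout(connection, key, timeout=5.0):
    try:
        # Bound the read so a dead server surfaces as TimeoutError
        # instead of blocking forever.
        return await asyncio.wait_for(connection.get(key), timeout)
    except asyncio.TimeoutError:
        # wait_for cancelled the pending command; the connection may
        # be out of sync now, so drop it and reconnect.
        connection.close()
        raise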

ErrorReply: ERR MULTI calls can not be nested

Here's another weird one we've been encountering (though less frequently than #98):

ErrorReply: ERR MULTI calls can not be nested
  File "courier/handlers/socket.py", line 186, in _extend_open_connections_ttl
    await self._increment_open_connections()
  File "courier/handlers/socket.py", line 156, in _increment_open_connections
    transaction = await self.redis.multi()
  File "asyncio_redis/protocol.py", line 656, in wrapper
    result = yield from method(protocol_self, *a, **kw)
  File "asyncio_redis/protocol.py", line 2224, in multi
    result = yield from self._query(b'multi')
  File "asyncio_redis/protocol.py", line 1096, in _query
    result = yield from self._get_answer(answer_f, _bypass=_bypass, call=call)
  File "asyncio_redis/protocol.py", line 1040, in _get_answer
    result = yield from answer_f
  File "asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
  File "asyncio/tasks.py", line 296, in _wakeup
    future.result()
  File "asyncio/futures.py", line 274, in result
    raise self._exception

We're definitely not nesting calls to multi in our app code; here are the relevant functions:

async def _extend_open_connections_ttl(self):
    """Resets the expiration for the connected identity's connection count
    into the future.
    """
    try:
        result = await self.redis.expire(
            self._open_connections_key,
            self._open_connections_ttl,
        )
        if result == 0:
            # Key does not exist (or couldn't be set)
            await self._increment_open_connections()
            self.identity.is_active = True
            await self.identity.save()
    except:
        self.get_sentry_client().captureException()
        raise

async def _increment_open_connections(self):
    """Increments the count of active connections for the connected
    identity and resets the expiration of said count into the future.
    """
    transaction = await self.redis.multi()
    f_incr = await transaction.incr(self._open_connections_key)
    f_expire = await transaction.expire(
        self._open_connections_key,
        self._open_connections_ttl,
    )
    await transaction.exec()

    incr_result, expire_result = await asyncio.gather(f_incr, f_expire)
    return (incr_result, expire_result)

Our stack is:

  • Python 3.5.2
  • tornado 4.4.1 (using the asyncio event loop)
  • asyncio_redis 0.14.2
  • Redis 3.2.3

Use asyncio.streams for parsing the redis stream.

Don't use the lower-level Protocol API. Design a RedisClient class that takes a socket or streams and creates a StreamReader/StreamWriter pair for parsing the input stream. This will be much cleaner.
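
A rough sketch of that direction, assuming asyncio.open_connection and hand-rolled RESP parsing for a single GET reply; a real RedisClient would need a complete reply parser:

import asyncio


async def example():
    reader, writer = await asyncio.open_connection('localhost', 6379)

    # Send: GET my_key (RESP array of two bulk strings).
    writer.write(b'*2\r\n$3\r\nGET\r\n$6\r\nmy_key\r\n')
    await writer.drain()

    # Parse the bulk-string reply, e.g. b'$8\r\nmy_value\r\n'.
    header = await reader.readline()
    assert header.startswith(b'$'), 'sketch handles bulk strings only'
    length = int(header[1:].strip())
    if length == -1:
        value = None                                    # key does not exist
    else:
        payload = await reader.readexactly(length + 2)  # data + CRLF
        value = payload[:-2]

    print(value)
    writer.close()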

File descriptor leak when can't connect to host

There is a small problem with the BaseEventLoop.create_connection() method: it doesn't have a connection timeout.
For example, this code never finishes because of the non-blocking client socket:

import asyncio
from asyncio_redis import Connection
@asyncio.coroutine
def test():
  c = yield from Connection.create('22.0.0.22')
  print("Connection created")

asyncio.get_event_loop().run_until_complete(test())

The next step to fix this problem was using asyncio.wait_for to add a connection timeout.
The program now continues with asyncio.futures.TimeoutError, but it leaves an open file descriptor for the socket created in the BaseEventLoop.create_connection() method.

asyncio.get_event_loop().run_until_complete(
  asyncio.wait_for(test(), timeout=1))

But (thanks to the asyncio authors) this create_connection has an optional sock argument. I solved my connection problem by creating the socket myself and passing it to create_connection instead of host and port.

def connect(host):
  sock = ...  # here is a lot of code from create_connection to resolve host, open non-blocking socket and handle exceptions.
  try:
    yield from asyncio.wait_for(loop.sock_connect(sock, address), 
      timeout=1)
    yield from asyncio.wait_for(loop.create_connection(
      protocol_factory, sock=sock), timeout=1)
  except:
    try:
      loop.remove_writer(sock.fileno())
      sock.close()
    except:
      pass

pipelining requests

Hi, I wanted to see pipelining in action and tried out /examples/benchmarks/speed_test.py, but I didn't see any significant improvement. When I peeked at it with wireshark, I saw each request sent in a separate TCP transaction.

Any ideas?

Thanks, Jonathan

TypeError: Got unexpected return type 'Future' in RedisProtocol.decr, expected <class 'int'>

This has been happening with increasing frequency in production for us:

TypeError: Got unexpected return type 'Future' in RedisProtocol.decr, expected <class 'int'>
  File "courier/handlers/socket.py", line 271, in handle_close
    connection_count = await self._decrement_open_connections()
  File "courier/handlers/socket.py", line 169, in _decrement_open_connections
    decr_result = await self.redis.decr(self._open_connections_key)
  File "asyncio_redis/protocol.py", line 659, in wrapper
    typecheck_return(protocol_self, result)
  File "asyncio_redis/protocol.py", line 517, in typecheck_return
    (type(result).__name__, self.method.__name__, expected_type))

It doesn't happen every time, but it definitely happens often enough to be a problem. It seems to be a bug in this library. @jonathanslenders any idea why the protocol sometimes returns a Future and sometimes an int?

Our stack is:

  • Python 3.5.2
  • tornado 4.4.1 (using the asyncio event loop)
  • asyncio_redis 0.14.2
  • Redis 3.2.3

AssertionError in multibulk_as_scanpart

Whilst scanning a set I find that I semi-consistently get an AssertionError whilst fetchone-ing from a cursor:

Traceback (most recent call last):
  File "/usr/lib/python3.4/asyncio/tasks.py", line 236, in _step
    result = coro.send(value)
  File "/home/oliver/ws/serverstf/serverstf/cache.py", line 969, in __fetch_addresses_from_cursor
    item = yield from cursor.fetchone()
  File "/home/oliver/ws/asyncio-redis/asyncio_redis/cursors.py", line 57, in fetchone
    yield from self._fetch_more()
  File "/home/oliver/ws/asyncio-redis/asyncio_redis/cursors.py", line 37, in _fetch_more
    chunk = yield from self._scanfunc(self._cursor, self.count)
  File "/home/oliver/ws/asyncio-redis/asyncio_redis/protocol.py", line 658, in wrapper
    result = yield from post_process(protocol_self, result)
  File "/home/oliver/ws/asyncio-redis/asyncio_redis/protocol.py", line 341, in multibulk_as_scanpart
    assert isinstance(result, MultiBulkReply)
AssertionError

cache.py:969 if it is of use.

After a bit of rudimentary poking around, it seems that multibulk_as_scanpart was actually receiving a future of a MultiBulkReply if I start a MULTI transaction whilst performing the SSCAN on the same connection.

I didn't really get any further than that in trying to understand what's going on and I haven't yet managed to create a simple test case to reproduce it.

To work around this I'm currently using two separate connections: one to do the SSCAN and the other to do the MULTI.

pubsub incompatible with auto_reconnect

auto_reconnect can't be used with a connection with a pubsub subscription if the connection uses a password or specific db.

Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.4.3/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py", line 238, in _step
    result = next(coro)
  File "/Users/jhoarau/src/atlassian/misc/hc-standup2/venv/lib/python3.4/site-packages/asyncio_redis/protocol.py", line 813, in initialize
    yield from self.auth(self.password)
  File "/Users/jhoarau/src/atlassian/misc/hc-standup2/venv/lib/python3.4/site-packages/asyncio_redis/protocol.py", line 643, in wrapper
    raise Error('Cannot run command inside pubsub subscription.')

closing connection pool results in error message for each closed connection

When using the connection_pool.close() method, it prints an error message for each of the connections in the pool, i.e. the following error message ten times. Is it possible to close these silently?

Task was destroyed but it is pending! task: Task cancelling coro=<_reader_coroutine() running at /path/anaconda/lib/python3.4/site-packages/asyncio_redis/protocol.py:919> wait_for=Future finished result=None

Using `start_subscribe`

When using

subscriber = yield from connection_pool.start_subscribe()

From what I understand, the connection_pool is set to be in pub/sub mode and methods to get or set keys won't be allowed for that particular connection_pool since it can only do pub/sub stuff.

What I don't understand, however, is whether the subscriber is attached to only one connection or to the whole bunch of connections in the connection_pool.

Can anyone shed some light on this?
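
One way to sidestep the question entirely, sketched here without claiming to know how Pool routes subscriptions internally: dedicate a plain Connection to pub/sub and keep the pool for regular commands.

import asyncio
import asyncio_redis


async def example():
    # Dedicate one plain connection to pub/sub...
    pubsub_connection = await asyncio_redis.Connection.create(
        host='localhost', port=6379)
    subscriber = await pubsub_connection.start_subscribe()
    await subscriber.subscribe(['our-channel'])

    # ...and keep a separate pool for regular get/set traffic.
    pool = await asyncio_redis.Pool.create(
        host='localhost', port=6379, poolsize=10)
    await pool.set('my_key', 'my_value')

    reply = await subscriber.next_published()
    print('Received:', repr(reply.value), 'on channel', reply.channel)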

Any way to ping Subscription connection

A Redis connection has a PING method to check the connection status.
A Redis pubsub connection can't run any connection methods, so ping doesn't answer 'pong' but returns an error:

$> telnet localhost 6379
Trying ::1...
Connected to localhost.
Escape character is '^]'.
subscribe test 
*3
$9
subscribe
$4
test
:1
ping
-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context
quit
+OK
Connection closed by foreign host.

So, the first way to check is just to send 'ping' and receive an ErrorReply exception from the redis server.
Another way is to parse the subscribe result and return it (it is currently just skipped in RedisProtocol._pubsub_method).

No other ways exist to determine the network connection status from pubsub.

Of course you can open another connection and publish a 'ping' message to a unique pubsub channel to check it, but network issues with the new connection may occur and hide problems with the original pub/sub.

To simulate similar network problems, just send SIGSTOP to the redis server. It will accept new connections but will not respond to any commands.

HiRedisProtocol's subscription service is broken

Using asyncio-redis==0.14.2

Code:

import asyncio
from asyncio_redis import Pool
from asyncio_redis.protocol import HiRedisProtocol, RedisProtocol

@asyncio.coroutine
def read_messages():
    klass = HiRedisProtocol
    # klass = RedisProtocol
    redis_connection = yield from Pool.create(host='redis-01.myserver.com', port=26380, protocol_class=klass)
    redis_subscriber = yield from redis_connection.start_subscribe()
    yield from redis_subscriber.subscribe([':myqueue.something.notification'])
    print("Listening to messages...")
    while True:
        message = yield from redis_subscriber.next_published()
        print("Message recieved: {}".format(message))
    redis_connection.close()

event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)
event_loop.run_until_complete(read_messages())

fails with (on startup, and each time a message arrives):

Exception in callback _SelectorSocketTransport._read_ready()
handle: <Handle _SelectorSocketTransport._read_ready()>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/events.py", line 125, in _run
    self._callback(*self._args)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/selector_events.py", line 667, in _read_ready
    self._protocol.data_received(data)
  File "/Users/agautam/work/git/cash-mgmt-server/venv/lib/python3.5/site-packages/asyncio_redis/protocol.py", line 2446, in data_received
    self._process_hiredis_item(item, self._push_answer)
  File "/Users/agautam/work/git/cash-mgmt-server/venv/lib/python3.5/site-packages/asyncio_redis/protocol.py", line 2459, in _process_hiredis_item
    cb(reply)
  File "/Users/agautam/work/git/cash-mgmt-server/venv/lib/python3.5/site-packages/asyncio_redis/protocol.py", line 1060, in _push_answer
    f = self._queue.popleft()
IndexError: pop from an empty deque

Workaround: Go back to using RedisProtocol.

klass = RedisProtocol

Let me know if you need more info.

Thanks

loop is not being explicitly passed inside asyncio-redis

Setting the asyncio default event loop to None causes an AssertionError.
see http://legacy.python.org/dev/peps/pep-3156/#passing-an-event-loop-around-explicitly
The following example will reproduce this.

import asyncio
from asyncio_redis import Connection

loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)

redis = loop.run_until_complete(Connection.create(loop=loop))

In my case the reason why I need to pass the loop explicitly is for tests: in unit tests I need to be sure that no side effects happen because of the default event loop.

Using the 'scan' function loses some keys

import asyncio
import asyncio_redis
from asyncio_redis.encoders import BytesEncoder
from policy import _redis_config


@asyncio.coroutine
def callback(cors, eventloop):
    result = yield from asyncio.wait_for(cors, timeout=None, loop=eventloop)
    yield from asyncio.sleep(1)


@asyncio.coroutine
def enumkey(redis_conn, m):
    redis_cursor = yield from redis_conn.scan(match=m)
    keys = yield from redis_cursor.fetchall()
    i = 0
    for key in keys:
        print(i, ":", key)
        i += 1
    return i


@asyncio.coroutine
def task(eventloop):
    redis_conn = None
    try:
        redis_conn = yield from asyncio_redis.Connection.create(
            auto_reconnect=False, encoder=BytesEncoder(),
            loop=eventloop, **_redis_config)
        print('-----------------------------------')
        i = yield from enumkey(redis_conn, b'p*')
        print('p* total %d' % i)
        print('-----------------------------------')
        i = yield from enumkey(redis_conn, b'pu:*')
        print('pu:* total %d' % i)
        print('-----------------------------------')
        i = yield from enumkey(redis_conn, b'pn:*')
        print('pn:* total %d' % i)
        print('-----------------------------------')
        i = yield from enumkey(redis_conn, b'k*')
        print('k* total %d' % i)
        print('-----------------------------------')
        i = yield from enumkey(redis_conn, b'sync*')
        print('sync* total %d' % i)
        print('-----------------------------------')
        i = yield from enumkey(redis_conn, b'*')
        print('* total %d' % i)
    except Exception as e:
        print(e)
    finally:
        if redis_conn:
            redis_conn.close()


if __name__ == '__main__':
    eventloop = asyncio.get_event_loop()
    cors = task(eventloop)
    eventloop.run_until_complete(callback(cors, eventloop))

The output is:

p* total 70
pu:* total 0
pn:* total 64
k* total 1
k2* total 0
k3* total 0
sync* total 0
* total 74

But when I use the redis lib, the output is:

p* total 70
pu:* total 3
pn:* total 64
k* total 3
k2* total 1
k3* total 1
sync* total 1
* total 74

Support LUA scripts

redis-py has a particularly nice interface where you get a script object that is callable.

Ability to fetch specific connection from Pool

I tried to use the SELECT command.
Then I realized that it is impossible with the current asyncio_redis Pool structure.

Redis stores "dbid" state per connection, and asyncio_redis.Pool proxies every single call to protocol methods to a free connection, without knowing which dbid the returned connection is using and should use.

pool = asyncio_redis.Pool.create(...)
...

@asyncio.coroutine
def some_interleaved_function(...):
    yield from pool.select(my_dbid)
    # Here, some other function may run,
    # and it may unfortunately change the same connection's dbid
    # by calling select() with different number.
    yield from pool.get(...)  # This will use wrong dbid.

To resolve this issue, I suggest the following API that returns a context manager object associated with a specific connection to preserve the dbid state until the context exits:

pool = asyncio_redis.Pool.create(...)
...

@asyncio.coroutine
def some_interleaved_function(...):
    with pool.get_free_connection() as conn:
        yield from conn.select(my_dbid)
        yield from conn.get(...)
        ... # the selected dbid is persistent until we exit the context block.

Currently we have pool._get_free_connection(), but we need to address an extra concern:

  • Should get_free_connection() be a coroutine?
    (i.e., should we be able to wait until a connection becomes ready or possibly the pool adds another connection within some "max" limit?)

Add support for COUNT option in SCAN and its families

Redis' default is to fetch at most 10 items per SCAN command, but asyncio_redis currently does not provide any means to modify this number, which may be required for performance tuning.
Redis' documentation says that it is okay to change the count number during cursor iteration, so it would be good to add an optional count argument to the fetchone() and fetchall() methods of the Cursor class and pass it to _scanfunc() via _fetch_more().

CancelledError in _get_answer stops all the magic

I use aiohttp + asyncio_redis. When a client aborts an http request, aiohttp raises CancelledError, and mostly it affects RedisProtocol._get_answer. The next http request just blocks forever. In the RedisProtocol instance, cancelled Futures accumulate in self._queue, in my case one Future<CANCELLED> per request.
I haven't written a test for it yet, but is it always correct to pop one cancelled future from _queue on CancelledError?

def _get_answer(...):
    try:
         result = yield from answer_f
    except CancelledError:
         self._pop_cancelled_future()
         raise
    ...

Multibulk result read performance

A simple ZRANGE test for a key which contains 10000 items.

import asyncio
import timeit

import redis
import asyncio_redis


def sync():
    r = redis.Redis()
    result = r.zrange("key", 0, -1, withscores=True)

print(timeit.timeit(sync, number=10))
# 1.33 sec


@asyncio.coroutine
def get_async():
    r = yield from asyncio_redis.Connection.create()
    f = yield from r.zrange("key", 0, -1)
    result = yield from f.asdict()


def run_async():  # renamed from 'async', which is now a reserved word
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(get_async())
    loop.close()
    asyncio.set_event_loop(None)

print(timeit.timeit(run_async, number=10))
# 25 seconds!

It's because receiving each item-score pair performs a yield from on a future.
The multibulk parser needs to have less overhead, since redis itself is very fast.

z* commands overly pedantic about types

I just spent 20 minutes wrestling with zadd and zremrangebyscore because they absolutely MUST have ZScoreBoundary types.

Perhaps have them construct a ZScoreBoundary from the value.
Then ZScoreBoundary.__new__ could return the value itself if it's already one.
Meanwhile, ZScoreBoundary.__init__ could cast int/decimal to float.
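
For reference, a hedged sketch of what the API demands today, assuming ZScoreBoundary's documented constructor and MIN_VALUE helper:

import asyncio
import asyncio_redis
from asyncio_redis import ZScoreBoundary


async def example():
    connection = await asyncio_redis.Connection.create(
        host='localhost', port=6379)

    # Remove everything scored up to and including 10.0. The z*
    # commands require explicit boundary objects like these.
    await connection.zremrangebyscore(
        'my_zset', ZScoreBoundary.MIN_VALUE, ZScoreBoundary(10.0))

    connection.close()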

Retrieve a value between WATCH and MULTI

Hi,

As far as I understand, WATCH allows monitoring keys for concurrent updates, so a future transaction (started by MULTI) will fail if the watched keys have been modified by a concurrent connection since being watched.

A common use case would then be:

WATCH key
value = GET key
# update value
MULTI
SET key value
EXEC

However, since the only way to execute the WATCH command is via multi(watch=[key]), it's not possible to retrieve a value between the call to WATCH and the call to MULTI.

Did I miss something or would it make sense to add a RedisProtocol.watch() method for this case?
I'd gladly work on the issue and submit a pull request if you think it makes sense.

Cheers
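
For contrast, a hedged sketch of what is possible today, where WATCH is only reachable through multi(watch=...), so no GET can be issued in between:

import asyncio
import asyncio_redis


async def example():
    connection = await asyncio_redis.Connection.create(
        host='localhost', port=6379)

    # WATCH happens inside multi(), so there is no window to read the
    # watched value before the transaction starts.
    transaction = await connection.multi(watch=['key'])
    f = await transaction.set('key', 'new_value')
    await transaction.exec()
    await f

    connection.close()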

pubsub: IndexError: pop from an empty deque

When subscribing to multiple channels:

Future/Task exception was never retrieved:
Traceback (most recent call last):
  File "/home/piotrek/projects/newsreader/env/lib/python3.3/site-packages/asyncio/tasks.py", line 281, in _step
    result = coro.send(value)
  File "/home/piotrek/projects/newsreader/asyncio-redis/asyncio_redis/protocol.py", line 932, in _handle_pubsub_multibulk_reply
    self._push_answer(result)
  File "/home/piotrek/projects/newsreader/asyncio-redis/asyncio_redis/protocol.py", line 988, in _push_answer
    f = self._queue.popleft()
IndexError: pop from an empty deque

I wrote a test for this in my branch pubsub: https://github.com/husio/asyncio-redis/blob/pubsub/tests.py#L1918

bytes as parameter for python 3

When I was using redis on Python 3, I had to pass bytes as the value to connection.set. I think this is reasonable.
So when I found that asyncio-redis requires a str as the value, I thought it was buggy: in that case, I would have to transform a binary sequence into a readable string before passing it to redis, which increases the length of the data.

Sorry for the wrong issue; I just found that it supports encoders.

Python 2 Trollius support

I've modified asyncio-redis to run on Python 2 using Trollius. You can see my fork branch trollius_compat.

I like this adapter, and despite redoing/omitting sections (type checking), I believe it (as a separate package) may help others move to an AsyncIO/Trollius world.

I don't expect you to pull in my branch (I suggest you don't). Let's use this issue as a discussion point to see what's next (it passes the examples).

Problems with setup.py

Try as I might, I cannot find the author's contact information anywhere. The "author" and "author_email" fields in setup.py are missing. There are several other problems with setup.py too:

  • wrong use of the "license" field
  • no classifiers have been defined
  • typo: extra_require should be extras_require (see the setuptools documentation here)
  • setup.py is not PEP 8 compliant

I suggest fixing these issues and running the pyroma tool to check if it's fine.

Default arguments are not encoded correctly.

We store serialized protocol buffers in Redis, thus we need to use asyncio_redis.encoders.BytesEncoder. I have spent the last hour going through the asyncio-redis code to understand why my sscan failed. I found out that sscan was defined with the type annotations shown below, where match has a default argument with the value '*':

def sscan(self, key:NativeType, match:NativeType='*') -> SetCursor:

This will result in the input_typecheck function throwing a TypeError. A quick fix is of course just to override the default argument match with b'*', but this should be handled somewhere in the type-checking system instead.
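
A hedged sketch of that quick fix in practice, passing the key and pattern explicitly as bytes:

import asyncio
import asyncio_redis
from asyncio_redis.encoders import BytesEncoder


async def example():
    connection = await asyncio_redis.Connection.create(
        host='localhost', port=6379, encoder=BytesEncoder())

    # With BytesEncoder, pass the pattern explicitly as bytes so the
    # str default '*' never reaches the input type check.
    cursor = await connection.sscan(b'my_set', match=b'*')
    members = await cursor.fetchall()
    print(members)

    connection.close()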

Blocking pop calls with timeouts raise AssertionError when the pop times out.

@asyncio.coroutine
def listen():
    # Create connection
    connection = yield from asyncio_redis.Connection.create(host='localhost', port=6379)

    # Inside a while loop, wait for incoming events.
    while True:
        reply = yield from connection.brpoplpush("task-queue", "in-progress", 2)
        print(reply)


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(listen())

$ python dummy_task_runner.py
Traceback (most recent call last):
  File "dummy_task_runner.py", line 18, in <module>
    asyncio.get_event_loop().run_until_complete(listen())
  File "/usr/local/lib/python3.4/asyncio/base_events.py", line 208, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.4/asyncio/futures.py", line 243, in result
    raise self._exception
  File "/usr/local/lib/python3.4/asyncio/tasks.py", line 283, in _step
    result = next(coro)
  File "dummy_task_runner.py", line 13, in listen
    reply = yield from connection.brpoplpush("task-queue", "in-progress", 2)
  File "/home/andre/Projects/pyws/env/lib/python3.4/site-packages/asyncio_redis/protocol.py", line 593, in wrapper
    result = yield from post_process(protocol_self, result)
  File "/usr/local/lib/python3.4/asyncio/tasks.py", line 84, in coro
    res = func(*args, **kw)
  File "/home/andre/Projects/pyws/env/lib/python3.4/site-packages/asyncio_redis/protocol.py", line 314, in bytes_to_native
    assert isinstance(result, bytes)
AssertionError

subscriber.next_published() seems to block in this code snippet

import asyncio
from aiohttp import web
import asyncio_redis


@asyncio.coroutine
def example():
    # Create connection
    connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)

    # Create subscriber.
    subscriber = yield from connection.start_subscribe()

    # Subscribe to channel.
    yield from subscriber.subscribe([ 'our-channel' ])

    # Inside a while loop, wait for incoming events.
    while True:
        reply = yield from subscriber.next_published()
        print('Received: ', repr(reply.value), 'on channel', reply.channel)

    # When finished, close the connection.
    connection.close()




@asyncio.coroutine
def init(loop):
    app = web.Application(loop=loop)

    yield from example()
    srv = yield from loop.create_server(app.make_handler(),
                                        '127.0.0.1', 8080)
    print("Server started at http://127.0.0.1:8080")
    return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()

I may also be doing something wrong; I'm an asyncio newbie.

AssertionError when trying to hmset

This code:

import asyncio

import asyncio_redis
from asyncio_redis.encoders import BytesEncoder


@asyncio.coroutine
def test():
    conn = yield from asyncio_redis.Connection.create(host='localhost', port=6379, db=0,
                                                      encoder=BytesEncoder())
    d = dict(a=1, b=2)
    yield from conn.hmset(b'foo', d)
    conn.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(test())

throws:

Traceback (most recent call last):
  File "test.py", line 17, in <module>
    loop.run_until_complete(test())
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/base_events.py", line 268, in run_until_complete
    return future.result()
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/futures.py", line 277, in result
    raise self._exception
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py", line 236, in _step
    result = next(coro)
  File "test.py", line 12, in test
    yield from conn.hmset(b'foo', d)
  File "/Users/kimvais/py34/lib/python3.4/site-packages/asyncio_redis/protocol.py", line 655, in wrapper
    result = yield from method(protocol_self, *a, **kw)
  File "/Users/kimvais/py34/lib/python3.4/site-packages/asyncio_redis/protocol.py", line 1692, in hmset
    assert isinstance(k, self.native_type)
AssertionError
Task was destroyed but it is pending!
task: <Task pending coro=<_reader_coroutine() running at /Users/kimvais/py34/lib/python3.4/site-packages/asyncio_redis/protocol.py:919> wait_for=<Future pending cb=[Task._wakeup()]>>

pip jessie install report

mah@jessie64:~/machinekit/src$ sudo pip install asyncio_redis
Downloading/unpacking asyncio-redis
Downloading asyncio_redis-0.14.1.tar.gz
Running setup.py (path:/tmp/pip-build-qolQ5c/asyncio-redis/setup.py) egg_info for package asyncio-redis
/usr/lib/python2.7/distutils/dist.py:267: UserWarning: Unknown distribution option: 'extra_require'
warnings.warn(msg)

Downloading/unpacking asyncio (from asyncio-redis)
Downloading asyncio-3.4.3.tar.gz (204kB): 204kB downloaded
Running setup.py (path:/tmp/pip-build-qolQ5c/asyncio/setup.py) egg_info for package asyncio

Installing collected packages: asyncio-redis, asyncio
Running setup.py install for asyncio-redis
/usr/lib/python2.7/distutils/dist.py:267: UserWarning: Unknown distribution option: 'extra_require'
warnings.warn(msg)
File "/usr/local/lib/python2.7/dist-packages/asyncio_redis/protocol.py", line 270
result = yield from original_post_processor(protocol, result)
^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio_redis/connection.py", line 23
    def create(cls, host='localhost', port=6379, *, password=None, db=0,
                                                  ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio_redis/replies.py", line 59
    key, value = yield from f
                          ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio_redis/cursors.py", line 37
    chunk = yield from self._scanfunc(self._cursor, self.count)
                     ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio_redis/pool.py", line 27
    def create(cls, host='localhost', port=6379, *, password=None, db=0,
                                                  ^
SyntaxError: invalid syntax

Running setup.py install for asyncio
File "/usr/local/lib/python2.7/dist-packages/asyncio/windows_events.py", line 45
def init(self, ov, *, loop=None):
^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/futures.py", line 143
    def __init__(self, *, loop=None):
                        ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/proactor_events.py", line 386
    *, server_side=False, server_hostname=None,
     ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/base_events.py", line 177
    yield from waiter
             ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/locks.py", line 96
    def __init__(self, *, loop=None):
                        ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/events.py", line 282
    def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
                                       ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/unix_events.py", line 188
    yield from waiter
             ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/tasks.py", line 70
    def __init__(self, coro, *, loop=None):
                              ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/selectors.py", line 39
    "{!r}".format(fileobj)) from None
                               ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/test_utils.py", line 134
    def _run_test_server(*, address, use_ssl=False, server_cls, server_ssl_cls):
                          ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/queues.py", line 41
    def __init__(self, maxsize=0, *, loop=None):
                                   ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/windows_utils.py", line 83
    def pipe(*, duplex=False, overlapped=(True, True), bufsize=BUFSIZE):
              ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/selector_events.py", line 57
    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                                                                   ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/subprocess.py", line 118
    return (yield from self._transport._wait())
                     ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/streams.py", line 39
    def open_connection(host=None, port=None, *,
                                               ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/base_subprocess.py", line 151
    _, pipe = yield from loop.connect_write_pipe(
                       ^
SyntaxError: invalid syntax

  File "/usr/local/lib/python2.7/dist-packages/asyncio/coroutines.py", line 46
    yield from gen
             ^
SyntaxError: invalid syntax

Successfully installed asyncio-redis asyncio
Cleaning up...

Connection loss does not raise an exception

When I shut down my redis server, then attempt to write, I get this on stdout:

 socket.send() raised exception.

I'm a bit confused, and maybe this is an asyncio issue (which is where the message comes from: https://github.com/sourcegraph/tulip/blob/master/tests/proactor_events_test.py#L154).

But afterwards, my coroutine seems to be in some sort of undefined state; that is, it neither shuts down nor continues after the

yield from redis.get('key')

that caused the issue.

What I really want is for an exception to be raised so I can deal with it properly.

UnicodeEncodeError raised when registering a script that contains non-ascii chars.

There are some non-ascii chars in my script; when I call register_script, it raises UnicodeEncodeError:

  ...
  File "C:\Python35\lib\site-packages\asyncio_redis\pool.py", line 128, in register_script
    script = yield from self.__getattr__('register_script')(script)
  File "C:\Python35\lib\site-packages\asyncio_redis\protocol.py", line 656, in wrapper
    result = yield from method(protocol_self, *a, **kw)
  File "C:\Python35\lib\site-packages\asyncio_redis\protocol.py", line 1998, in register_script
    sha = yield from self.script_load(script)
  File "C:\Python35\lib\site-packages\asyncio_redis\protocol.py", line 656, in wrapper
    result = yield from method(protocol_self, *a, **kw)
  File "C:\Python35\lib\site-packages\asyncio_redis\protocol.py", line 2056, in script_load
    return self._query(b'script', b'load', script.encode('ascii'))
UnicodeEncodeError: 'ascii' codec can't encode characters in position 6-9: ordinal not in range(128)

I found the code that causes this:

    @_query_command
    def script_load(self, script:str) -> str:
        """ Load script, returns sha1 """
        return self._query(b'script', b'load', script.encode('ascii'))

If I change 'ascii' to 'utf-8', it works fine.

I think it is a bug. Thanks.

Code that never works

Failed methods are:

RedisProtocol.move --
./asyncio_redis/protocol.py:1161: undefined name 'destination'
RedisProtocol._punsubscribe --
./asyncio_redis/protocol.py:1719: undefined name 'channels'

Doc improvements.

Comment from reddit:

http://www.reddit.com/r/Python/comments/20pj21/redis_client_library_for_asyncio_python_34/


Some thoughts about the documentation:

Most of the examples just define functions. It would be helpful to have fully fleshed-out examples, so interested users can actually see the library in action. The example page is where the front page directs users as a starting point, so make it beginner friendly. Better still, put one minimal complete example that actually does something interesting on the front page (e.g., insert a key/value pair, then fetch the value and print it; see the sketch below).

asyncio is brand-spanking new, so take a moment to explain what's going on. What are all the yield froms for? How does the event loop work? It doesn't need to be a complete tutorial; you can link to the official docs for that, but for a brand-new (to Python) design pattern, a little hand-holding would not be unwarranted.

All in all, it looks pretty sweet. Thanks for getting some working asyncio code out in the wild so quickly. This is exactly the kind of thing needed to facilitate adoption.
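
For what it's worth, the minimal front-page example the comment asks for might look like this (a hedged sketch using the Connection API shown earlier):

import asyncio
import asyncio_redis


async def main():
    connection = await asyncio_redis.Connection.create(
        host='localhost', port=6379)

    # Insert a key/value pair, then fetch the value and print it.
    await connection.set('greeting', 'hello world')
    value = await connection.get('greeting')
    print(value)  # -> 'hello world'

    connection.close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())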

Streaming of multi-bulk replies

Confirm that this is true:

redis = yield from Redis.create(port=6379, poolsize=2)
watches = yield from redis.smembers('account:{}'.format(account))
for f in watches:
    watch_id = yield from f
    watch_config = yield from redis.get('watch:{}'.format(watch_id))

If the set contains 500000 items, they will all be streamed into memory while the coroutine waits in the last line to resolve the redis.get(), because only one connection is used.

Pending task destroyed warning

When I run this script:

import asyncio
import asyncio_redis

loop = asyncio.get_event_loop()
conn = loop.run_until_complete(asyncio_redis.Connection.create('localhost', 6379))
conn.close()

I get a warning:

Task was destroyed but it is pending!
task: <Task pending coro=<_reader_coroutine() running at /Users/honza/dev/envs/test/lib/python3.4/site-packages/asyncio_redis/protocol.py:919> wait_for=<Future pending cb=[Task._wakeup()]>>

Environment:

  • Python 3.4.2
  • asyncio-redis 0.13.4

The asyncio docs say it's probably a bug: https://docs.python.org/3/library/asyncio-dev.html#pending-task-destroyed

strange bug with start_subscribe

Hello!
I just got a strange bug with PUB/SUB code. In a coroutine I have:

pubsub = yield from redis.start_subscribe()
yield from pubsub.subscribe([channel])

In my case everything stops after yielding from redis.start_subscribe(): the reactor just doesn't return the pubsub object. But! It only happens on the second script launch (after the __pycache__ dir is filled with compiled code). And if I remove this directory, all works fine again until the next launch.

As a workaround, I replaced the first yield with:

try:
    redis.start_subscribe().__next__()
except StopIteration as e:
    pubsub = e.value

and all works without magic.

So, my proposal is to remove the unnecessary @asyncio.coroutine decorator from start_subscribe and to use it like a synchronous method.

Sergey.
