
aiomcache's Introduction

memcached client for asyncio

asyncio (PEP 3156) library to work with memcached.

Getting started

The API looks very similar to other memcached clients:

import asyncio
import aiomcache

async def hello_aiomcache():
    mc = aiomcache.Client("127.0.0.1", 11211)
    await mc.set(b"some_key", b"Some value")
    value = await mc.get(b"some_key")
    print(value)
    values = await mc.multi_get(b"some_key", b"other_key")
    print(values)
    await mc.delete(b"another_key")

asyncio.run(hello_aiomcache())

Version 0.8 introduces FlagClient, which allows registering callbacks to set or process flags. See examples/simple_with_flag_handler.py.

aiomcache's People

Contributors

alefteris, argaen, artemismagilov, asvetlov, dependabot-preview[bot], dependabot[bot], dreamsorcerer, dringsim, fafhrd91, grant-aterlo, jayzfbn, jettify, maartendraijer, magnuswatn, pwntester, pyup-bot, thehesiod, youknowone


aiomcache's Issues

Store zlib compressed value

I'm trying to set a key with a zlib-compressed value in Python 3:

resp = 'some message'
res = zlib.compress(resp.encode())

mc = aiomcache.Client("slivei_memcached", 11211)
await mc.set(key, res.decode(), 60)
await mc.close()

and got an error:
'utf-8' codec can't decode byte 0x9c in position 1: invalid start byte

Does anyone know how to solve this problem?
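The error comes from calling .decode() on zlib output, which is binary and not valid UTF-8. Since set() takes a bytes value, a sketch of the fix is to store the compressed bytes directly (the set() call is commented out because it needs a running server):

```python
import zlib

# compress to raw bytes and pass them to set() as-is instead of
# decoding them: zlib output is binary, not valid UTF-8
payload = zlib.compress("some message".encode())

# with a running memcached server, one would then do:
# await mc.set(b"some_key", payload, exptime=60)

# round-trip check, no server needed:
assert zlib.decompress(payload).decode() == "some message"
```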

Cannot await stats()

I am not sure why, but on Python 3.7 the following code fails:

client = aiomcache.Client("0.0.0.0", 11211)
asyncio.get_event_loop().run_until_complete(client.stats())
# but this works: asyncio.get_event_loop().run_until_complete(client.get(b"0"))
/usr/lib/python3.7/asyncio/base_events.py in run_until_complete(self, future)
    577             raise RuntimeError('Event loop stopped before Future completed.')
    578 
--> 579         return future.result()
    580 
    581     def stop(self):

/usr/local/lib/python3.7/dist-packages/aiomcache/client.py in wrapper(self, *args, **kwargs)
     18         conn = yield from self._pool.acquire()
     19         try:
---> 20             return (yield from func(self, conn, *args, **kwargs))
     21         except Exception as exc:
     22             conn[0].set_exception(exc)

/usr/local/lib/python3.7/dist-packages/aiomcache/client.py in stats(self, conn, args)
    187         result = {}
    188 
--> 189         resp = yield from conn.reader.readline()
    190         while resp != b'END\r\n':
    191             terms = resp.split()

TypeError: cannot 'yield from' a coroutine object in a non-coroutine generator

param `max-line-length = 90` not working

I'm trying to lint my code by running:

flake8 .

My code has lines around 100 characters long, but the flake8 command does not report them. If I change max-doc-length = 40, errors are raised, but max-line-length = 40 has no effect. This bug needs fixing.
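One common cause (an assumption, not confirmed by the report) is the option living in a file or section flake8 does not read: flake8 only picks up max-line-length from a [flake8] section in setup.cfg, tox.ini, or a .flake8 file. A minimal sketch of a configuration it does read:

```ini
[flake8]
max-line-length = 90
```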

Many simultaneous requests create unbounded connections

In the pool, maxsize only limits the number of kept (idle) connections; unlimited connections are spawned as needed whenever there are no connections to grab from the pool.

This can cause too many connections to be opened to memcached. Is this maxsize behavior intended? Would a hard-limit parameter make sense?
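One way to impose a hard limit (a sketch of the idea, not aiomcache's actual pool) is to guard connection creation with a semaphore sized to the cap, so acquire() blocks instead of spawning extra connections:

```python
import asyncio

class BoundedPool:
    """Sketch of a connection pool with a hard cap (hypothetical,
    not aiomcache's implementation)."""

    def __init__(self, maxsize: int):
        self._sem = asyncio.Semaphore(maxsize)  # caps idle + in-use connections
        self._idle = []

    async def acquire(self):
        await self._sem.acquire()  # blocks once maxsize connections are out
        if self._idle:
            return self._idle.pop()
        return await self._connect()  # open lazily, only when none are idle

    def release(self, conn):
        self._idle.append(conn)
        self._sem.release()

    async def _connect(self):
        # stand-in for opening a real memcached connection
        return object()

async def demo():
    pool = BoundedPool(2)
    first = await pool.acquire()
    second = await pool.acquire()
    # a third acquire() would now block until release() is called
    pool.release(first)
    third = await pool.acquire()  # reuses the released connection
    return first is third

print(asyncio.run(demo()))  # True
```

As a side effect, this shape also opens connections only on demand rather than all at once, which is what the later issue about eager pool filling asks for.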

connection does not close

Hi.
I'm trying to write a key and value to the memcached server, but the client connections are not closed. I use these functions in an aiohttp.web.Application:

async def mc_write(key, an):
    mc = aiomcache.Client("memcached", 11211)
    await mc.set(key.encode(encoding='UTF-8'), an.encode(encoding='UTF-8'))
    mc.close()

async def Handler(body, headers, key, url):
    logging.info("Start processing key: {}".format(key))

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=body, headers=headers) as resp:
                an = await resp.text()
                end = time.time()
    except Exception as e:
        return e

    # send result to memcached
    await mc_write(key, an)

On the memcached server I see lots of established connections:

lsof -p 1 | grep ESTABLISHED | wc -l

12

What am I doing wrong, and how can I close the connections?
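Two things likely keep connections open here: a new Client (with its own connection pool) is created on every call, and close() in aiomcache is a coroutine, so calling mc.close() without await never actually runs it. A sketch of the fix, using a hypothetical FakeClient stand-in so the example runs without a server (only the set/close shape mirrors aiomcache):

```python
import asyncio

class FakeClient:
    """Stand-in for aiomcache.Client so the sketch runs without memcached."""

    def __init__(self):
        self.store = {}
        self.closed = False

    async def set(self, key: bytes, value: bytes):
        self.store[key] = value

    async def close(self):
        self.closed = True

async def mc_write(mc, key: str, value: str):
    # reuse one shared client instead of constructing a new one per call;
    # each Client owns its own connection pool, so per-call clients leak sockets
    await mc.set(key.encode(), value.encode())

async def main():
    mc = FakeClient()  # in real code: aiomcache.Client("memcached", 11211)
    try:
        await mc_write(mc, "some_key", "an answer")
    finally:
        await mc.close()  # close() is a coroutine: it must be awaited
    return mc

mc = asyncio.run(main())
print(mc.store, mc.closed)  # {b'some_key': b'an answer'} True
```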

Open connections as needed instead of all at once

The initial call to acquire a connection opens all connections up to the maximum size. For anyone requiring a larger connection pool, this becomes a relatively slow operation, and depending on the efficiency of your application you may never need the maximum number of connections open anyway. For those reasons, I believe connections should be opened as needed until the maximum is reached.

https://github.com/aio-libs/aiomcache/blob/master/aiomcache/pool.py#L40-L44

IndexError in get()

We are using the latest release. Caught the following in Sentry in production:
(Sentry screenshots of the IndexError traceback in get().)

We are running memcached on preemptible nodes in GCP and I suspect that the server died during get().

Closing Client leaks dict

Disclaimer: I'm new to Python and asyncio, so this may just be my own misuse.

I've written some code to integrate with the auto-discovery feature of AWS ElastiCache. Part of this is connecting to a memcached cluster address every 60 seconds (it is important to re-connect each time so we resolve the DNS and ensure we reach a healthy cluster member). Everything is working fine, but it seems this process of frequently connecting and disconnecting is leaking dicts.

Here is a minimal reproducer using pympler to demonstrate the leak:

from pympler import muppy, summary
import asyncio
import aiomcache

loop = asyncio.get_event_loop()

async def hello_aiomcache():
    mc = aiomcache.Client("127.0.0.1", 11211, loop=loop)
    await mc.set(b"some_key", b"Some value")
    value = await mc.get(b"some_key")
    print(value)
    values = await mc.multi_get(b"some_key", b"other_key")
    print(values)
    await mc.delete(b"another_key")
    mc.close()  

# establish a baseline (watch the <class 'dict line)
summary.print_(summary.summarize(muppy.get_objects()))

for i in range(50):
    loop.run_until_complete(hello_aiomcache())

# <class 'dict grows
summary.print_(summary.summarize(muppy.get_objects()))

ds = [ao for ao in muppy.get_objects() if isinstance(ao, dict)]

# leaked dict looks like {'_loop': <_UnixSelectorEventLoop running=False closed=False debug=False>, '_paused': False, '_drain_waiter': None, '_connection_lost': False, '_stream_reader': <StreamReader t=<_SelectorSocketTransport fd=34 read=polling write=<idle, bufsize=0>>>, '_stream_writer': None, '_client_connected_cb': None, '_over_ssl': False}
ds[2364]

It looks like these dicts will hang around forever until loop.close() is called. I'm confused by this: I don't think I ever want to close the loop, which I borrowed from tornado via tornado.ioloop.IOLoop.current().asyncio_loop. Is there any other way to properly close and clean up these connections without closing the loop?

Client pool connections not re-used

I'm experiencing a resource leak when using this library. Here is an example of how to reproduce the condition.

import asyncio
import aiomcache

loop = asyncio.get_event_loop()
client = aiomcache.Client('127.0.0.1', 11211, pool_size=2)

@asyncio.coroutine
def get(c, k):
    yield from c.get(k)

for key in (b'non-existent-1', b'non-existent-2', b'non-existent-3', b'non-existent-4'):
    loop.run_until_complete(get(client, key))

Here is sample memcached output using the above test. You can see the client initiates a pool of 2 connections, and then proceeds to open a new connection for each get() call, instead of reusing the connections in the pool.

<17 server listening (auto-negotiate)
<18 server listening (auto-negotiate)
<19 send buffer was 9216, now 7456540
<20 send buffer was 9216, now 7456540
<19 server listening (udp)
<19 server listening (udp)
<19 server listening (udp)
<20 server listening (udp)
<19 server listening (udp)
<20 server listening (udp)
<20 server listening (udp)
<20 server listening (udp)
<21 new auto-negotiating client connection
<22 new auto-negotiating client connection
<23 new auto-negotiating client connection
23: Client using the ascii protocol
<23 get non-existent-1
>23 END
<24 new auto-negotiating client connection
24: Client using the ascii protocol
<24 get non-existent-2
>24 END
<25 new auto-negotiating client connection
25: Client using the ascii protocol
<25 get non-existent-3
>25 END
<26 new auto-negotiating client connection
26: Client using the ascii protocol
<26 get non-existent-4
>26 END

The following patch seems to fix the issue:

diff --git a/aiomcache/pool.py b/aiomcache/pool.py
index 98295a7..332bc7f 100644
--- a/aiomcache/pool.py
+++ b/aiomcache/pool.py
@@ -47,6 +47,8 @@ class MemcachePool:
                 if _conn.reader.at_eof() or _conn.reader.exception():
                     self._do_close(_conn)
                     conn = None
+                else:
+                    conn = _conn

             if conn is None:
                 conn = yield from self._create_new_conn()

Example on using aiomcache for a cluster

Thanks for building this amazing lib!
I was wondering if you could share an example of using aiomcache to connect to a memcached cluster with more than one host?

It seems this is how connections are established, and I don't see any other code for managing connections to a multi-node cluster. Given how widely memcached clusters are used, I think an example would be really helpful.
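aiomcache itself manages a pool to a single host, so multi-node support has to live above it. A common client-side pattern (a sketch, not an aiomcache feature; the host names are made up) is to hash each key to pick one of several Client instances:

```python
import hashlib

def pick_node(key: bytes, nodes: list):
    # deterministic sharding: hash the key and index into the node list;
    # production setups usually prefer consistent hashing so that adding
    # or removing a node only remaps a fraction of the keys
    digest = hashlib.md5(key).digest()
    return nodes[int.from_bytes(digest[:4], "big") % len(nodes)]

# hypothetical hosts; in real code each entry would be an aiomcache.Client
nodes = ["cache-a:11211", "cache-b:11211", "cache-c:11211"]

# the same key always maps to the same node
assert pick_node(b"some_key", nodes) == pick_node(b"some_key", nodes)
```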

Please make it possible to run tests without Docker

We'd like to package aiomcache for Gentoo. As part of the packaging process, we need to be able to run the test suite. While we can reasonably easily spawn a local memcached instance for testing, we can't use Docker for testing (both because it can't work without direct Internet access and because it poses security issues).

Unfortunately, the test suite seems to be really tightly tied to using a Docker container. Could you please make it possible to use an externally running memcached server instead?

Add python 3.5 support

At the moment you cannot use the client with Python 3.5's await syntax (at least not in the PyPI version). The error returned is:

TypeError: object generator can't be used in 'await' expression

I think this can be fixed by decorating all of the coroutines with asyncio.coroutine. This is done in most places but not in the aiomcache.client.acquire decorator.

There might also be other places which mean it is not compatible with Python 3.5. I have "fixed" this in my code with:

self.client = aiomcache.Client(
    host,
    port,
    pool_size=pool_size,
    pool_minsize=pool_minsize,
    loop=loop,
)
self.client.set = asyncio.coroutine(self.client.set)
self.client.get = asyncio.coroutine(self.client.get)
self.client.multi_get = asyncio.coroutine(self.client.multi_get)

Which isn't too pretty.

It would also be nice to publish a new PyPI release to include the other cool things you have added since last year.

Next release

@fafhrd91 please make the next aiomcache release or give me access to the project on PyPI.
In my opinion, #8 is worth publishing.

aiomcache not support chinese bytes as a key

I want to use Chinese bytes as a key:

In [1]: import asyncio 
   ...: import aiomcache 
   ...:  
   ...: loop = asyncio.get_event_loop()                                                                                                                                                                                                                                             

In [2]: async def hello_aiomcache(): 
   ...:     mc = aiomcache.Client('127.0.0.1', 11211, loop=loop) 
   ...:     await mc.set(bytes('中文', 'utf-8'), b'123') 
   ...:                                                                                                            

In [3]: loop.run_until_complete(hello_aiomcache())                                                                 
---------------------------------------------------------------------------
ValidationException                       Traceback (most recent call last)
<ipython-input-5-adb51212a606> in <module>
----> 1 loop.run_until_complete(hello_aiomcache())

/usr/local/Cellar/python3/3.6.4_2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py in run_until_complete(self, future)
    465             raise RuntimeError('Event loop stopped before Future completed.')
    466 
--> 467         return future.result()
    468 
    469     def stop(self):

<ipython-input-4-d36e59b6000e> in hello_aiomcache()
      1 async def hello_aiomcache():
      2     mc = aiomcache.Client('127.0.0.1', 11211, loop=loop)
----> 3     await mc.set(bytes('中文', 'utf-8'), b'123')
      4 

~/.virtualenvs/lyanna-tp9VnnDu/lib/python3.6/site-packages/aiomcache/client.py in wrapper(self, *args, **kwargs)
     18         conn = yield from self._pool.acquire()
     19         try:
---> 20             return (yield from func(self, conn, *args, **kwargs))
     21         except Exception as exc:
     22             conn[0].set_exception(exc)

~/.virtualenvs/lyanna-tp9VnnDu/lib/python3.6/site-packages/aiomcache/client.py in set(self, conn, key, value, exptime)
    250         flags = 0  # TODO: fix when exception removed
    251         resp = yield from self._storage_command(
--> 252             conn, b'set', key, value, flags, exptime)
    253         return resp
    254 

~/.virtualenvs/lyanna-tp9VnnDu/lib/python3.6/site-packages/aiomcache/client.py in _storage_command(self, conn, command, key, value, flags, exptime, cas)
    218         # however custom-compiled memcached can have different limit
    219         # so, we'll let the server decide what's too much
--> 220         assert self._validate_key(key)
    221 
    222         if not isinstance(exptime, int):

~/.virtualenvs/lyanna-tp9VnnDu/lib/python3.6/site-packages/aiomcache/client.py in _validate_key(self, key)
     54                 raise ValidationException('trailing newline', key)
     55         else:
---> 56             raise ValidationException('invalid key', key)
     57 
     58         return key

ValidationException: invalid key: b'\xe4\xb8\xad\xe6\x96\x87'

In [4]: debug                                                                                                      
> /Users/dongweiming/.virtualenvs/lyanna-tp9VnnDu/lib/python3.6/site-packages/aiomcache/client.py(56)_validate_key()
     54                 raise ValidationException('trailing newline', key)
     55         else:
---> 56             raise ValidationException('invalid key', key)
     57 
     58         return key

ipdb> key                                                                                                          
b'\xe4\xb8\xad\xe6\x96\x87'
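aiomcache validates keys against an ASCII-only pattern, which rejects raw UTF-8 bytes even though they contain no spaces or control characters. A workaround (a sketch, not a library feature) is to encode non-ASCII keys reversibly before use:

```python
import urllib.parse

def safe_key(text: str) -> bytes:
    # percent-encode to printable ASCII with no spaces, satisfying both
    # the memcached protocol rules and aiomcache's key validation;
    # the mapping is reversible via urllib.parse.unquote()
    return urllib.parse.quote(text, safe="").encode("ascii")

key = safe_key("中文")
print(key)  # b'%E4%B8%AD%E6%96%87'
assert urllib.parse.unquote(key.decode("ascii")) == "中文"
```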

UNIX Socket Support

It would be nice if this library could also connect to memcached via UNIX sockets, so that I do not need it listening on an IP address.

Thanks!

Ryan

[feature] add timeout support

It would be really nice to have connect/read timeouts that do the right thing with the connector when they are hit.
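Until built-in timeouts exist, individual calls can be bounded with asyncio.wait_for (a sketch; SlowClient is a hypothetical stand-in so the example runs without a server, and on timeout the pooled connection may still need to be discarded by the caller):

```python
import asyncio

async def get_with_timeout(client, key: bytes, timeout: float):
    # bound the whole round-trip; on expiry the inner coroutine is
    # cancelled and asyncio.TimeoutError propagates to the caller
    return await asyncio.wait_for(client.get(key), timeout)

class SlowClient:
    """Stand-in for a client whose server never answers."""

    async def get(self, key: bytes):
        await asyncio.sleep(10)

try:
    asyncio.run(get_with_timeout(SlowClient(), b"some_key", timeout=0.01))
except asyncio.TimeoutError:
    print("timed out")  # reached after ~0.01s instead of hanging
```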

Too many open files

I ran into a problem: a large number of sets causes an error:

Jan 21 21:23:12 max systemd-memcached-wrapper[722]: accept4(): Too many open files

Here is the sample code:

# -*- coding: utf-8 -*-

import asyncio

import aiomcache


@asyncio.coroutine
def store(key, val):
    mc = aiomcache.Client('127.0.0.1', 11211)
    yield from mc.set(key.encode(), val.encode())
    yield from mc.close()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    for i in range(10000):
        key = str(i)
        val = str(i)
        loop.run_until_complete(store(key, val))

This is similar to issue #3.

Unexpected CancelledError

I get errors coming from the aiomcache connection and I'm honestly not sure what the issue is. The aiohttp server sets up a connection when the app starts up:

# app.py
app = web.Application()
app.on_startup.append(tasks.start_memcached)
app.on_cleanup.append(tasks.stop_memcached)
# tasks.py
async def start_memcached(app):
    app['memcached'] = aiomcache.Client(host, port, loop=app.loop)

async def stop_memcached(app):
    app['memcached'].cancel()

Views that use the Client() instance will sometimes fail. I have a /health endpoint that is being checked every 30 seconds or so:

# views/health.py
async def health(request):
    is_healthy = False
    try:
        await request.app['memcached'].set(key, token)
        cache_response = await request.app['memcached'].get(key)
    except Exception:
        logger.exception('cannot connect to memcache')
    else:
        is_healthy = cache_response == token

    return web.json_response({'healthy': is_healthy})

The other requests that fail have the same traceback, but this is specifically the one from the health endpoint:

Traceback (most recent call last):
  File "/app/src/views/health.py", line 35, in health
    await cache.set(key, token)
  File "/usr/local/lib/python3.6/site-packages/aiomcache/client.py", line 20, in wrapper
    return (yield from func(self, conn, *args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/aiomcache/client.py", line 252, in set
    conn, b'set', key, value, flags, exptime)
  File "/usr/local/lib/python3.6/site-packages/aiomcache/client.py", line 232, in _storage_command
    resp = yield from self._execute_simple_command(conn, cmd)
  File "/usr/local/lib/python3.6/site-packages/aiomcache/client.py", line 68, in _execute_simple_command
    line = yield from conn.reader.readline()
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 488, in readline
    line = yield from self.readuntil(sep)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 581, in readuntil
    yield from self._wait_for_data('readuntil')
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
    yield from self._waiter
concurrent.futures._base.CancelledError

There is nothing obvious in the memcached logs.
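One thing worth checking in the cleanup hook: assuming the intent of stop_memcached is to shut the pool down, aiomcache's Client does that via close(), which is a coroutine and must be awaited. A sketch of that hook, with a hypothetical FakeClient stand-in (mirroring only the close() shape) so it runs offline:

```python
import asyncio

class FakeClient:
    """Stand-in mirroring only the close() coroutine of aiomcache.Client."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True

async def stop_memcached(app):
    # shut the pool down explicitly on cleanup; close() is a coroutine,
    # so it must be awaited inside the hook
    await app['memcached'].close()

app = {'memcached': FakeClient()}
asyncio.run(stop_memcached(app))
print(app['memcached'].closed)  # True
```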

MANIFEST.in with CHANGES.txt

Hey, congrats on the project, really interesting.

I'm having problems installing from source. Would it be possible to have a release with a MANIFEST.in that includes CHANGES.txt and README.rst? pip complains that they are missing, since setup.py references them but they are not shipped in the source package.
