aiozk's Introduction

Asyncio zookeeper client (aiozk)

Status

There are no known major bugs in client/session/connection, but the recipes need more test code to become more robust.

Any help and interest are welcome 😀

Installation

$ pip install aiozk

Quick Example

import asyncio
from aiozk import ZKClient


async def main():
    zk = ZKClient('localhost')
    await zk.start()
    await zk.create('/foo', data=b'bazz', ephemeral=True)
    assert b'bazz' == await zk.get_data('/foo')
    await zk.close()

asyncio.run(main())

Recipes

You may use recipes, similar to zoonado, kazoo, and other libs:

# assuming zk is aiozk.ZKClient

# Lock
async with await zk.recipes.Lock('/path/to/lock').acquire():
    # ... Do some stuff ...
    pass

# Barrier
barrier = zk.recipes.Barrier('/path/to/barrier')
await barrier.create()
await barrier.lift()
await barrier.wait()

# DoubleBarrier
double_barrier = zk.recipes.DoubleBarrier('/path/to/double/barrier', min_participants=4)
await double_barrier.enter(timeout=0.5)
# ...  Do some stuff ...
await double_barrier.leave(timeout=0.5)

You can find the full list of recipes provided by aiozk here: aiozk recipes

To understand the ideas behind the recipes, please read this, and see even more recipes here. Make sure you're familiar with all the recipes before building something new yourself, especially when it involves more than a few ZooKeeper calls.

Caution

Don't mix different types of recipes at the same znode path, for example by creating a Lock and a DoubleBarrier object at the same path. It may cause undefined behavior 😓
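One way to follow this advice is to derive each recipe's znode path from its type, so that two recipe kinds can never share a path. The sketch below is only an illustration: `recipe_path` and the `/recipes` prefix are hypothetical and not part of aiozk.

```python
def recipe_path(kind: str, name: str, root: str = "/recipes") -> str:
    """Build a znode path that is namespaced by recipe kind (hypothetical helper)."""
    if not kind or "/" in kind:
        raise ValueError("kind must be a single non-empty path segment")
    return "/".join([root.rstrip("/"), kind, name.strip("/")])


lock_path = recipe_path("lock", "orders")
barrier_path = recipe_path("double_barrier", "orders")
print(lock_path)     # /recipes/lock/orders
print(barrier_path)  # /recipes/double_barrier/orders
```

The resulting strings can then be passed to the recipe constructors in place of hand-written paths.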

Testing

NB: please ensure that you're using a recent docker-compose version. You can get it by running

pip install --user -U docker-compose

Run tests

# you should have access to docker

docker-compose build
./test-runner.sh

Or you can run tests with tox

pip install --user tox tox-docker
tox

Testing approach

Most of the tests are integration tests that run against real ZooKeeper instances. We chose ZooKeeper 3.5 because it supports dynamic reconfiguration, and we plan to run all connecting/reconnecting/watches tests against a ZooKeeper Docker cluster, since that lets us restart any server and see what happens.

# first terminal: launch zookeeper cluster
docker-compose rm -fv && docker-compose build zk && docker-compose up --scale zk=7 zk_seed zk

# it will launch the cluster in this terminal and keep running; the last lines should look like this:

zk_6       | Servers: 'server.1=172.23.0.9:2888:3888:participant;0.0.0.0:2181\nserver.2=172.23.0.2:2888:3888:participant;0.0.0.0:2181\nserver.3=172.23.0.3:2888:3888:participant;0.0.0.0:2181\nserver.4=172.23.0.4:2888:3888:participant;0.0.0.0:2181\nserver.5=172.23.0.5:2888:3888:participant;0.0.0.0:2181\nserver.6=172.23.0.7:2888:3888:participant;0.0.0.0:2181'
zk_6       | CONFIG: server.1=172.23.0.9:2888:3888:participant;0.0.0.0:2181
zk_6       | server.2=172.23.0.2:2888:3888:participant;0.0.0.0:2181
zk_6       | server.3=172.23.0.3:2888:3888:participant;0.0.0.0:2181
zk_6       | server.4=172.23.0.4:2888:3888:participant;0.0.0.0:2181
zk_6       | server.5=172.23.0.5:2888:3888:participant;0.0.0.0:2181
zk_6       | server.6=172.23.0.7:2888:3888:participant;0.0.0.0:2181
zk_6       | server.7=172.23.0.6:2888:3888:observer;0.0.0.0:2181
zk_6       |
zk_6       |
zk_6       | Reconfiguring...
zk_6       | ethernal loop
zk_7       | Servers: 'server.1=172.23.0.9:2888:3888:participant;0.0.0.0:2181\nserver.2=172.23.0.2:2888:3888:participant;0.0.0.0:2181\nserver.3=172.23.0.3:2888:3888:participant;0.0.0.0:2181\nserver.4=172.23.0.4:2888:3888:participant;0.0.0.0:2181\nserver.5=172.23.0.5:2888:3888:participant;0.0.0.0:2181\nserver.6=172.23.0.7:2888:3888:participant;0.0.0.0:2181\nserver.7=172.23.0.6:2888:3888:participant;0.0.0.0:2181'
zk_7       | CONFIG: server.1=172.23.0.9:2888:3888:participant;0.0.0.0:2181
zk_7       | server.2=172.23.0.2:2888:3888:participant;0.0.0.0:2181
zk_7       | server.3=172.23.0.3:2888:3888:participant;0.0.0.0:2181
zk_7       | server.4=172.23.0.4:2888:3888:participant;0.0.0.0:2181
zk_7       | server.5=172.23.0.5:2888:3888:participant;0.0.0.0:2181
zk_7       | server.6=172.23.0.7:2888:3888:participant;0.0.0.0:2181
zk_7       | server.7=172.23.0.6:2888:3888:participant;0.0.0.0:2181
zk_7       | server.8=172.23.0.8:2888:3888:observer;0.0.0.0:2181
zk_7       |
zk_7       |
zk_7       | Reconfiguring...
zk_7       | ethernal loop

Run tests in docker:

docker-compose run --no-deps aiozk
# last lines will be about testing results

............lots of lines omitted........
.
----------------------------------------------------------------------
Ran 3 tests in 1.059s

OK

Run tests locally:

# ZK_IP can be something from logs above, like: ZK_HOST=172.21.0.6:2181
ZK_HOST=<ZK_IP> ./venv/bin/pytest

Recipes testing

It seems that usually recipes require several things to be tested:

  • That recipe flow is working as expected
  • Timeouts: reproduce every timeout with meaningful values (timeout 0.5s and block for 0.6s)
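The timeout bullet can be illustrated with plain asyncio: block for slightly longer than the timeout (0.6s against 0.5s) so the outcome is unambiguous. This is a sketch, not one of the project's tests; `blocked_operation` is a hypothetical stand-in for a recipe call that cannot complete in time.

```python
import asyncio


async def blocked_operation(block_for: float) -> str:
    # Stand-in for a recipe call that cannot complete for `block_for` seconds.
    await asyncio.sleep(block_for)
    return "done"


async def times_out(timeout: float, block_for: float) -> bool:
    """Return True if the operation hits the timeout before finishing."""
    try:
        await asyncio.wait_for(blocked_operation(block_for), timeout=timeout)
    except asyncio.TimeoutError:
        return True
    return False


# Block slightly longer than the timeout, as suggested above.
print(asyncio.run(times_out(timeout=0.5, block_for=0.6)))
```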

Run some tests directly

This is another way to quickly run only the tests you are interested in. It is also useful when you want to run tests under a different version of Python.

# Run zookeeper container
docker run -p 2181:2181 zookeeper

# Run pytest directly at the development source tree
export ZK_HOST=localhost
pytest -s --log-cli-level=DEBUG aiozk/test/test_barrier.py

aiozk's People

Contributors

bfoder, brutasse, cybergrind, decaz, kammala, livercat, nikitagromov, packysauce, rhdxmr, soniccube, sy-be, websuslik, xose

aiozk's Issues

Support Python 3.10

Python 3.10 was released and all accumulated improvements should be released so aiozk will be compatible with the new Python version.

InvalidStateError at not awaited task connection.close

Looks like there are 2 problems:

  1. InvalidStateError at connection.close
  2. Not awaited tasks at https://github.com/micro-fan/aiozk/blob/master/aiozk/session.py#L124, https://github.com/micro-fan/aiozk/blob/master/aiozk/session.py#L127

states.py:50 - transition_to() DEBUG --- Session transition: connected -> suspended
session.py:179 - repair_loop() DEBUG --- Repair state reached
session.py:109 - find_server() INFO --- Connecting to localhost:2182
connection.py:75 - connect() DEBUG --- Initial connection to server localhost:2182
connection.py:57 - _make_handshake() DEBUG --- Sending 'srvr' command to localhost:2182
connection.py:69 - _make_handshake() DEBUG --- Version info: (3, 6, 1)
connection.py:70 - _make_handshake() DEBUG --- Read-only mode: False
connection.py:72 - _make_handshake() DEBUG --- Actual connection to server localhost:2182
session.py:113 - find_server() INFO --- Connected to localhost:2182
session.py:123 - find_server() DEBUG --- Close old connection
base_events.py:1619 - default_exception_handler() ERROR --- Task exception was never retrieved
future: <Task finished coro=<Connection.close() done, defined at /Users/bkhasanov/Envs/ipv4mgr/lib/python3.7/site-packages/aiozk/connection.py:256> exception=InvalidStateError('invalid state')>
Traceback (most recent call last):
  File "/Users/bkhasanov/Envs/ipv4mgr/lib/python3.7/site-packages/aiozk/connection.py", line 264, in close
    await self.read_loop_task
  File "/Users/bkhasanov/Envs/ipv4mgr/lib/python3.7/site-packages/aiozk/connection.py", line 170, in read_loop
    f.set_exception(response)
asyncio.base_futures.InvalidStateError: invalid state
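The traceback ends in `f.set_exception(response)`. asyncio raises InvalidStateError from that call when the future is already done, which includes futures that were cancelled. The sketch below shows a generic guard for that situation; `fail_pending` is a hypothetical helper, and this is not necessarily how aiozk addresses the problem.

```python
import asyncio


def fail_pending(futures, exc):
    """Set `exc` on every future that is still pending; skip finished ones."""
    failed = 0
    for f in futures:
        if f.done():  # already resolved or cancelled: set_exception would raise
            continue
        f.set_exception(exc)
        failed += 1
    return failed


async def demo():
    loop = asyncio.get_running_loop()
    pending, cancelled = loop.create_future(), loop.create_future()
    cancelled.cancel()
    count = fail_pending([pending, cancelled], ConnectionError("closed"))
    # Retrieve the exception so asyncio does not log it as never retrieved.
    return count, isinstance(pending.exception(), ConnectionError)


print(asyncio.run(demo()))
```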

Add auth data for aiozk

Could you add an auth_data argument to ZKClient, like KazooClient(auth_data=auth_data)?
Or would that not work well?

For example:

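# Import paths are assumed from the aiozk package layout.
from aiozk import ZKClient
from aiozk.exc import NoAuth
from aiozk.protocol import AuthRequest


class MyZKClient(ZKClient):

    def __init__(
            self,
            servers,
            chroot=None,
            session_timeout=10,
            default_acl=None,
            retry_policy=None,
            allow_read_only=False,
            read_timeout=None,
            loop=None,
            auth_data=None,
    ):
        super().__init__(servers, chroot, session_timeout, default_acl,
                         retry_policy, allow_read_only, read_timeout,
                         loop)
        self.auth_data = auth_data

    async def send(self, request):
        try:
            return await super().send(request)
        except NoAuth:
            if not self.auth_data:
                # Nothing to authenticate with: surface the original error
                # instead of silently returning None.
                raise
            # Authenticate, then retry the original request once.
            await super().send(self.auth_data)
            return await super().send(request)


# `auth` and `zk_hosts` are defined elsewhere.
auth_data = AuthRequest(type=0, scheme='digest', auth=auth)
MyZKClient(zk_hosts, auth_data=auth_data)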

However, I still get NoAuth errors when using TreeCache.

zookeeper: 3.4.10

Pending tasks after client close

After interrupting the process I get a pending-tasks error. Is this a bug, or do I have to change how I handle closing the client?

test_zk.py:

import asyncio
import signal

from aiozk import ZKClient

loop = asyncio.get_event_loop()
zk = ZKClient('localhost')


async def run():
    await zk.start()


async def stop_handler():
    await zk.close()
    loop.stop()


loop.add_signal_handler(
    signal.SIGINT, lambda: asyncio.ensure_future(stop_handler()))
loop.run_until_complete(run())
try:
    loop.run_forever()
finally:
    loop.close()

Log:

$ python test_zk.py
^CAborting connection to localhost:2181
Task was destroyed but it is pending!
task: <Task pending coro=<Session.repair_loop() running at /home/decaz/.virtualenvs/test/lib/python3.6/site-packages/aiozk/session.py:150> wait_for=<Future finished result=None>>
Task was destroyed but it is pending!
task: <Task pending coro=<Connection.read_loop() running at /home/decaz/.virtualenvs/test/lib/python3.6/site-packages/aiozk/connection.py:154> wait_for=<Future cancelled>>
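"Task was destroyed but it is pending!" is what asyncio logs when a task is garbage-collected while still pending, for example after the loop is closed. A generic asyncio pattern, independent of aiozk, is to cancel and await the remaining tasks before the loop goes away. The sketch below uses a hypothetical `background_loop` in place of the library's internal loops.

```python
import asyncio


async def background_loop():
    # Stand-in for a long-running loop such as a repair or read loop.
    while True:
        await asyncio.sleep(3600)


async def cancel_remaining_tasks() -> int:
    """Cancel every task except the current one and wait for them to finish."""
    current = asyncio.current_task()
    tasks = [t for t in asyncio.all_tasks() if t is not current]
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each task's CancelledError.
    await asyncio.gather(*tasks, return_exceptions=True)
    return len(tasks)


async def main() -> int:
    asyncio.ensure_future(background_loop())
    await asyncio.sleep(0)  # let the background task start
    return await cancel_remaining_tasks()


print(asyncio.run(main()))
```

In a script that drives the loop manually, a coroutine like `cancel_remaining_tasks` would be awaited in the stop handler before `loop.stop()` is called.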

Exception is raised on session close

#28 introduced a small bug: an asyncio.CancelledError exception is now raised on session close.
Here the task is cancelled:
https://github.com/tipsi/aiozk/blob/341b337a2fc239788fc142eba3633868bd3a8bf1/aiozk/session.py#L311
And here its exception is caught:
https://github.com/tipsi/aiozk/blob/341b337a2fc239788fc142eba3633868bd3a8bf1/aiozk/session.py#L87
Since future.exception() raises asyncio.CancelledError if the future has been cancelled, the exception ends up being raised.
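The behaviour described here can be avoided by checking `cancelled()` before calling `exception()`. A minimal sketch with plain asyncio follows; `task_error` is a hypothetical helper, not code from aiozk.

```python
import asyncio


def task_error(task: asyncio.Future):
    """Return the task's exception, or None if it succeeded or was cancelled."""
    if task.cancelled():
        # Calling task.exception() here would raise asyncio.CancelledError.
        return None
    return task.exception()


async def demo():
    task = asyncio.ensure_future(asyncio.sleep(3600))
    await asyncio.sleep(0)  # let the task start
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return task.cancelled(), task_error(task)


print(asyncio.run(demo()))
```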

FYI @cybergrind @nikitagromov

Connection issues handling

Part of example client:

while True:
    children = await client.get_children(path, watch=True)
    ...
    await client.wait_for_events([WatchEvent.CHILDREN_CHANGED], path)

  1. Client 1 lost connection to the server
  2. Client 2 added new children to the path
  3. Client 1 repaired connection

In this case Client 1 never receives the event, so it doesn't proceed to the next iteration of the loop to update the children list. Is there a proper way to handle such connection problems?
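One possible workaround, not confirmed by the maintainers, is to bound the wait with a timeout so that the loop re-reads the children periodically even if an event was lost. The sketch below shows only the timeout wrapper, with `asyncio.Event` objects standing in for watch events; `wait_or_timeout` is a hypothetical helper, and whether cancelling a pending aiozk wait is safe is an assumption.

```python
import asyncio


async def wait_or_timeout(awaitable, timeout: float) -> bool:
    """Return True if `awaitable` finished, False if `timeout` elapsed first."""
    try:
        await asyncio.wait_for(awaitable, timeout=timeout)
    except asyncio.TimeoutError:
        return False
    return True


async def demo():
    never_fires = asyncio.Event()  # stand-in for a watch event that was lost
    fires = asyncio.Event()        # stand-in for a delivered watch event
    fires.set()
    lost = await wait_or_timeout(never_fires.wait(), timeout=0.05)
    delivered = await wait_or_timeout(fires.wait(), timeout=0.05)
    return lost, delivered


print(asyncio.run(demo()))
```

In the client loop, the call to `client.wait_for_events(...)` would be passed to `wait_or_timeout`, and the next iteration would fetch the children again regardless of the return value.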

AIOZK connection timeout when uploading data from multiple files longer than 460 lines

Description

An aiozk transaction committed through the aiozk client fails with the following timeout exception after approx. 30 seconds when multiple ZooKeeper nodes (i.e. files) are committed and at least one of them is larger than 450 lines separated by \n. Setting the session and read timeouts in the aiozk client has no impact.

Exception

Client:

30-Mar-2022 08:21:52.183376 ERROR aiozk.session Send exception: ('lm1', 12181)
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/aiozk/session.py", line 217, in send
    zxid, response = await self.conn.send(request, xid=self.xid)
aiozk.exc.TimeoutError: ('lm1', 12181)
...
  File "/opt/lmiocmd/commander/library/service.py", line 228, in load_library
    await transaction.commit()
  File "/usr/local/lib/python3.7/site-packages/aiozk/transaction.py", line 117, in commit
    response = await self.client.send(self.request)
  File "/usr/local/lib/python3.7/site-packages/aiozk/client.py", line 101, in send
    response = await self.session.send(request)
  File "/usr/local/lib/python3.7/site-packages/aiozk/session.py", line 233, in send
    raise e
  File "/usr/local/lib/python3.7/site-packages/aiozk/session.py", line 217, in send
    zxid, response = await self.conn.send(request, xid=self.xid)
aiozk.exc.TimeoutError: ('lm1', 12181)

ZooKeeper Server Logs:

[2022-03-30 08:21:39,824] INFO Processing srvr command from /172.22.0.13:53912 (org.apache.zookeeper.server.NIOServerCnxn)
[2022-03-30 08:21:42,521] WARN Exception causing close of session 0x100d77f221515ee: Len error 1093084 (org.apache.zookeeper.server.NIOServerCnxn)
[2022-03-30 08:21:52,155] INFO Processing srvr command from /172.22.0.13:54014 (org.apache.zookeeper.server.NIOServerCnxn)
[2022-03-30 08:21:52,166] INFO Revalidating client: 0x100d77f221515ee (org.apache.zookeeper.server.quorum.Learner)
[2022-03-30 08:21:52,285] WARN Exception causing close of session 0x100d77f221515ee: Connection reset by peer (org.apache.zookeeper.server.NIOServerCnxn)

How to reproduce

Use an aiozk transaction in the following way to upload data read from files (FILE_DATA and FILE_PATH):

zookeeper_path = "/{}".format(FILE_PATH)
transaction = aiozk.transaction.Transaction(client_object)
transaction.create(zookeeper_path, FILE_DATA)
...

The transaction should contain at least 660 files, with at least one exceeding approx. 460 lines.

Afterwards the transaction is to be committed:

await transaction.commit()

The exception is caused by this await transaction.commit() line.
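The server log above reports "Len error 1093084", which is slightly above ZooKeeper's default `jute.maxbuffer` limit of 1048575 bytes (0xfffff). This suggests the single multi-request may simply be too large for the server. One possible workaround is to split the files into several smaller transactions, at the cost of atomicity across batches. The sketch below shows only the batching step; `batch_by_size` is a hypothetical helper, and its size estimate ignores protocol overhead, so the budget should sit well below the server limit.

```python
from typing import Iterable, Iterator, List, Tuple

Item = Tuple[str, bytes]


def batch_by_size(items: Iterable[Item], max_bytes: int) -> Iterator[List[Item]]:
    """Group (path, data) pairs so each group's estimated size fits max_bytes."""
    batch: List[Item] = []
    size = 0
    for path, data in items:
        item_size = len(path.encode("utf-8")) + len(data)
        if item_size > max_bytes:
            raise ValueError(f"{path} alone exceeds the {max_bytes}-byte budget")
        if batch and size + item_size > max_bytes:
            yield batch
            batch, size = [], 0
        batch.append((path, data))
        size += item_size
    if batch:
        yield batch


files = [("/a", b"x" * 600), ("/b", b"y" * 600), ("/c", b"z" * 100)]
for group in batch_by_size(files, max_bytes=1000):
    print([path for path, _ in group])
```

Each yielded group would then be added to its own transaction and committed separately.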

Environment

ZooKeeper Cluster with 3 nodes running in Docker Container:

  zookeeper:
    restart: on-failure:3
    image: confluentinc/cp-zookeeper:5.5.1
    network_mode: host
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_MAX_CLIENT_CNXNS: 0
      ZOOKEEPER_SERVERS: lm1:12888:13888;lm2:22888:23888;lm3:32888:33888
    volumes:
      - /data/ssd/zookeeper/data:/var/lib/zookeeper/data
      - /data/ssd/zookeeper/logs:/var/lib/zookeeper/log

Memory:

MemTotal:       264009620 kB
MemFree:         4479308 kB
MemAvailable:   37704100 kB

CPU:

64 cores

processor	: 63
vendor_id	: AuthenticAMD
cpu family	: 23
model		: 1
model name	: AMD EPYC 7551P 32-Core Processor
stepping	: 2
microcode	: 0x8001250
cpu MHz		: 2549.852
cache size	: 512 KB

Receiving data after node deletion

If a data watcher is set on a node, data changes are no longer received after the node is deleted and created again.

This behaviour can be reproduced with a slightly changed test from aiozk.test.test_watchers:

@pytest.mark.asyncio
async def test_data_watch_delete(zk, path, data_watcher):
    data = []
    ready = asyncio.Event()
    test_data = b'test'

    async def data_callback(d):
        data.append(d)
        ready.set()

    await zk.set_data(path, test_data)

    data_watcher.add_callback(path, data_callback)
    await asyncio.sleep(0.2)
    assert data == [test_data]
    ready.clear()
    await zk.delete(path)

    await asyncio.wait_for(ready.wait(), timeout=1)
    assert ready.is_set()
    assert data == [test_data, NoNode]
    # data_watcher.remove_callback(path, data_callback)

    ready.clear()
    data = []
    await zk.create(path)
    await zk.set_data(path, test_data)
    await asyncio.sleep(0.2)
    assert data == [test_data]
    ready.clear()

Is this behaviour expected?
kazoo, for example, does receive the data in the same situation.

struct.error: 'i' format requires -2147483648 <= number <= 2147483647

I don't know how this error happened; restarting my server resolves it:

  File "/usr/local/lib/python3.6/dist-packages/aiozk/client.py", line 103, in exists
    await self.send(protocol.ExistsRequest(path=path, watch=watch))
  File "/usr/local/lib/python3.6/dist-packages/aiozk/client.py", line 73, in send
    response = await self.session.send(request)
  File "/usr/local/lib/python3.6/dist-packages/aiozk/session.py", line 226, in send
    raise e
  File "/usr/local/lib/python3.6/dist-packages/aiozk/session.py", line 210, in send
    zxid, response = await self.conn.send(request, xid=self.xid)
  File "/usr/local/lib/python3.6/dist-packages/aiozk/connection.py", line 123, in send
    payload = request.serialize(xid)
  File "/usr/local/lib/python3.6/dist-packages/aiozk/protocol/request.py", line 41, in serialize
    buff.write(struct.pack("!" + "".join(formats), *data))
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
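The error means that some integer field in the serialized request fell outside the signed 32-bit range. The traceback does not say which field. An ever-increasing counter such as the request xid is one candidate, but that is an assumption. The sketch below reproduces the struct limit and shows a counter that wraps before overflowing; both helpers are hypothetical.

```python
import struct

INT32_MAX = 2**31 - 1


def fits_int32(value: int) -> bool:
    """Return True if struct can pack `value` with the '!i' format."""
    try:
        struct.pack("!i", value)
    except struct.error:
        return False
    return True


def next_counter(value: int) -> int:
    """Increment a counter, wrapping to 1 before it leaves the int32 range."""
    return 1 if value >= INT32_MAX else value + 1


print(fits_int32(INT32_MAX), fits_int32(INT32_MAX + 1))
print(next_counter(INT32_MAX))
```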

fan_wait command not found

When I run docker-compose up, this message is printed and aiozk_diag_1 container exited.

diag_1     | bash: fan_wait: command not found
aiozk_diag_1 exited with code 127

Watcher recipes pass old data to callback

DataWatcher and ChildrenWatcher pass the changed data to the callback as a parameter. However, the data passed to the callback is the old value, not the new value after the change.

Is this by design? In many other ZooKeeper clients, such as kazoo, this parameter is the new value, and I also think passing the new value to the callback is better.

It looks like a mistake in BaseWatcher.

Thanks.

Is there some special way to create Locks?

I've tried creating and acquiring locks in several ways but none of them seems to work. Is there some special way to achieve that or they simply don't work?

I've checked documentation and it's not stated anywhere that they don't work but on the other hand I've found the following piece of code that suggests that they don't work:

# TODO: replace with aiozk.recipe.Lock after it fixed
class MYLock:
    def __init__(self, zk, name):
        self.zk = zk
        self.name = name

    async def __aenter__(self):
        while 1:
            try:
                await self.zk.create(self.name, ephemeral=True)
                break
            except Exception as e:
                await asyncio.sleep(1)
        return self

    async def __aexit__(self, _type, _exc, _tb):
        await self.zk.delete(self.name)

Remove loop argument

This removes a warning on the latest Python version and prepares for future Python versions: DeprecationWarning: The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.
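A common replacement for a stored `loop` argument is to look up the running loop at the point of use. The sketch below shows the pattern on a hypothetical `Worker` class. It is not aiozk code.

```python
import asyncio


class Worker:
    """Toy component that needs the event loop but takes no `loop` argument."""

    async def make_future(self) -> asyncio.Future:
        # Look up the loop at call time instead of storing one from __init__.
        loop = asyncio.get_running_loop()
        return loop.create_future()


async def demo() -> bool:
    fut = await Worker().make_future()
    fut.set_result("ok")
    return (await fut) == "ok"


print(asyncio.run(demo()))
```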

Add docstrings, API docs

First off, I'm really liking the library, so thanks for putting out there!

As I'm using it, I'm finding it a little hard to navigate because there aren't docs or docstrings, so I'm looking through the source and tests to figure out how a lot of things work and sort of guessing at things.

Would you accept PRs if I added docs to this project?

watchers and aio

It seems from the source code that you can add watchers:

zk.session.add_watch_callback(3, '/my/favorite', my_func)

However, the func needs to be sync instead of async, and this isn't documented.

Is this a work in progress?
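If a callback API only accepts plain functions, an async handler can still be used by scheduling it from a small sync wrapper. The sketch below uses plain asyncio; `as_sync_callback` is a hypothetical helper, and it assumes the callback is invoked on the event loop thread while the loop is running.

```python
import asyncio


def as_sync_callback(async_func):
    """Wrap a coroutine function so it can be registered as a plain callback."""
    def callback(*args, **kwargs):
        # Must be called from the event loop thread while the loop is running.
        return asyncio.ensure_future(async_func(*args, **kwargs))
    return callback


async def demo():
    seen = []

    async def on_change(path):
        await asyncio.sleep(0)
        seen.append(path)

    callback = as_sync_callback(on_change)
    task = callback("/my/favorite")  # what a sync-only API would call
    await task
    return seen


print(asyncio.run(demo()))
```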
