
Comments (4)

danielloader commented on August 12, 2024

Had a fiddle and came up with:

import asyncio

from aiostream import operator
from aiostream.stream import take


@operator(pipable=True)
async def taketimeout(source, n, timeout=1):
    """Forward the first ``n`` elements from an asynchronous sequence.
    Additionally provide a ``timeout`` in seconds. Default is 1 second.

    If ``n`` is negative, it simply terminates before iterating the source.
    """
    event = asyncio.Event()

    async def timer(event: asyncio.Event, timeout):
        await asyncio.sleep(timeout)
        event.set()

    # Note: this task is never cancelled, so it may outlive the stream.
    asyncio.create_task(timer(event, timeout))
    async for item in take(source, n):
        if event.is_set():
            break
        yield item

With this test:

@pytest.mark.asyncio
async def test_taketimeout(assert_run, event_loop):
    with event_loop.assert_cleanup():
        xs = stream.count(interval=1) | add_resource.pipe(1) | pipe.taketimeout(6, 3)
        await assert_run(xs, [0, 1, 2])

    with event_loop.assert_cleanup():
        xs = stream.count(1) | add_resource.pipe(1) | pipe.taketimeout(0)
        await assert_run(xs, [])

The good news is that it passes the functionality test: given a take size of 6 but a timeout of 3 seconds, it pulls three items from the count generator.

The bad news is that it's a bit janky. The Python 3.11 timeout context manager would be perfect for this, but it's obviously not applicable here. Additionally, it fails the cleanup check and I'm not sure why.


danielloader commented on August 12, 2024

It might be more useful to make timeout a generic pipe operator that lets you terminate any long-running pipeline by elapsed time rather than by item count.

E.g.

import asyncio
from aiostream import stream, pipe


async def main():
    xs = (
        stream.count(interval=1)  # Count from zero every 1s from an unbound infinite generator
        | pipe.timeout(10) # Close the pipeline after 10 seconds yielding [0,1,2,3,4,5,6,7,8,9]
        | pipe.skip(2)  # Skip the first 2 numbers yielding [2,3,4,5,6,7,8,9]
        | pipe.take(5)  # Take the following 5 yielding [2,3,4,5,6]
        | pipe.filter(lambda x: x % 2)  # Keep odd numbers yielding [3,5]
        | pipe.map(lambda x: x ** 2)  # Square the results yielding [9,25]
        | pipe.accumulate()  # Add the numbers together yielding 34
    )
    print(await xs)


# Run main coroutine
asyncio.run(main())
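A note on the example above: aiostream's built-in ``pipe.timeout`` raises ``asyncio.TimeoutError`` when the gap between two consecutive items exceeds the limit, rather than closing the stream cleanly after a total duration. The proposed total-duration behaviour can be sketched over plain async iterators like this (``within`` and ``slow_count`` are hypothetical names, not part of aiostream):

```python
import asyncio


async def within(source, seconds):
    """Forward items from ``source`` until ``seconds`` have elapsed in
    total, then stop cleanly instead of raising."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + seconds
    iterator = source.__aiter__()
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            # Caveat: cancelling __anext__ mid-flight closes the
            # underlying generator; acceptable here since we are done.
            item = await asyncio.wait_for(iterator.__anext__(), remaining)
        except (StopAsyncIteration, asyncio.TimeoutError):
            break
        yield item


async def slow_count(interval):
    """Yield 0, 1, 2, ... with ``interval`` seconds before each item."""
    i = 0
    while True:
        await asyncio.sleep(interval)
        yield i
        i += 1


async def demo():
    return [x async for x in within(slow_count(0.2), 0.5)]


result = asyncio.run(demo())
print(result)  # [0, 1]
```

Wrapping this as a pipable aiostream operator would additionally need the usual ``streamcontext`` handling for resource cleanup.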


vxgmichel commented on August 12, 2024

Hi @danielloader, sorry for the delay

> Found this library and want to say thanks, it's absolutely brilliant for my usecase of merging multiple AWS Kinesis streams into a single stream of async records.

Thanks for your kind words, it's much appreciated :)

> I'd like to propose a timeout on chunks (or take).

That's an interesting suggestion! It's quite tricky to implement, though. After a bit of fiddling around I came up with this solution:

import random
from contextlib import asynccontextmanager

import asyncio
from aiostream import stream, pipe, operator, streamcontext


@asynccontextmanager
async def buffer(streamer, size=1):
    queue = asyncio.Queue(maxsize=size)
    sentinel = object()

    async def consume():
        try:
            async for item in streamer:
                await queue.put(item)
        finally:
            await queue.put(sentinel)

    @operator
    async def wrapper():
        while True:
            item = await queue.get()
            if item is sentinel:
                await future
                return
            yield item

    future = asyncio.ensure_future(consume())
    try:
        yield wrapper()
    finally:
        future.cancel()


@operator(pipable=True)
async def catch(source, exc_cls):
    async with streamcontext(source) as streamer:
        try:
            async for item in streamer:
                yield item
        except exc_cls:
            return


@operator(pipable=True)
async def chunks(source, n, timeout):
    async with streamcontext(source) as streamer:
        async with buffer(streamer) as buffered:
            async with streamcontext(buffered) as first_streamer:
                async for first in first_streamer:
                    tail = await (
                        buffered
                        | pipe.timeout(timeout)
                        | catch.pipe(asyncio.TimeoutError)
                        | pipe.take(n - 1)
                        | pipe.list()
                    )
                    yield [first, *tail]


async def random_sleep(item):
    return await asyncio.sleep(random.random() * 0.15, result=item)


async def main():
    xs = stream.count() | pipe.map(random_sleep, task_limit=1)
    ys = xs | chunks.pipe(5, timeout=0.1) | pipe.print()
    await ys


if __name__ == "__main__":
    asyncio.run(main())

It outputs something like:

[0, 1]
[2, 3, 4, 5]
[6, 7, 8, 9]
[10, 11, 12, 13, 14]
[15, 16, 17]
[18, 19, 20, 21]
[22, 23, 24, 25]
[26, 27, 28]
[29, 30]
[31, 32, 33, 34, 35]
[36, 37]
[38, 39, 40, 41]
[42, 43]
[44, 45]
[46, 47, 48, 49, 50]
...

As you can see, I needed two extra helpers in order to write this new chunks implementation:

  • a catch operator, which is simple enough
  • a buffer context manager, which is something much trickier

Also note that this implementation yields a smaller chunk when the production of a single item times out, not when the production of the whole chunk times out.
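The whole-chunk-deadline variant can be sketched over plain async iterators as follows (``chunks_deadline`` and ``fast_count`` are made-up names, not aiostream API): start the clock when the first item of a chunk arrives, then flush the chunk once it is full or the deadline passes. Note the caveat in the comments: cancelling the source's ``__anext__`` on timeout can drop an in-flight item or close the source, which is plausibly why the ``buffer`` helper above is needed, since it lets the timeout cancel only a ``queue.get()`` rather than the source itself.

```python
import asyncio


async def chunks_deadline(source, n, timeout):
    """Group items into lists of up to ``n``, flushing a chunk when it
    is full or when ``timeout`` seconds have passed since its first
    item (whole-chunk deadline rather than per-item timeout)."""
    loop = asyncio.get_running_loop()
    iterator = source.__aiter__()
    chunk, deadline = [], None
    while True:
        if not chunk:
            try:
                # Wait indefinitely for the first item of a chunk.
                chunk.append(await iterator.__anext__())
            except StopAsyncIteration:
                return
            deadline = loop.time() + timeout
        else:
            remaining = deadline - loop.time()
            try:
                if remaining <= 0:
                    raise asyncio.TimeoutError
                # Caveat: this cancellation can drop an in-flight item
                # or close the source on timeout.
                chunk.append(
                    await asyncio.wait_for(iterator.__anext__(), remaining)
                )
            except asyncio.TimeoutError:
                yield chunk
                chunk = []
                continue
            except StopAsyncIteration:
                yield chunk
                return
        if len(chunk) >= n:
            yield chunk
            chunk = []


async def fast_count(stop):
    for i in range(stop):
        await asyncio.sleep(0.01)
        yield i


async def demo():
    return [c async for c in chunks_deadline(fast_count(5), 2, timeout=5.0)]


result = asyncio.run(demo())
print(result)  # [[0, 1], [2, 3], [4]]
```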

I'm not sure how I would go about implementing such a feature. Should we add this logic specifically for chunks or should we take a more generic approach and design new operators to use as building blocks for similar use cases? Sadly I don't really have bandwidth to work on aiostream at the moment, but feel free to add a couple suggestions if you have some.

And thanks for the report :)


zzl221000 commented on August 12, 2024

@vxgmichel
After using the chunks operator, the process does not stop until it is manually killed.

import asyncio

from aiostream import stream, pipe

# Uses the chunks operator defined in the previous comment.


async def main():
    source = stream.range(0, 100)
    even = source | pipe.filter(lambda x: x % 2 == 0)
    xs = even | chunks.pipe(9, 0.1) | pipe.map(lambda x: f"even {x}", task_limit=1)
    await (xs | pipe.print())


if __name__ == "__main__":
    asyncio.run(main())
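One plausible cause of the hang (an assumption, not a confirmed diagnosis): ``buffer`` enqueues exactly one sentinel when the source is exhausted, but the ``chunks`` implementation above drains the same queue through two consumers, the first-item streamer and the timed tail pipeline. Whichever consumer receives the sentinel swallows it, and the other blocks on ``queue.get()`` forever. A stdlib-only reproduction of that pitfall:

```python
import asyncio


async def main():
    queue = asyncio.Queue()
    sentinel = object()
    await queue.put(sentinel)  # producer finished: exactly one sentinel

    # Consumer A receives the sentinel and stops.
    first_is_sentinel = (await queue.get()) is sentinel

    # Consumer B never sees a sentinel and would wait forever;
    # bound the wait so this demo terminates.
    try:
        await asyncio.wait_for(queue.get(), timeout=0.2)
        second_got_item = True
    except asyncio.TimeoutError:
        second_got_item = False

    return first_is_sentinel, second_got_item


first_is_sentinel, second_got_item = asyncio.run(main())
print(first_is_sentinel, second_got_item)  # True False
```

If this is indeed the cause, putting one sentinel per consumer (or re-enqueueing the sentinel after reading it) would let both sides shut down.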

