Comments (4)
Had a fiddle and came up with:
import asyncio

from aiostream import operator
from aiostream.stream import take

@operator(pipable=True)
async def taketimeout(source, n, timeout=1):
    """Forward the first ``n`` elements from an asynchronous sequence.

    Additionally provide a ``timeout`` in seconds. Default is 1 second.
    If ``n`` is negative, it simply terminates before iterating the source.
    """
    event = asyncio.Event()

    async def timer(event: asyncio.Event, timeout):
        await asyncio.sleep(timeout)
        event.set()

    asyncio.create_task(timer(event, timeout))
    async for item in take(source, n):
        if event.is_set():
            break
        yield item
With this test:
@pytest.mark.asyncio
async def test_taketimeout(assert_run, event_loop):
    with event_loop.assert_cleanup():
        xs = stream.count(interval=1) | add_resource.pipe(1) | pipe.taketimeout(6, 3)
        await assert_run(xs, [0, 1, 2])
    with event_loop.assert_cleanup():
        xs = stream.count(1) | add_resource.pipe(1) | pipe.taketimeout(0)
        await assert_run(xs, [])
The good news is that it passes the functionality test: with a take size of 6 but a timeout of 3, it pulls three items from the count generator.
The bad news is that it's a bit janky; the Python 3.11 timeout context manager would be perfect for this, but obviously it's not applicable here. It also fails the cleanup check, and I'm not sure why.
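For what it's worth, the same idea can be expressed in plain asyncio, keeping a reference to the timer task and cancelling it in a `finally` block. The fire-and-forget `create_task` above leaves a pending task behind, which is a plausible (but unconfirmed) cause of the cleanup failure. `take_with_timeout` and `ticker` below are illustrative stdlib-only helpers, not aiostream operators:

```python
import asyncio

async def take_with_timeout(source, n, timeout=1.0):
    """Forward up to ``n`` items, stopping early once ``timeout`` elapses."""
    event = asyncio.Event()

    async def timer():
        await asyncio.sleep(timeout)
        event.set()

    task = asyncio.create_task(timer())
    try:
        count = 0
        async for item in source:
            if count >= n or event.is_set():
                break
            yield item
            count += 1
    finally:
        # Cancel the timer so no pending task outlives the generator.
        task.cancel()

async def ticker(interval):
    """Yield 0, 1, 2, ... with ``interval`` seconds between items."""
    i = 0
    while True:
        yield i
        i += 1
        await asyncio.sleep(interval)

async def demo():
    # Take size 6 but a 0.25s timeout over 0.1s items: the timer wins.
    return [x async for x in take_with_timeout(ticker(0.1), 6, timeout=0.25)]

result = asyncio.run(demo())
print(result)  # a short prefix of 0, 1, 2, ... (exact length is timing-dependent)
```

Whether this maps cleanly onto aiostream's cleanup bookkeeping is an open question, but the pattern of owning and cancelling the timer task is the usual fix for dangling-task warnings.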
from aiostream.
It might be more useful to make timeout a generic pipe operator that lets you terminate any long-running pipe by time rather than by item count.
E.g.
import asyncio

from aiostream import stream, pipe

async def main():
    xs = (
        stream.count(interval=1)        # Count from zero every 1s from an unbounded infinite generator
        | pipe.timeout(10)              # Close the pipeline after 10 seconds, yielding [0,1,2,3,4,5,6,7,8,9]
        | pipe.skip(2)                  # Skip the first 2 numbers, yielding [2,3,4,5,6,7,8,9]
        | pipe.take(5)                  # Take the following 5, yielding [2,3,4,5,6]
        | pipe.filter(lambda x: x % 2)  # Keep odd numbers, yielding [3,5]
        | pipe.map(lambda x: x ** 2)    # Square the results, yielding [9,25]
        | pipe.accumulate()             # Add the numbers together, yielding 34
    )
    print(await xs)

# Run the main coroutine
asyncio.run(main())
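Note that aiostream's existing timeout operator raises if a single item takes too long, while the comment above asks for a total-duration cut-off. That variant can be sketched with the standard library alone; the `close_after` name and deadline handling are illustrative assumptions, and cancelling `__anext__` on timeout can drop an in-flight item:

```python
import asyncio

async def close_after(source, seconds):
    """Yield items from ``source`` until ``seconds`` of total time elapse."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + seconds
    it = source.__aiter__()
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            # Wait for the next item, but never past the overall deadline.
            item = await asyncio.wait_for(it.__anext__(), remaining)
        except (StopAsyncIteration, asyncio.TimeoutError):
            break
        yield item

async def slow_count(interval):
    """Yield 0, 1, 2, ... every ``interval`` seconds."""
    i = 0
    while True:
        yield i
        i += 1
        await asyncio.sleep(interval)

async def demo():
    return [x async for x in close_after(slow_count(0.1), 0.45)]

result = asyncio.run(demo())
print(result)  # a prefix of 0, 1, 2, ... (timing-dependent, at most 5 items here)
```

A proper aiostream operator would presumably wrap this in `streamcontext` for correct resource handling; this sketch only illustrates the deadline arithmetic.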
Hi @danielloader, sorry for the delay
> Found this library and want to say thanks, it's absolutely brilliant for my use case of merging multiple AWS Kinesis streams into a single stream of async records.

Thanks for your kind words, it's much appreciated :)

> I'd like to propose a timeout on chunks (or take).
That's an interesting suggestion! It's quite tricky to implement, though. After a bit of fiddling around I came up with this solution:
import random
from contextlib import asynccontextmanager
import asyncio

from aiostream import stream, pipe, operator, streamcontext

@asynccontextmanager
async def buffer(streamer, size=1):
    queue = asyncio.Queue(maxsize=size)
    sentinel = object()

    async def consume():
        try:
            async for item in streamer:
                await queue.put(item)
        finally:
            await queue.put(sentinel)

    @operator
    async def wrapper():
        while True:
            item = await queue.get()
            if item is sentinel:
                await future
                return
            yield item

    future = asyncio.ensure_future(consume())
    try:
        yield wrapper()
    finally:
        future.cancel()

@operator(pipable=True)
async def catch(source, exc_cls):
    async with streamcontext(source) as streamer:
        try:
            async for item in streamer:
                yield item
        except exc_cls:
            return

@operator(pipable=True)
async def chunks(source, n, timeout):
    async with streamcontext(source) as streamer:
        async with buffer(streamer) as buffered:
            async with streamcontext(buffered) as first_streamer:
                async for first in first_streamer:
                    tail = await (
                        buffered
                        | pipe.timeout(timeout)
                        | catch.pipe(asyncio.TimeoutError)
                        | pipe.take(n - 1)
                        | pipe.list()
                    )
                    yield [first, *tail]

async def random_sleep(item):
    return await asyncio.sleep(random.random() * 0.15, result=item)

async def main():
    xs = stream.count() | pipe.map(random_sleep, task_limit=1)
    ys = xs | chunks.pipe(5, timeout=0.1) | pipe.print()
    await ys

if __name__ == "__main__":
    asyncio.run(main())
It outputs something like:
[0, 1]
[2, 3, 4, 5]
[6, 7, 8, 9]
[10, 11, 12, 13, 14]
[15, 16, 17]
[18, 19, 20, 21]
[22, 23, 24, 25]
[26, 27, 28]
[29, 30]
[31, 32, 33, 34, 35]
[36, 37]
[38, 39, 40, 41]
[42, 43]
[44, 45]
[46, 47, 48, 49, 50]
...
As you can see, I needed two extra helpers in order to write this new chunks implementation:
- a catch operator, which is simple enough
- a buffer context manager, which is much trickier
Also note that this implementation yields a smaller chunk when the production of a single item times out, not when the production of the whole chunk times out.
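That per-item flush behaviour can be sketched with the standard library alone, which may help when reasoning about the semantics. The `chunk_queue` name and the `None` sentinel below are illustrative choices, not aiostream API:

```python
import asyncio

async def chunk_queue(queue, n, timeout):
    """Yield chunks of up to ``n`` items from ``queue``.

    A chunk is flushed early when the *next single item* takes longer
    than ``timeout`` to arrive; ``None`` marks the end of the stream.
    """
    while True:
        first = await queue.get()
        if first is None:  # sentinel: producer finished
            return
        chunk = [first]
        while len(chunk) < n:
            try:
                item = await asyncio.wait_for(queue.get(), timeout)
            except asyncio.TimeoutError:
                break  # one item was slow: flush the partial chunk
            if item is None:
                yield chunk
                return
            chunk.append(item)
        yield chunk

async def demo():
    q = asyncio.Queue()
    for i in range(7):
        q.put_nowait(i)
    q.put_nowait(None)
    return [chunk async for chunk in chunk_queue(q, 3, timeout=1.0)]

chunks_out = asyncio.run(demo())
print(chunks_out)  # [[0, 1, 2], [3, 4, 5], [6]]
```

A whole-chunk timeout would instead track a single deadline from the first item of each chunk and pass the remaining time to each `wait_for` call.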
I'm not sure how I would go about implementing such a feature. Should we add this logic specifically for chunks, or should we take a more generic approach and design new operators to use as building blocks for similar use cases? Sadly I don't really have the bandwidth to work on aiostream at the moment, but feel free to add a couple of suggestions if you have some.
And thanks for the report :)
@vxgmichel
After using the chunks operator, the process does not stop until it is manually killed:
async def main():
    source = stream.range(0, 100)
    even = source | pipe.filter(lambda x: x % 2 == 0)
    xs = even | chunks.pipe(9, 0.1) | pipe.map(lambda x: f"even {x}", task_limit=1)
    await (xs | pipe.print())

if __name__ == "__main__":
    asyncio.run(main())