vxgmichel / aiostream
Generator-based operators for asynchronous iteration
Home Page: http://aiostream.readthedocs.io
License: GNU General Public License v3.0
This library is very functional and has an emphasis on composition - type annotations with generics would make using it much more robust
Hi, I can use enumerate() improperly and it works as expected, i.e.:
async for i, r in enumerate(analysis.results):
    print(r)
but of course I get the UserWarning: AsyncIteratorContext is iterated outside of its context
But when I try to do it properly:
async with enumerate(analysis.results) as s:
    async for i, r in s:
        print(r)
I get an AttributeError: __aexit__ on the with line.
I assume this is a bug (or am I misunderstanding something)?
With thanks
Hi,
Please upload a wheel distribution to PyPI. This makes for faster package installations. I'm not a package maintainer myself and thus can't test it, but this may be a matter of simply adding 'bdist_wheel' after 'sdist' in the appropriate GitHub action.
Hi, I want to create a batch/buffer of streams, something like Buffer from reactive.
xs = stream.count(interval=0.2)
# Operators can be piped using '|'
ys = xs | pipe.batch(2)
# Use a stream context for proper resource management
async with ys.stream() as streamer:
    # Asynchronous iteration
    async for z in streamer:
        # Print (0, 1), (2, 3), ...
        print('->', z)
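The batching behaviour asked for above can be sketched with a plain async generator. This is only an illustration using the standard library: `batch` and `count` here are hypothetical helpers written for this example, not aiostream operators, and the choice to emit a final partial batch is an assumption.

```python
import asyncio


async def count(n, interval=0.0):
    # Hypothetical stand-in source: yields 0..n-1 with an optional delay.
    for i in range(n):
        await asyncio.sleep(interval)
        yield i


async def batch(source, size):
    # Group consecutive items into tuples of length `size`.
    buffer = []
    async for item in source:
        buffer.append(item)
        if len(buffer) == size:
            yield tuple(buffer)
            buffer = []
    if buffer:
        # Assumption: a trailing partial batch is emitted rather than dropped.
        yield tuple(buffer)


async def demo():
    return [pair async for pair in batch(count(5), 2)]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [(0, 1), (2, 3), (4,)]
```
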
I'm trying to convert some code using classical generators into code using async features. As far as I understood, aiostream looks like a helpful package to achieve this.
I came up with a small example mimicking the kind of things I'm trying to do: pairing pre-sorted numbers read from two files, using None when one stream of numbers has a gap until the next matching pair.
Example input:
$ cat f1.txt
2
4
6
7
8
$ cat f2.txt
1
3
4
6
7
8
9
Desired output:
(None, 1)
(2, None)
(None, 3)
(4, 4)
(6, 6)
(7, 7)
(8, 8)
(None, 9)
And here is some code that does that using classical generators:
#!/usr/bin/env python3
import sys
from itertools import repeat


def get_nums_from_file(fname):
    with open(fname) as fh:
        for line in fh:
            yield int(line)


def try_take_from(generator):
    try:
        return next(generator)
    except StopIteration:
        return None


def pair_nums(fname1, fname2):
    f1_nums = get_nums_from_file(fname1)
    f2_nums = get_nums_from_file(fname2)
    num1 = try_take_from(f1_nums)
    if num1 is None:
        yield from zip(repeat(None), f2_nums)
        return
    num2 = try_take_from(f2_nums)
    if num2 is None:
        yield from zip(f1_nums, repeat(None))
        return
    while True:
        if num1 > num2:
            yield (None, num2)
            num2 = try_take_from(f2_nums)
            if num2 is None:
                yield from zip(f1_nums, repeat(None))
                return
        elif num1 < num2:
            yield (num1, None)
            num1 = try_take_from(f1_nums)
            if num1 is None:
                yield from zip(repeat(None), f2_nums)
                return
        else:
            yield (num1, num2)
            num1 = try_take_from(f1_nums)
            if num1 is None:
                yield from zip(repeat(None), f2_nums)
                return
            num2 = try_take_from(f2_nums)
            if num2 is None:
                yield from zip(f1_nums, repeat(None))
                return


def main():
    for pair in pair_nums("f1.txt", "f2.txt"):
        print(pair)
    return 0


sys.exit(main())
I managed to get something working using async features and aiostream, but I get warned that AsyncIteratorContext is iterated outside of its context, and I don't understand where I should change my code to avoid the warning. Here is my code:
#!/usr/bin/env python3
import sys
import asyncio
from itertools import repeat
from aiostream import stream


def get_nums_from_file(fname):
    with open(fname) as fh:
        for line in fh:
            yield int(line)


async def try_take_from(generator):
    try:
        return await generator[0]
    except IndexError:
        return None


async def pair_nums(fname1, fname2):
    f1_nums = stream.iterate(get_nums_from_file(fname1))
    f2_nums = stream.iterate(get_nums_from_file(fname2))
    num1 = await try_take_from(f1_nums)
    if num1 is None:
        async for pair in stream.zip(stream.repeat(None), f2_nums):
            yield pair
        return
    num2 = await try_take_from(f2_nums)
    if num2 is None:
        async for pair in stream.zip(f1_nums, stream.repeat(None)):
            yield pair
        return
    while True:
        if num1 > num2:
            yield (None, num2)
            num2 = await try_take_from(f2_nums)
            if num2 is None:
                async for pair in stream.zip(f1_nums, stream.repeat(None)):
                    yield pair
                return
        elif num1 < num2:
            yield (num1, None)
            num1 = await try_take_from(f1_nums)
            if num1 is None:
                async for pair in stream.zip(stream.repeat(None), f2_nums):
                    yield pair
                return
        else:
            yield (num1, num2)
            num1 = await try_take_from(f1_nums)
            if num1 is None:
                async for pair in stream.zip(stream.repeat(None), f2_nums):
                    yield pair
                return
            num2 = await try_take_from(f2_nums)
            if num2 is None:
                async for pair in stream.zip(f1_nums, stream.repeat(None)):
                    yield pair
                return


async def main():
    async for pair in pair_nums("f1.txt", "f2.txt"):
        print(pair)
    return 0


sys.exit(asyncio.run(main()))
And here is the output:
(None, 1)
(2, None)
(None, 3)
(4, 4)
(6, 6)
(7, 7)
(8, 8)
/pasteur/homes/bli/.local/lib/python3.7/site-packages/aiostream/aiter_utils.py:120: UserWarning: AsyncIteratorContext is iterated outside of its context
"AsyncIteratorContext is iterated outside of its context")
(None, 9)
I don't exactly understand the meaning of the warning, so I tried two approaches to avoid it.
First, I changed the main function as follows:
async def main():
    pair_stream = stream.iterate(pair_nums("f1.txt", "f2.txt"))
    async with pair_stream.stream() as pairs:
        async for pair in pairs:
            print(pair)
    return 0
But this doesn't seem to change anything.
Second, I opened stream contexts inside the pair_nums function, as follows:
async def pair_nums(fname1, fname2):
    f1_nums_stream = stream.iterate(get_nums_from_file(fname1))
    f2_nums_stream = stream.iterate(get_nums_from_file(fname2))
    async with f1_nums_stream.stream() as f1_nums, f2_nums_stream.stream() as f2_nums:
        # [same as before, but indented]
But this won't run properly:
(None, 1)
Traceback (most recent call last):
File "./test_pair_nums_async_fix2.py", line 70, in <module>
sys.exit(asyncio.run(main()))
File "/local/gensoft2/exe/Python/3.7.2/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/local/gensoft2/exe/Python/3.7.2/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "./test_pair_nums_async_fix2.py", line 65, in main
async for pair in pair_nums("f1.txt", "f2.txt"):
File "./test_pair_nums_async_fix2.py", line 38, in pair_nums
num2 = await try_take_from(f2_nums)
File "./test_pair_nums_async_fix2.py", line 16, in try_take_from
return await generator[0]
File "/pasteur/homes/bli/.local/lib/python3.7/site-packages/aiostream/core.py", line 29, in wait_stream
async for item in streamer:
File "/pasteur/homes/bli/.local/lib/python3.7/site-packages/aiostream/stream/select.py", line 147, in item
result = await anext(streamer)
File "/pasteur/homes/bli/.local/lib/python3.7/site-packages/aiostream/stream/select.py", line 56, in skip
async for i, item in streamer:
File "/pasteur/homes/bli/.local/lib/python3.7/site-packages/aiostream/stream/transform.py", line 27, in enumerate
async with streamcontext(source) as streamer:
File "/pasteur/homes/bli/.local/lib/python3.7/site-packages/aiostream/aiter_utils.py", line 126, in __aenter__
"AsyncIteratorContext is closed and cannot be iterated")
RuntimeError: AsyncIteratorContext is closed and cannot be iterated
What exactly is expected from me with this warning?
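For comparison, the pairing logic itself can be written against two plain async iterators, each advanced one item at a time, so that no stream is ever re-opened. This is a minimal sketch using only the standard library; `numbers`, `take` and `pair_sorted` are hypothetical names, in-memory lists stand in for the two files, and it does not claim to be how aiostream expects the problem to be solved.

```python
import asyncio

_DONE = object()  # sentinel marking an exhausted iterator


async def numbers(values):
    # Hypothetical stand-in for reading pre-sorted ints from a file.
    for value in values:
        await asyncio.sleep(0)
        yield value


async def take(iterator):
    # Advance the iterator once, returning _DONE when it is exhausted.
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _DONE


async def pair_sorted(left, right):
    x, y = await take(left), await take(right)
    while x is not _DONE or y is not _DONE:
        if y is _DONE or (x is not _DONE and x < y):
            yield (x, None)   # left value has no partner
            x = await take(left)
        elif x is _DONE or y < x:
            yield (None, y)   # right value has no partner
            y = await take(right)
        else:
            yield (x, y)      # matching pair
            x, y = await take(left), await take(right)


async def demo():
    f1 = numbers([2, 4, 6, 7, 8])
    f2 = numbers([1, 3, 4, 6, 7, 8, 9])
    return [pair async for pair in pair_sorted(f1, f2)]


if __name__ == "__main__":
    for pair in asyncio.run(demo()):
        print(pair)
```

With the example input above this yields the desired output listed earlier, from (None, 1) through (None, 9).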
I'm looking for a way to add or remove asynchronous generators from a stream merge operation during iteration. Currently I have something like this, which works great:
getters = (transport.receive() for transport in transports)
async with aiostream.stream.merge(*getters).stream() as streamer:
    async for msg in streamer:
        process(msg)
I'd like to extend this to support both adding new transports and removing existing ones, from outside this loop. Looking at base_combine and StreamerManager, it looks like this could be done by exposing the manager and fiddling with StreamerManager.streamers, though I'm wondering if something like this already exists (the docs suggest not), or if there's perhaps a better way to go about it?
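One way to get add/remove semantics without touching aiostream internals is to give each source its own forwarding task and funnel everything through a shared queue. The sketch below uses only asyncio; `DynamicMerge`, its methods and `ticker` are hypothetical names invented for this example, and it says nothing about how base_combine or StreamerManager work.

```python
import asyncio


class DynamicMerge:
    """Merge async iterables that can be added or removed while iterating."""

    _STOP = object()

    def __init__(self):
        self._queue = asyncio.Queue()
        self._tasks = {}

    def add(self, name, source):
        # One forwarding task per source pushes its items into the shared queue.
        self._tasks[name] = asyncio.ensure_future(self._pump(source))

    def remove(self, name):
        # Cancelling the forwarding task stops that source only.
        self._tasks.pop(name).cancel()

    def close(self):
        for task in self._tasks.values():
            task.cancel()
        self._tasks.clear()
        self._queue.put_nowait(self._STOP)

    async def _pump(self, source):
        async for item in source:
            await self._queue.put(item)

    async def items(self):
        while True:
            item = await self._queue.get()
            if item is self._STOP:
                return
            yield item


async def ticker(label, interval=0.01):
    # Hypothetical stand-in for transport.receive().
    while True:
        await asyncio.sleep(interval)
        yield label


async def demo():
    merged = DynamicMerge()
    merged.add("a", ticker("a"))
    seen = []
    async for item in merged.items():
        seen.append(item)
        if len(seen) == 2:
            merged.remove("a")            # drop the first transport...
            merged.add("b", ticker("b"))  # ...and attach a new one
        elif len(seen) == 4:
            merged.close()
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Items already queued by a removed source would still be delivered; whether that is acceptable depends on the application.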
I went over the documentation but couldn't find any operators to turn a "cold" stream to a "hot" one. For example, given the following code:
import asyncio
from aiostream import stream


async def main():
    xs = stream.count(interval=1)

    async def worker(nr):
        async for val in xs:
            print(f'w{nr}: {val}')

    t1 = asyncio.create_task(worker(1))
    await asyncio.sleep(1.5)
    t2 = asyncio.create_task(worker(2))
    await asyncio.gather(t1, t2)


asyncio.run(main())
Both tasks create a new iterable, resulting in the following output:
w1 0 # t=0
w1 1 # t=1
w2 0 # t=1.5
w1 2 # t=2
w2 1 # t=2.5
w1 3 # t=3
However, I'd like the second task to hook into the same stream as the first one, resulting in the following output:
w1 0 # t=0
w1 1 # t=1
w1 2 # t=2
w2 2 # t=2
w1 3 # t=3
w2 3 # t=3
Does aiostream provide such facilities (similar to the rxjs share operator) and if not, what would be the best approach to implement one?
As a side note, thanks for this library! The documentation as well as the source code look very elegant.
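A "hot" source can be approximated by running the source once in a background task and fanning each item out to one queue per subscriber, so that late subscribers only see items produced after they joined. The following is a sketch under that assumption, using only asyncio; `Hub`, `subscribe` and `count` are hypothetical names, and the second worker joins on an event rather than a timer to keep the example deterministic.

```python
import asyncio


class Hub:
    """Run one source and fan every item out to the current subscribers."""

    _END = object()

    def __init__(self, source):
        self._source = source
        self._queues = set()
        self._task = None

    async def _run(self):
        try:
            async for item in self._source:
                for queue in self._queues:
                    queue.put_nowait(item)
        finally:
            # Tell every subscriber that the source is finished.
            for queue in self._queues:
                queue.put_nowait(self._END)

    async def subscribe(self):
        queue = asyncio.Queue()
        self._queues.add(queue)
        if self._task is None:
            # The source starts with the first subscriber.
            self._task = asyncio.ensure_future(self._run())
        try:
            while True:
                item = await queue.get()
                if item is self._END:
                    return
                yield item
        finally:
            self._queues.discard(queue)


async def count(n, interval=0.01):
    for i in range(n):
        yield i
        await asyncio.sleep(interval)


async def demo():
    hub = Hub(count(4))
    late = asyncio.Event()
    seen = {1: [], 2: []}

    async def worker(nr, start=None):
        if start is not None:
            await start.wait()
        async for value in hub.subscribe():
            seen[nr].append(value)
            if nr == 1 and len(seen[1]) == 2:
                late.set()  # let the second worker join after two items

    await asyncio.gather(worker(1), worker(2, late))
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))  # {1: [0, 1, 2, 3], 2: [2, 3]}
```
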
I have an existing project which I'd like to convert to using your library. I've modified one of your examples (listed at the foot of this message) to show the structure I'm aiming for, ie: using async class methods as steps in a transformation pipeline.
This does not work due to the presence of the @operator decorator on class methods, and fails with the message:
An operator cannot be created from a method, since the decorated function becomes an operator class
Is there some alternative way in which I can have async class methods process objects in the pipeline? I'd prefer to keep all of the logic in the pipeline code if I can, and all of the existing logic in the existing classes.
import asyncio
import random as random_module
from aiostream import operator, pipe, streamcontext


class MyOperators:
    @operator
    async def random(self, offset=0.0, width=1.0, interval=0.1):
        """Generate a stream of random numbers."""
        while True:
            await asyncio.sleep(interval)
            yield offset + width * random_module.random()

    @operator(pipable=True)
    async def power(self, source, exponent):
        """Raise the elements of an asynchronous sequence to the given power."""
        async with streamcontext(source) as streamer:
            async for item in streamer:
                yield item ** exponent

    @operator(pipable=True)
    def square(self, source):
        """Square the elements of an asynchronous sequence."""
        return self.power.raw(source, 2)

    async def main(self):
        xs = (
            self.random()  # Stream random numbers
            | self.square.pipe()  # Square the values
            | pipe.accumulate()  # Sum the values
        )
        print(await xs)


# Run main coroutine
m = MyOperators()
loop = asyncio.get_event_loop()
loop.run_until_complete(m.main())
loop.close()
I am combining multiple streams with a merge. The output of this stream is a summary "state" of a system, i.e. each item from the stream is a summary of the state of the system at that point in time. The resulting output is fed into a websocket and used to drive a React app, which then shows a handy live-updating dashboard. This is working really well, but in the more extreme cases I don't really want to send hundreds of events a second down the websocket, so I thought a throttle operator would be a handy tool to have in my belt.
The requirement is to start a timer when an item is received from the source, carry on iterating, and when that timer expires send only the most recently received item from the source. The timer should not be running unless there are one or more events waiting to be sent.
My PoC looks like this:
@operator(pipable=True)
async def throttle(source, delay=0.5):
    async with streamcontext(source) as streamer:
        cur = None
        next = asyncio.create_task(aiostream.aiter_utils.anext(streamer))
        waiter = None
        aws = {next}
        while True:
            done, aws = await asyncio.wait(aws, return_when=asyncio.FIRST_COMPLETED)
            if next in done:
                cur = await next
                next = asyncio.create_task(aiostream.aiter_utils.anext(streamer))
                aws.add(next)
                if not waiter:
                    waiter = asyncio.create_task(asyncio.sleep(delay))
                    aws.add(waiter)
            if waiter and waiter in done:
                yield cur
                waiter = None
It works, but it feels like it's relying on asyncio primitives too much, and that maybe there is a more idiomatic way to do it with aiostream. Can you think of any cleaner ways to implement this? The calls to anext() especially make me feel like I've overlooked something.
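For reference, the same requirement can be expressed as a plain async generator that also handles the end of the source. This is a sketch with the standard library only, not an aiostream operator; `_next`, `throttle` and `burst` are hypothetical helpers, and flushing the pending item when the source ends is an assumption the original requirement does not state.

```python
import asyncio


async def _next(iterator):
    # Wrap __anext__ in a coroutine so it can be scheduled as a task.
    return await iterator.__anext__()


async def throttle(source, delay):
    """Emit only the most recent item once `delay` seconds have passed
    since the first item that has not been emitted yet."""
    iterator = source.__aiter__()
    fetch = asyncio.ensure_future(_next(iterator))
    timer = None    # runs only while an item is waiting to be sent
    latest = None
    try:
        while True:
            waiting = {fetch} if timer is None else {fetch, timer}
            done, _ = await asyncio.wait(waiting, return_when=asyncio.FIRST_COMPLETED)
            if timer in done:
                timer = None
                yield latest
            if fetch in done:
                try:
                    latest = fetch.result()
                except StopAsyncIteration:
                    if timer is not None:
                        # Assumption: flush the pending item when the source ends.
                        timer.cancel()
                        timer = None
                        yield latest
                    return
                fetch = asyncio.ensure_future(_next(iterator))
                if timer is None:
                    timer = asyncio.ensure_future(asyncio.sleep(delay))
    finally:
        fetch.cancel()
        if timer is not None:
            timer.cancel()


async def burst(n):
    # Hypothetical source: n items in quick succession.
    for i in range(n):
        yield i
        await asyncio.sleep(0)


async def demo():
    return [item async for item in throttle(burst(6), 0.05)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a burst that finishes well inside the delay window, only the last item of the burst is expected to come out.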
I want to create a stream from a simple list, let's say [1, 2, 3]. How do I do that? What I managed to do is:
@operator
async def out():
    for x in [1, 2, 3]:
        yield x

async def main():
    async for x in out():
        print(x)
If this is the simplest way, I would suggest either enhancing the operator function so that it accepts synchronous iterables, or creating a dedicated function for that.
Found this library and want to say thanks, it's absolutely brilliant for my usecase of merging multiple AWS Kinesis streams into a single stream of async records.
I noticed there's a chunk method I could pipe the merge into, which works well, until there's a slow down in the stream. Given these streams are unbounded potentially infinite - I'd like to propose a timeout on chunks (or take).
That way you'd be able to take a chunk of size n or stop at a timeout, whichever happens first, and yield that (or skip yielding completely if the buffer is empty).
Thanks again for this library, the merge functionality is absolutely astounding for combining concurrent async generators into one usable stream!
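The chunk-or-timeout behaviour proposed above could look roughly like this when written with asyncio alone. It is a sketch, not the library's chunks operator: `timed_chunks`, `_next` and `bursty` are hypothetical names, and starting the timeout window when a chunk is opened (rather than at its first item) is an assumption.

```python
import asyncio


async def _next(iterator):
    return await iterator.__anext__()


async def timed_chunks(source, size, timeout):
    """Yield lists of up to `size` items; flush early after `timeout` seconds."""
    loop = asyncio.get_running_loop()
    iterator = source.__aiter__()
    fetch = asyncio.ensure_future(_next(iterator))
    try:
        while True:
            chunk = []
            deadline = loop.time() + timeout
            while len(chunk) < size:
                remaining = max(deadline - loop.time(), 0)
                # asyncio.wait does not cancel `fetch` on timeout, so no item is lost.
                done, _ = await asyncio.wait({fetch}, timeout=remaining)
                if not done:
                    break  # window expired
                try:
                    chunk.append(fetch.result())
                except StopAsyncIteration:
                    if chunk:
                        yield chunk
                    return
                fetch = asyncio.ensure_future(_next(iterator))
            if chunk:  # skip yielding when the buffer is empty
                yield chunk
    finally:
        fetch.cancel()


async def bursty():
    # Hypothetical source: three quick items, a pause, then two more.
    for i in (0, 1, 2):
        yield i
    await asyncio.sleep(0.25)
    for i in (3, 4):
        yield i


async def demo():
    return [chunk async for chunk in timed_chunks(bursty(), size=2, timeout=0.1)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```
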
Is it possible to compose different streams without giving them initial value, sort of like function composition, resulting in an async callable ready for input or further composition?
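In plain Python, this kind of source-free composition amounts to composing functions that each map an async iterable to an async iterable. The sketch below illustrates the idea only; `compose`, `mapping` and `filtering` are hypothetical helpers and not part of aiostream.

```python
import asyncio
from functools import reduce


def compose(*stages):
    """Chain stages left to right into one callable awaiting a source."""
    def pipeline(source):
        return reduce(lambda src, stage: stage(src), stages, source)
    return pipeline


def mapping(func):
    async def stage(source):
        async for item in source:
            yield func(item)
    return stage


def filtering(predicate):
    async def stage(source):
        async for item in source:
            if predicate(item):
                yield item
    return stage


async def arange(n):
    for i in range(n):
        yield i


# No source is involved yet: this is just a reusable callable.
odd_squares = compose(filtering(lambda x: x % 2), mapping(lambda x: x * x))
# Pipelines compose further, since a pipeline is itself a stage.
odd_squares_plus_one = compose(odd_squares, mapping(lambda x: x + 1))


async def demo():
    first = [x async for x in odd_squares(arange(6))]
    second = [x async for x in odd_squares_plus_one(arange(6))]
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([1, 9, 25], [2, 10, 26])
```
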
Hello there Vincent!
I've read your answer on Stack Overflow on asynchronous programming. After following it I get this error:
Traceback (most recent call last):
File "async_update_code.py", line 260, in <module>
loop.run_until_complete(main())
File "/usr/lib/python3.6/asyncio/base_events.py", line 468, in run_until_complete
return future.result()
File "async_update_code.py", line 208, in main
with aiohttp.ClientSession() as session:
File "/home/yahyaa/.local/lib/python3.6/site-packages/aiohttp/client.py", line 818, in __enter__
raise TypeError("Use async with instead")
TypeError: Use async with instead
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f750a1e5080>
BTW, is it alright to not be in a session?
I also have another question on asyncio/aiostream on Stack Overflow, asked just now. Can you answer it too?
Thanks.
First of all, I would not expect what I am doing to work. I don't think generators are supposed to work like the examples I am going to give.
I am trying to understand the meaning of 'a stream can be streamed multiple times'. See for instance #40
So what I tried was to actually stream the same generator in parallel, just to test how far 'streaming multiple times' goes.
First asyncio version:
import asyncio
import aiostream.stream as aiostream


async def producer():
    i = 1
    while True:
        yield i
        i += 1
        await asyncio.sleep(1)


async def consumer1(stream):
    async with stream.stream() as streamer:
        async for item in streamer:
            print(f'Consumer 1 got: {item}')


async def consumer2(stream):
    async with stream.stream() as streamer:
        async for item in streamer:
            print(f'Consumer 2 got: {item}')


async def main():
    stream = aiostream.iterate(producer())
    await asyncio.gather(
        consumer1(stream),
        consumer2(stream)
    )


asyncio.run(main())
This produces this result:
Consumer 1 got: 1
Consumer 2 got: 2
Consumer 1 got: 3
Consumer 2 got: 4
... continues forever
The trio version, which uses the anyio branch, uses trio.sleep() and the main function looks like this:
async def main():
    stream = aiostream.iterate(producer())
    async with trio.open_nursery() as nursery:
        nursery.start_soon(consumer1, stream)
        nursery.start_soon(consumer2, stream)
This produces one result, and then crashes. I have also had it just hang after producing a single result, but I can't reproduce that after the first couple of tries.
As I said in the beginning, I wouldn't expect this to work at all, so I am kind of surprised that asyncio seems to cope.
It would be nice to pin down what is meant by 'a stream can be streamed multiple times', because the way I see most streams, they are infinite series of items. I attach a stream processor to this infinite series and I don't feel like it would make sense to stream it multiple times, since you would never be able to get to the end of the stream.
Maybe what is meant is this:
?
Can you explain why this is needed? I tried iterating the stream directly and it seems to work, although I get a warning.
Having to get a context before you can iterate the stream seems like a lot of extra work. I am not getting the point of why this extra step is required.
Can you explain what actually happens when you slice a stream?
In your example you point out that if you slice a stream that you used the iterate operator on, you can only get one value out of the stream. I don't understand why this is. It's like slicing the stream modifies the stream itself? That's pretty surprising considering that slicing a list gives you a new list but keeps the original intact.
I am trying to wrap my head around this. Is it because once you have awaited the generator wrapped in an iterator, you have closed the generator?
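Independently of aiostream, part of the answer may lie in how Python generators behave: a generator object is one-shot, whereas calling the generator function again produces a fresh one. The snippet below only demonstrates that language-level fact; whether it fully explains the slicing behaviour asked about is an assumption.

```python
import asyncio


async def numbers():
    for i in range(3):
        yield i


async def demo():
    gen = numbers()                        # a single generator object
    first = [x async for x in gen]         # consumes it
    second = [x async for x in gen]        # already exhausted
    fresh = [x async for x in numbers()]   # a new object starts over
    return first, second, fresh


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([0, 1, 2], [], [0, 1, 2])
```

Unlike a list, the generator object has no stored items to copy, so anything that reads from it advances the only cursor there is.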
I have a few instances where I need to send the same items to multiple consumers. I have a working implementation for trio.
def broadcast(aiter):
    send_channels = []
    lock = trio.StrictFIFOLock()

    async def listen():
        send_channel, receive_channel = trio.open_memory_channel(1)
        send_channels.append(send_channel)
        try:
            while True:
                async with lock:
                    try:
                        yield receive_channel.receive_nowait()
                    except trio.WouldBlock:
                        value = await aiter.__anext__()
                        for s in send_channels:
                            if s != send_channel:
                                s.send_nowait(value)
                        yield value
        finally:
            send_channels.remove(send_channel)

    return listen
This relies on the lock being fair. Otherwise, if one listener somehow got ahead of the pack, s.send_nowait could try to send to a full channel, which would cause the listen function to throw a trio.WouldBlock error.
What do you think? Is this of interest for aiostream?
I am creating several pipelines using aiostream and want to stop a specific running pipeline on an external event. What would be the cleanest approach to stop an already running pipeline? I cannot close or stop the event loop because the same event loop is shared by all the pipeline tasks.
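If each pipeline runs in its own task, one option is to cancel just that task and leave the event loop and the other pipelines alone. The sketch below shows the pattern with asyncio only; `pipeline` is a hypothetical stand-in for an aiostream pipeline, and the `finally` block marks where its clean-up would run.

```python
import asyncio


async def pipeline(name, log):
    # Hypothetical long-running pipeline; `log` collects what it did.
    try:
        step = 0
        while True:
            await asyncio.sleep(0.01)
            log.append((name, step))
            step += 1
    finally:
        # Runs on cancellation, so resources can be released here.
        log.append((name, "stopped"))


async def demo():
    log = []
    tasks = {name: asyncio.ensure_future(pipeline(name, log)) for name in ("a", "b")}
    await asyncio.sleep(0.05)

    # External event: stop only pipeline "a".
    tasks["a"].cancel()
    await asyncio.gather(tasks["a"], return_exceptions=True)
    b_still_running = not tasks["b"].done()

    # Shut down the rest at the end of the demo.
    tasks["b"].cancel()
    await asyncio.gather(tasks["b"], return_exceptions=True)
    return log, b_still_running


if __name__ == "__main__":
    print(asyncio.run(demo()))
```
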
Seems like some good stuff was merged after the 0.3.1 release, i.e. the fix for the annoying DeprecationWarning issue. When will 0.3.2 be released?
Is it possible to create a stream from existing tasks?
I.e. I have a bunch of requests from aiohttp wrapped in Tasks and want to iterate over them in order of completion.
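Outside of aiostream, the standard library already offers completion-order iteration through asyncio.as_completed, which can be wrapped in an async generator. A minimal sketch, where `fetch` is a hypothetical stand-in for an aiohttp request and `completed` is a helper name made up for this example:

```python
import asyncio


async def fetch(name, delay):
    # Hypothetical stand-in for an HTTP request.
    await asyncio.sleep(delay)
    return name


async def completed(tasks):
    # Yield results in the order the tasks finish, not the order given.
    for next_done in asyncio.as_completed(tasks):
        yield await next_done


async def demo():
    tasks = [
        asyncio.ensure_future(fetch("slow", 0.15)),
        asyncio.ensure_future(fetch("fast", 0.01)),
        asyncio.ensure_future(fetch("medium", 0.08)),
    ]
    return [result async for result in completed(tasks)]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['fast', 'medium', 'slow']
```
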
Consider this code:
async def main():
    async def values(value):
        while True:
            await asyncio.sleep(0.2)
            yield value

    async def task():
        async for value in merge.raw(values("a")):
            print(value)

    try:
        await asyncio.wait_for(task(), 0.5)
    except asyncio.TimeoutError:
        pass


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
I get the following output:
a
a
Task was destroyed but it is pending!
task: <Task pending coro=<async_generator_asend()> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f2324a9f168>()]> cb=[_wait.<locals>._on_completion() at /usr/lib64/python3.6/asyncio/tasks.py:380]>
This can be fixed by adding a try/except in merge:
streamers = {}
try:
    # ...
except asyncio.CancelledError:
    for task in streamers:
        task.cancel()
    raise
It probably happens at other places where asyncio.ensure_future is used, I'll have a look.
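The general pattern behind this fix, cancelling child tasks whenever the consumer goes away, can be shown with a small queue-based merge. This is a sketch using only asyncio and is not the library's merge implementation; the `merge` below is a hypothetical helper, and it uses `finally` instead of catching CancelledError so that every exit path cleans up.

```python
import asyncio


async def merge(*sources):
    queue = asyncio.Queue()

    async def pump(source):
        async for item in source:
            await queue.put(item)

    tasks = [asyncio.ensure_future(pump(source)) for source in sources]
    try:
        while True:
            yield await queue.get()
    finally:
        # Reached on cancellation or early exit: stop the children and
        # wait for them so that no pending task is left behind.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def values(value):
    while True:
        await asyncio.sleep(0.02)
        yield value


async def demo():
    seen = []

    async def task():
        async for value in merge(values("a"), values("b")):
            seen.append(value)

    try:
        await asyncio.wait_for(task(), 0.1)
    except asyncio.TimeoutError:
        pass
    current = asyncio.current_task()
    leftover = [t for t in asyncio.all_tasks() if t is not current]
    return seen, leftover


if __name__ == "__main__":
    print(asyncio.run(demo()))
```
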
Running values = await stream.list(gen())
timeit: 17.5 seconds
Running values = [_ async for _ in gen()]
timeit: 1.9 seconds
Code:
import asyncio
from collections.abc import AsyncIterator
from aiostream import stream


async def gen() -> AsyncIterator[int]:
    for i in range(10_000):
        yield i


def main():
    async def func():
        values = await stream.list(gen())
        # values = [_ async for _ in gen()]

    asyncio.run(func())


if __name__ == '__main__':
    import timeit
    print(timeit.timeit("main()", setup="from __main__ import main", number=1000))
What is the reason for the current implementation?
..note:: The same list object is produced at each step in order to avoid memory copies.
Why?
/usr/local/lib/python3.7/site-packages/aiostream/aiter_utils.py:5: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
from collections import AsyncIterator
/usr/local/lib/python3.7/site-packages/aiostream/core.py:5: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
from collections import AsyncIterable, Awaitable
/usr/local/lib/python3.7/site-packages/aiostream/core.py:5: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
from collections import AsyncIterable, Awaitable
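The warnings above point at the import location: the abstract base classes live in collections.abc. A short illustration of the import form that does not trigger the warning (the generator is only there to show the classes behave the same):

```python
# The ABCs are imported from collections.abc rather than collections.
from collections.abc import AsyncIterable, AsyncIterator, Awaitable


async def numbers():
    yield 1


generator = numbers()
print(isinstance(generator, AsyncIterator))  # True
print(isinstance(generator, AsyncIterable))  # True
```
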
Could you please demonstrate an example of using the merge function?
When using merge and two tasks fail in the same tick, a Task exception was never retrieved warning is generated.
I would expect both or none of case_1 and case_2 to emit the warning.
async def delayed_failing_async_iterator(delay):
    await asyncio.sleep(delay)
    raise Exception()
    yield


async def case_1():
    async with aiostream.stream.merge(
        delayed_failing_async_iterator(1),
        delayed_failing_async_iterator(1),
    ).stream() as streams:
        async for item in streams:
            pass


async def case_2():
    async with aiostream.stream.merge(
        delayed_failing_async_iterator(1),
        delayed_failing_async_iterator(2),
    ).stream() as streams:
        async for item in streams:
            pass
import asyncio
from aiostream import stream


async def g1():
    for i in range(10):
        yield 2 * i
        await asyncio.sleep(.3)


async def g2(n):
    yield n
    await asyncio.sleep(.1)
    yield n + 1


async def main():
    # Create stream of numbers
    xs = stream.iterate(g1())
    # Map each number to a new stream
    ys = stream.map(xs, lambda n: stream.iterate(g2(n)))
    # Flatten a stream of streams into a stream of numbers
    zs = stream.flatten(ys)
    # Or just
    # zs = stream.flat_map(xs, g2)  # automatically wrapped into stream.iterate
    await stream.list(zs)
    # Expecting [0, 1, 2, 3, 4, ..., 19]


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
flat_map is quite a handy thing in the reactive world. Is it possible to implement it in aiostream?
It should work like stream.merge, but merge expects all streams to be given as arguments prior to the call, whereas in the example above streams also arrive asynchronously.
P.S. Also, switch_map should be easy to implement after flat_map.
Here is the list of the combinators using asyncio.iscoroutinefunction to automatically detect whether the provided callback should be awaited or not:
stream.accumulate
stream.map
stream.starmap
stream.call
stream.action
stream.filter
stream.until
stream.dropwhile
stream.takewhile
The problem with this detection is that it is hard to avoid false negatives.
However there are two counter-arguments to that:
functools.partial has been fixed in python 3.8:
assert asyncio.iscoroutinefunction(partial(asyncio.sleep, 3))
[...]
ys = stream.map(xs, async_(lambda x: asyncio.sleep(x / 1000)))
I can see a few solutions here:
[...] (filter_sync, filter_async). Problem: I don't like the idea of duplicating all those operators.
[...] async_ to fix the warning. Problem: does not fully address the initial issue.
Are there other alternatives?
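The false-negative problem can be reproduced with the standard library alone: a plain function that merely returns a coroutine is not a coroutine function. The sketch below uses inspect.iscoroutinefunction to show this, and adds one possible alternative (inspecting the returned value instead of the callback), which is an idea for illustration only and not something the issue above proposes; `double`, `wrapped` and `call` are hypothetical names.

```python
import asyncio
import inspect
from functools import partial


async def double(x):
    await asyncio.sleep(0)
    return 2 * x


def wrapped(x):
    # A synchronous function that returns a coroutine, like the lambda case.
    return double(x)


print(inspect.iscoroutinefunction(double))               # True
print(inspect.iscoroutinefunction(partial(double, 21)))  # True on Python 3.8+
print(inspect.iscoroutinefunction(wrapped))              # False: a false negative


async def call(func, arg):
    # Alternative detection: look at the result rather than the function.
    result = func(arg)
    if inspect.isawaitable(result):
        result = await result
    return result


if __name__ == "__main__":
    print(asyncio.run(call(wrapped, 2)))  # 4
    print(asyncio.run(call(abs, -3)))     # 3
```

Inspecting the result has its own trade-off: a callback that deliberately returns an awaitable as a value would be awaited too.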
I am looking to preserve an aiohttp streaming response through their async iterator.
The idea is to stream a response to check for a specific criterion that will appear somewhere in the first 5-20 lines of the response; if it is not matched we would close the stream, and if it is matched we would pipe it into a blob storage service.
However I cannot seem to preserve the stream. I have tried this a few ways but I cannot seem to get it right.
At first I figured that it would be something as simple as this:
async def main():
    async with aiohttp.ClientSession() as sesh:
        async with sesh.get("https://jsonplaceholder.typicode.com/todos/1") as resp:
            async with stream.preserve(resp.content).stream() as streamer:
                # This loop runs
                async for line in streamer:
                    print(line)
                    await asyncio.sleep(1)
            # This loop doesn't
            async for line in resp.content:
                print(line)
                await asyncio.sleep(1)
As the comments say, only one loop runs.
I also tried wrapping the stream in an aiostream.stream.operator but it gave me an exception:
content = operator(resp.content)
Traceback (most recent call last):
File "test_preserve_http.py", line 29, in <module>
loop.run_until_complete(main())
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
return future.result()
File "test_preserve_http.py", line 10, in main
content = operator(resp.content)
File "/Users/tiptop96/concepts/.env/lib/python3.6/site-packages/aiostream/core.py", line 368, in operator
return decorator if func is None else decorator(func)
File "/Users/tiptop96/concepts/.env/lib/python3.6/site-packages/aiostream/core.py", line 273, in decorator
name = func.__name__
As a side note, I also receive a warning as follows:
If I remove the stream() call it also raises an exception.
Any guidance on how to do this or if it is even possible is much appreciated.
import asyncio
import aiostream
import random


async def hack_to_wait(task):
    await task
    yield None


async def wait_for_tasks(tasks):
    async with aiostream.stream.flatmap(aiostream.stream.iterate(tasks), hack_to_wait).stream() as streamer:
        async for _ in streamer:
            pass


# A silly example
async def produce_tasks():
    for _ in range(5):
        await asyncio.sleep(random.randint(1, 10))
        yield asyncio.sleep(random.randint(1, 10))


loop = asyncio.get_event_loop()
loop.run_until_complete(wait_for_tasks(produce_tasks()))
loop.close()
Playing with delays and debugging prints shows that this code does what is intended: it waits for tasks already obtained from stream_of_tasks to complete and for more tasks to be generated by stream_of_tasks at the same time. Basically, it's asyncio.wait() that supports async collections, or rather, some ugly hack. It doesn't play well with exceptions and can only wait for the completion of all tasks at once. Still, it's somewhat useful for tasks that have to be completed, but don't return a result and have no reasonable place to be awaited.
Is there a better way to do this with aiostream, or maybe the problem it tries to solve (awaiting all tasks from an awaitable collection) can be evaded entirely?
Would it be possible to add aislice and aenumerate functions that match the interface of the synchronous versions? https://stackoverflow.com/questions/42378566/python-3-6-async-version-of-islice
While I definitely want to look into doing stream processing things with asyncio + aiostream, I have use cases where I just need to mimic the synchronous interface and it's easier to use the API I'm already familiar with. Thanks!
Example use case:
def test_sync():
    gen = craigslist.search('washingtondc', 'apa', postal=20071, search_distance=1)
    for post in islice(gen, 0, 110):
        pass


@pytest.mark.asyncio(forbid_global_loop=False)
async def test_async():
    gen = craigslist.search_async('washingtondc', 'apa', postal=20071, search_distance=1)
    async for post in aislice(gen, 0, 110):
        pass
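Helpers with this shape are short enough to write by hand as async generators. The sketch below is one possible version; `aislice` only supports the (start, stop) form used in the example above, `aenumerate` mirrors enumerate(iterable, start), and both names are taken from the request rather than from any existing API.

```python
import asyncio


async def aenumerate(source, start=0):
    # Async counterpart of enumerate(source, start).
    index = start
    async for item in source:
        yield index, item
        index += 1


async def aislice(source, start, stop):
    # Async counterpart of itertools.islice(source, start, stop).
    if stop <= start:
        return
    index = 0
    async for item in source:
        if index >= start:
            yield item
        index += 1
        if index >= stop:
            break


async def arange(n):
    for i in range(n):
        yield i


async def demo():
    sliced = [x async for x in aislice(arange(10), 2, 5)]
    numbered = [pair async for pair in aenumerate(arange(3), start=1)]
    return sliced, numbered


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([2, 3, 4], [(1, 0), (2, 1), (3, 2)])
```
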
I'm not sure if this is intended behavior, but if you create a stream from a synchronous iterator that does some blocking operation, then the event loop ends up being blocked. I was hoping aiostream would do some magic to put the synchronous iterator on a separate thread, but afaict that's not happening? Is that something that you'd be interested in a contribution for?
import aiostream
import asyncio
import time


def sync_iterator():
    for i in range(10):
        time.sleep(1.0)
        yield i


async def async_iterator():
    for i in range(10):
        await asyncio.sleep(1.0)
        yield i


async def print_stuff():
    for i in range(100):
        print('printing', i)
        await asyncio.sleep(0.1)


async def run():
    asyncio.create_task(print_stuff())
    async with aiostream.stream.iterate(sync_iterator()).stream() as streamer:
        async for item in streamer:
            print(item)


asyncio.run(run())
The print_stuff loop gets blocked (it's not printing every 100ms) with the sync_iterator, but not with the async_iterator.
Feel free to close this if this is intended behavior!
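Until something like this exists in the library, a blocking iterator can be driven from a worker thread with the event loop's default executor, one next() call at a time. This is a sketch with asyncio only; `iterate_in_thread` is a hypothetical helper, and the sentinel default passed to next() avoids letting StopIteration cross into a future.

```python
import asyncio
import time

_DONE = object()  # sentinel returned by next() when the iterator is exhausted


async def iterate_in_thread(iterable):
    loop = asyncio.get_running_loop()
    iterator = iter(iterable)
    while True:
        # Each blocking next() call runs in the default thread pool.
        item = await loop.run_in_executor(None, next, iterator, _DONE)
        if item is _DONE:
            return
        yield item


def sync_iterator(n):
    for i in range(n):
        time.sleep(0.05)  # blocking work
        yield i


async def demo():
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1  # only advances if the loop is not blocked

    background = asyncio.ensure_future(ticker())
    items = [item async for item in iterate_in_thread(sync_iterator(3))]
    background.cancel()
    return items, ticks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The background counter keeps advancing while the iterator sleeps, which is the behaviour the print_stuff loop above was missing.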
Hey,
Thanks for this awesome library! I was wondering if it would be a good idea to add a concurrent() method to operators?
i.e.:
urls = ['google.com', 'reddit.com', 'github.com']

@operator(pipable=True)
async def fetch(source):
    async with streamcontext(source) as streamer:
        async for item in streamer:
            yield fetch_url(item)

stream = iterate(urls) | fetch().concurrent(2) | parse()
This stream would cause the fetch operator to run two concurrent tasks, allowing two concurrent HTTP requests to be made (through the imaginary fetch_url method). Of course this would cause any downstream operators to receive items out of order, but in some cases this is OK.
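The semantics described here, a bounded number of calls in flight with results delivered out of order, can be sketched with asyncio.wait. This is an illustration only and not an aiostream operator; `concurrent_map` is a hypothetical helper and `fetch_url` is a stand-in that merely sleeps.

```python
import asyncio


async def concurrent_map(func, source, limit):
    """Call coroutine function `func` on each item with at most `limit`
    calls in flight; results are yielded as they complete (unordered)."""
    iterator = source.__aiter__()
    pending = set()
    exhausted = False
    try:
        while pending or not exhausted:
            # Top up the set of running tasks up to the limit.
            while not exhausted and len(pending) < limit:
                try:
                    item = await iterator.__anext__()
                except StopAsyncIteration:
                    exhausted = True
                else:
                    pending.add(asyncio.ensure_future(func(item)))
            if pending:
                done, pending = await asyncio.wait(
                    pending, return_when=asyncio.FIRST_COMPLETED
                )
                for task in done:
                    yield task.result()
    finally:
        for task in pending:
            task.cancel()


async def demo():
    active = peak = 0

    async def fetch_url(url):
        # Stand-in for an HTTP request; tracks how many run at once.
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        return url.upper()

    async def urls():
        for url in ("google.com", "reddit.com", "github.com"):
            yield url

    results = [r async for r in concurrent_map(fetch_url, urls(), limit=2)]
    return sorted(results), peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that this version waits on the source while topping up, so a slow source delays delivery of finished results; a production version would probably race the two.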
It looks like there are a few hard coded dependencies on the asyncio event loop, like for example with the time operators.
I am not sure if there is any way to introspect which loop you are running under and whether you could abstract away event loop apis. Alternatively if you wanted to run aiostream on those loops there would need to be loop specific versions of those operators.
What I really wanted to ask in previous issue was something different :D
Let's say I have a list of tasks/futures and I want to create a stream of values returned by those tasks. What I managed to do is:
async def value(x):
    await asyncio.sleep(random.randint(1, 10) / 10)
    return x


async def main():
    tasks = stream.iterate([stream.just(asyncio.ensure_future(value(x))) for x in range(10)])
    async for x in stream.flatten(tasks):
        print(x)


asyncio.get_event_loop().run_until_complete(main())
Looks over-complicated and I wonder if the same can be achieved in a shorter form.
BTW. I'm so excited this library exists! Thank you!
On Python 3.8 this code throws RuntimeError: athrow(): asynchronous generator is already running:
import asyncio
from aiostream.stream import merge


async def emit_events():
    while True:
        await asyncio.sleep(0.5)
        yield 'event'


async def run_stuff():
    async with merge(emit_events(), emit_events()).stream() as s:
        async for event in s:
            print(event)


async def main():
    await asyncio.wait_for(run_stuff(), timeout=3)


if __name__ == '__main__':
    asyncio.run(main())
Full stacktrace:
Traceback (most recent call last):
File "kek.py", line 25, in <module>
asyncio.run(main())
File "/usr/lib/python3.8/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
return future.result()
File "kek.py", line 20, in main
await asyncio.wait_for(run_stuff(),timeout=3)
File "/usr/lib/python3.8/asyncio/tasks.py", line 490, in wait_for
raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<run_stuff() done, defined at kek.py:14> exception=RuntimeError('athrow(): asynchronous generator is already running')>
Traceback (most recent call last):
File "/home/o2genum/Desktop/venv/lib/python3.8/site-packages/aiostream/stream/advanced.py", line 55, in base_combine
streamer, task = await manager.wait_single_event(filters)
File "/home/o2genum/Desktop/venv/lib/python3.8/site-packages/aiostream/manager.py", line 78, in wait_single_event
done = await self.group.wait_any(tasks)
File "/home/o2genum/Desktop/venv/lib/python3.8/site-packages/aiostream/manager.py", line 24, in wait_any
done, _ = await asyncio.wait(tasks, return_when="FIRST_COMPLETED")
File "/usr/lib/python3.8/asyncio/tasks.py", line 426, in wait
return await _wait(fs, timeout, return_when, loop)
File "/usr/lib/python3.8/asyncio/tasks.py", line 523, in _wait
await waiter
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.8/contextlib.py", line 662, in __aexit__
cb_suppress = await cb(*exc_details)
File "/home/o2genum/Desktop/venv/lib/python3.8/site-packages/aiostream/aiter_utils.py", line 154, in __aexit__
await self._aiterator.athrow(typ, value, traceback)
RuntimeError: athrow(): asynchronous generator is already running
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "kek.py", line 16, in run_stuff
async for event in s:
File "/home/o2genum/Desktop/venv/lib/python3.8/site-packages/aiostream/stream/advanced.py", line 96, in base_combine
manager.create_task(streamer)
File "/home/o2genum/Desktop/venv/lib/python3.8/site-packages/aiostream/manager.py", line 62, in __aexit__
return await self.stack.__aexit__(*args)
File "/usr/lib/python3.8/contextlib.py", line 679, in __aexit__
raise exc_details[1]
File "/usr/lib/python3.8/contextlib.py", line 662, in __aexit__
cb_suppress = await cb(*exc_details)
File "/home/o2genum/Desktop/venv/lib/python3.8/site-packages/aiostream/aiter_utils.py", line 154, in __aexit__
await self._aiterator.athrow(typ, value, traceback)
RuntimeError: athrow(): asynchronous generator is already running
If there are still pending tasks in a TaskGroup when its __aexit__ is called, the self._pending set is modified during iteration.
import asyncio
import aiostream

async def failing():
    try:
        await asyncio.sleep(0.1)
        yield
        raise Exception()
    except asyncio.CancelledError:
        pass

async def working():
    for _ in range(4):
        await asyncio.sleep(0.1)
        yield

async def main():
    # 100 failing sources merged with 100 working ones
    sources = [failing() for _ in range(100)] + [working() for _ in range(100)]
    async with aiostream.stream.merge(*sources).stream() as streams:
        async for _ in streams:
            pass

asyncio.run(main())
Causes:
Traceback (most recent call last):
...
File "/home/a/.virtualenvs/dq/lib/python3.8/site-packages/aiostream/manager.py", line 20, in __aexit__
for task in self._pending:
RuntimeError: Set changed size during iteration
Process finished with exit code 1
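The failure mode described in this report can be reproduced without aiostream. The sketch below is only an illustration under stated assumptions: `PendingGroup`, `cancel_all` and `demo` are hypothetical names, not the library's actual `TaskGroup`. It shows a done callback shrinking the set while a loop is still iterating over it, and that iterating over a snapshot avoids the error.

```python
import asyncio
import contextlib


class PendingGroup:
    """Hypothetical stand-in for a task group that tracks pending tasks."""

    def __init__(self):
        self._pending = set()

    def create_task(self, coro):
        task = asyncio.ensure_future(coro)
        self._pending.add(task)
        # The callback removes the task from the set as soon as it finishes.
        task.add_done_callback(self._pending.discard)
        return task

    async def cancel_all(self, snapshot):
        # Iterating over a copy keeps the loop safe from the done callbacks.
        tasks = list(self._pending) if snapshot else self._pending
        for task in tasks:
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task


async def demo(snapshot):
    group = PendingGroup()
    for _ in range(3):
        group.create_task(asyncio.sleep(3600))
    await asyncio.sleep(0)  # let the tasks start
    try:
        await group.cancel_all(snapshot)
    except RuntimeError as exc:
        return str(exc)
    return "ok"
```

Running `asyncio.run(demo(False))` returns the error message, while `asyncio.run(demo(True))` completes normally.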
import asyncio
import aiostream

async def counter():
    s = aiostream.stream.count(interval=0)
    async with s.stream() as stream:
        async for event in stream:
            print(event)

async def foo():
    while True:
        print('foo')
        await asyncio.sleep(0.01)

loop = asyncio.get_event_loop()
fut = asyncio.wait([counter(), foo()])
loop.run_until_complete(fut)
Running the above code, counter seems to block the event loop indefinitely and foo never gets called:
$ python aiostream_issue.py > aiostream_issue.log
$ cat aiostream_issue.log | grep foo | wc
0 0 0
However, I found a couple of workarounds:
1. Making interval positive
2. Piping the stream with pipe.spaceout(interval=0)
3. Adding await asyncio.sleep(0) after print(event)
Applying any of the above workarounds and running again for a few seconds:
$ python aiostream_issue.py > aiostream_issue.log
$ cat aiostream_issue.log | grep foo | wc
425 425 1700
I have tried replacing asyncio.wait with asyncio.gather, using uvloop, and other minor changes, but the results were the same.
I thought that a Stream object would implicitly yield the control flow to the event loop after each iteration (as is the case in workaround 2), but that seems to not always be the case. Is this by design or perhaps a bug?
PS: Thanks a lot for this great library!
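The behaviour reported here matches how plain async generators work: an `async for` loop only gives control back to the event loop when something inside it actually suspends. The sketch below uses only asyncio, not aiostream, and assumes that a zero-interval counter produces items without ever suspending, which is what the report suggests. `counter` and `count_ticks` are hypothetical names chosen for the illustration.

```python
import asyncio
import contextlib


async def counter(limit, cooperative):
    """Yield integers, optionally giving control back to the loop each time."""
    for i in range(limit):
        if cooperative:
            await asyncio.sleep(0)  # one trip through the event loop
        yield i


async def count_ticks(cooperative):
    """Return how often a background task ran while the counter was consumed."""
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0)

    task = asyncio.ensure_future(ticker())
    async for _ in counter(1000, cooperative):
        pass
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return ticks
```

With `cooperative=False` the background task never gets a chance to run before the loop finishes; with `cooperative=True` it runs alongside the consumer, which is the effect of the `await asyncio.sleep(0)` workaround.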
I tried to use aiostream.core.operator on a class method:
from aiostream.core import operator

class test(object):
    @operator
    async def testit(self):
        pass
but it fails with error:
$ python3.7 test.py
Traceback (most recent call last):
File "test.py", line 3, in <module>
class test(object):
File "test.py", line 4, in test
@operator
File "/home/jlana/Dropbox/vyuka2/venv/lib/python3.7/site-packages/aiostream/core.py", line 333, in operator
return decorator if func is None else decorator(func)
File "/home/jlana/Dropbox/vyuka2/venv/lib/python3.7/site-packages/aiostream/core.py", line 279, in decorator
init.__signature__ = signature.replace(parameters=new_parameters)
File "/usr/lib/python3.7/inspect.py", line 2843, in replace
return_annotation=return_annotation)
File "/usr/lib/python3.7/inspect.py", line 2788, in __init__
raise ValueError(msg)
ValueError: duplicate parameter name: 'self'
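As a workaround, it is possible to use a different parameter name and call the operator with the class instance:
from aiostream import stream
from aiostream.core import operator

class test(object):
    n = 123

    @operator
    async def testit(myself):
        yield myself.n

async def main():
    t = test()
    # The instance has to be passed explicitly
    await stream.print(t.testit(t))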
Just wanted to point out that, while this is a very cool trick, it isn't picked up by an IDE like PyCharm, for example.
https://github.com/vxgmichel/aiostream/blob/main/aiostream/pipe.py#L9
This leads to "Cannot find reference 'filter' in 'pipe.py'" warning highlights.
Maybe there is an alternative?
I'm trying to send a stream to multiple processors in series and am getting the following error when the second processor receives the stream:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/aiostream-0.4.3-py3.8.egg/aiostream/core.py", line 35, in wait_stream
UnboundLocalError: local variable 'item' referenced before assignment
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "project/__main__.py", line 50, in <module>
compare.run(sources)
File "project/project/simulation/__init__.py", line 31, in run
comparison.run(sources)
File "project/project/bot.py", line 24, in run
asyncio.run(loop())
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "project/project/bot.py", line 22, in loop
await self._dispatcher.dispatch(sources)
File "project/project/core/dispatch.py", line 20, in dispatch
await stream.map(funnel, self._route_data)
File "/usr/local/lib/python3.8/dist-packages/aiostream-0.4.3-py3.8.egg/aiostream/core.py", line 37, in wait_stream
aiostream.core.StreamEmpty
The project isn't open source at the moment so I can't refer you to the code that produces this error but I will try to create a simplified version to reproduce it.
Hi!
When do you plan to add Python 3.10 support?
| syntax
For reference, here's how a long pipeline looks once blackified:
xs = (
    stream.count(interval=0.1)
    | pipe.skip(10)
    | pipe.take(5)
    | pipe.filter(lambda x: x % 2)
    | pipe.map(lambda x: x ** 2)
    | pipe.accumulate()
)
The aiostream pipe syntax is often (and legitimately) seen as magical. Conceptually though, it's quite simple: the pipe combinators (in aiostream.pipe.*) are just curried and flipped versions of the standard combinators (in aiostream.stream.*). For instance:
ys = stream.map(xs, lambda x: x ** 2)
# is equivalent to
ys = pipe.map(lambda x: x ** 2)(xs)
Moreover, the pipe syntax is simply defined as regular function composition:
ys = xs | f | g | h
# is equivalent to
ys = h(g(f(xs)))
Combining those two ideas, we get:
ys = stream.map(xs, lambda x: x ** 2)
# is equivalent to
ys = xs | pipe.map(lambda x: x ** 2)
That's neat, but we can't really expect to convince everyone who uses aiostream that "it's not magical, it's simply curried and flipped combinators with syntactic sugar for function composition".
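The "curried and flipped" idea can be tried out on a toy example. The sketch below is not aiostream's implementation: `Pipeline`, `smap`, `flip_and_curry` and `pmap` are hypothetical names, and it works on plain lists rather than async streams. It only shows that a source-first combinator, once flipped and curried, composes through `|`.

```python
import functools


class Pipeline:
    """Hypothetical wrapper whose `|` applies the right-hand function."""

    def __init__(self, items):
        self.items = list(items)

    def __or__(self, func):
        # xs | f is defined as f(xs)
        return func(self)


def smap(source, func):
    """Standard form: the source comes first."""
    return Pipeline(func(x) for x in source.items)


def flip_and_curry(combinator):
    """Turn combinator(source, *args) into maker(*args)(source)."""

    @functools.wraps(combinator)
    def maker(*args, **kwargs):
        return lambda source: combinator(source, *args, **kwargs)

    return maker


# Pipe form of the same combinator
pmap = flip_and_curry(smap)

xs = Pipeline([1, 2, 3])
direct = smap(xs, lambda x: x ** 2)
piped = xs | pmap(lambda x: x ** 2)
chained = xs | pmap(lambda x: x + 1) | pmap(lambda x: x * 10)
```

Here `direct` and `piped` hold the same items, and `chained` applies the two functions from left to right.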
Another issue is that the pipe operator precedence does not play well with the await expression:
await stream.range(3) | pipe.print()
# is equivalent to
(await stream.range(3)) | pipe.print()
# which produces a type error
toolz way
For reference, here's how a long pipeline would look once blackified:
xs = pipe(
    stream.count(interval=0.1),
    pipe.skip(10),
    pipe.take(5),
    pipe.filter(lambda x: x % 2),
    pipe.map(lambda x: x ** 2),
    pipe.accumulate(),
)
Pros:
Cons:
- pipe namespace for flipped combinators
- pipe function has to be added, and the name conflicts with the pipe namespace

For reference, here's how a long pipeline would look once blackified (the parentheses have to be added explicitly though):
xs = (
    stream.count(interval=0.1)
    .skip(10)
    .take(5)
    .filter(lambda x: x % 2)
    .map(lambda x: x ** 2)
    .accumulate()
)
Pros:
- pipe scope for flipped combinators, methods already do that
Cons:
I'm getting an unexpected error when I run a program with the following structure:
import asyncio
from aiostream import stream

async def foo():
    inbox = stream.iterate([1, 2, 3, 4])
    inbox2 = stream.iterate([4, 5, 6, 7])
    merged = stream.merge(inbox, inbox2)
    async with merged.stream() as s:
        async for message in s:
            if message > 5:
                return

async def main():
    await foo()

asyncio.run(main())
The error is as follows:
$ python tester.py
Traceback (most recent call last):
File "/home/sam/projects/aiostream_error_example/venv/lib/python3.7/site-packages/aiostream/stream/advanced.py", line 93, in base_combine
yield result
GeneratorExit
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/sam/projects/aiostream_error_example/venv/lib/python3.7/site-packages/aiostream/stream/advanced.py", line 96, in base_combine
manager.create_task(streamer)
File "/home/sam/projects/aiostream_error_example/venv/lib/python3.7/site-packages/aiostream/manager.py", line 62, in __aexit__
return await self.stack.__aexit__(*args)
File "/usr/local/lib/python3.7/contextlib.py", line 652, in __aexit__
raise exc_details[1]
File "/usr/local/lib/python3.7/contextlib.py", line 635, in __aexit__
cb_suppress = await cb(*exc_details)
File "/usr/local/lib/python3.7/contextlib.py", line 545, in _exit_wrapper
await callback(*args, **kwds)
File "/home/sam/projects/aiostream_error_example/venv/lib/python3.7/site-packages/aiostream/manager.py", line 36, in cancel_task
await task
StopAsyncIteration
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "tester.py", line 20, in <module>
asyncio.run(main())
File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "tester.py", line 18, in main
await foo()
File "tester.py", line 15, in foo
return
File "/home/sam/projects/aiostream_error_example/venv/lib/python3.7/site-packages/aiostream/aiter_utils.py", line 163, in __aexit__
await self._aiterator.aclose()
RuntimeError: async generator raised StopAsyncIteration
This error occurs in version 0.3.2 but not in version 0.3.1.
Any help much appreciated.
Sorry if I've missed this looking through the documentation, but I was wondering if it's possible to append to an aiostream.stream.merge during iteration (or some other equivalent logic).
Here's some pseudo-ish code of the logic I'd like to be able to achieve:
from asyncio import sleep, run
from aiostream.stream import merge

async def go():
    yield 0
    await sleep(1)
    yield 50
    await sleep(1)
    yield 100

async def main():
    tasks = merge(go(), go(), go())
    async for v in tasks:
        if v == 50:
            # Pseudo-code: this is the wished-for call, not an existing method
            tasks.merge(go())
        print(v)

if __name__ == '__main__':
    run(main())
i.e. the async for v in tasks should never complete but continue to add new generators into the stream. Hope that makes some sense.
Thanks for the work on these tools, by the way.
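One way to get the requested behaviour is a queue that every source feeds into, so new sources can be attached while the consumer is running. The sketch below uses only asyncio; it is not an aiostream API, and `DynamicMerge`, `add`, `items` and `go` are hypothetical names. It has to be created inside a running event loop.

```python
import asyncio


class DynamicMerge:
    """Hypothetical helper: merge async iterables, accepting new ones later."""

    def __init__(self, *sources):
        self._queue = asyncio.Queue()
        self._tasks = []
        self._active = 0
        for source in sources:
            self.add(source)

    def add(self, source):
        """Attach another source; safe to call while iterating."""
        self._active += 1
        self._tasks.append(asyncio.ensure_future(self._pump(source)))

    async def _pump(self, source):
        # Forward items, then report either completion or the failure.
        try:
            async for item in source:
                await self._queue.put(("item", item))
        except Exception as exc:
            await self._queue.put(("error", exc))
        else:
            await self._queue.put(("done", None))

    async def items(self):
        try:
            while self._active:
                kind, value = await self._queue.get()
                if kind == "item":
                    yield value
                elif kind == "error":
                    raise value
                else:
                    self._active -= 1
        finally:
            for task in self._tasks:
                task.cancel()


async def go(start):
    yield start
    await asyncio.sleep(0.01)
    yield start + 50


async def main():
    merged = DynamicMerge(go(0), go(1))
    seen = []
    added = False
    async for v in merged.items():
        seen.append(v)
        if v == 50 and not added:
            merged.add(go(100))  # new source attached during iteration
            added = True
    return sorted(seen)
```

Iteration ends once every attached source is exhausted, so a source added while at least one other is still active keeps the loop alive.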
Hi,
Great library. I had a use-case where the groupby operator from itertools would come in handy, but I can't find a similar operator here. Is there a way to achieve similar functionality using the existing operators?
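For illustration, a groupby over consecutive items can be written as a plain async generator. This is a sketch under assumptions, not an aiostream operator: `agroupby`, `words` and `main` are hypothetical names, and unlike itertools.groupby each group is collected into a list before being yielded.

```python
import asyncio


async def agroupby(source, key=lambda x: x):
    """Yield (key, items) for each run of consecutive items sharing a key."""
    missing = object()
    current_key = missing
    group = []
    async for item in source:
        k = key(item)
        if current_key is not missing and k != current_key:
            # The key changed: emit the finished group and start a new one.
            yield current_key, group
            group = []
        current_key = k
        group.append(item)
    if current_key is not missing:
        yield current_key, group


async def words():
    for word in ["apple", "avocado", "banana", "blueberry", "cherry"]:
        yield word


async def main():
    return [(k, g) async for k, g in agroupby(words(), key=lambda w: w[0])]
```

Because groups are materialised eagerly, this only suits streams whose runs are of bounded size.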
Hello! I'm new to aiostream (but am already very impressed and excited to use it).
I'm trying to convert my code from the async with ... async for syntax to the more concise pipe syntax.
The code I'm writing this for is a development web server that watches a few directories for filesystem changes (modify, add, delete) and then regenerates the site when a change occurs.
The whole reason I'm using aiostream is that my interface to watch the directories is asynchronous but does not support watching multiple paths. Therefore, I need to combine (merge in aiostream parlance?) a list of watchers and iterate over their change events.
Change events from the watchers can obviously happen at any time. Also, the watchers will watch the directories indefinitely (I've been reading around about properly closing the StreamContext on completion, but as of now, the only way out for this code is a Ctrl-C from the user).
Here's an excerpt of what I'm doing. The working async with ... async for is in the watch method, commented:
async def watch(app: web.Application) -> None:
    app.logger.info(f"Watching {', '.join(map(str, app['generator'].watch_paths))} for changes")
    # THIS is the code that works! Minus the @operator decoration on regenerate
    # async with stream.merge(*[awatch(p) for p in watch_paths]).stream() as s:
    #     async for changedset in s:
    #         await regenerate(changedset, app)
    # This is the code that I'm trying to get work, but am getting no feedback as to the problem
    await (stream.merge(*[awatch(p) for p in watch_paths]) | regenerate(app))

@core.operator(pipable=True)
async def regenerate(changedset: Set[Tuple[Change, str]], app: web.Application) -> None:
    changed_files = ", ".join(path for change_type, path in changedset)
    app.logger.info(
        f"Regenerating site because the following files have changed: {changed_files}"
    )
    generated = await generate(app, False)
In the above implementation, regenerate is never entered for some reason, and the aiostream library gives me no indication as to why. This is at least a bug on my part but, optimistically, an example of where logging may be improved. Thank you!
A StackOverflow question pointed out that there is no async equivalent to the two-argument iter. Although in the OP's particular case the issue was easily resolved through the use of the walrus operator, it occurred to me that a function that creates a stream from a function might actually be useful in many situations, such as for iterating over a queue or other situations where the data comes from a possibly-async function.
An operator that converts a callable into a stream would nicely fit among aiostream's creational operators and would allow combining such a source with other aiostream operators. Assuming the name from_func (in analogy to from_iterable), one could iterate over an asyncio queue with:
async for item in from_func(queue.get):
    ...
or from binary chunks of a file:
async for chunk in from_func(fileobj.read, 4096, sentinel=b''):
    ...
The implementation would be something like:
import asyncio
from aiostream.core import operator

_no_sentinel = object()

@operator
async def from_func(func, *args, sentinel=_no_sentinel):
    """Generate values by awaiting func(*args).

    Await the result of the function if the function is asynchronous.
    If sentinel is given, terminate when the function returns the sentinel.
    """
    is_async = asyncio.iscoroutinefunction(func)
    while True:
        item = func(*args)
        if is_async:
            item = await item
        if item is sentinel:
            break
        yield item
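The same idea can be tried without aiostream as a plain async generator. The sketch below makes two assumptions that differ from the proposal above: it checks the returned value with `inspect.isawaitable` instead of inspecting the function, and it compares against the sentinel with `==`, as the two-argument `iter` does, rather than with `is`. The `demo_queue` and `demo_file` helpers are hypothetical.

```python
import asyncio
import inspect
import io

_no_sentinel = object()


async def from_func(func, *args, sentinel=_no_sentinel):
    """Yield func(*args) repeatedly, awaiting the result when needed."""
    while True:
        item = func(*args)
        if inspect.isawaitable(item):
            item = await item
        if sentinel is not _no_sentinel and item == sentinel:
            break
        yield item


async def demo_queue():
    queue = asyncio.Queue()
    for value in (1, 2, 3, None):
        queue.put_nowait(value)
    # queue.get is a coroutine function; None marks the end here
    return [item async for item in from_func(queue.get, sentinel=None)]


async def demo_file():
    fileobj = io.BytesIO(b"abcdefgh")
    # fileobj.read is a regular function returning b"" at end of file
    return [chunk async for chunk in from_func(fileobj.read, 3, sentinel=b"")]
```

Without a sentinel the generator never terminates on its own, which matches the queue use case in the proposal.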
Is there any reason for not supporting the task_limit parameter on the action operator? The underlying implementation map allows it, so it would simply be a matter of forwarding the parameter.
Just thought I'd share some ideas for streams that would be useful to me.
A ziplast stream. It would zip a number of async sequences, but yield a tuple for every item received on every stream, with each element being the last element received on each stream. If no elements have yet been received on a stream, the value could be None in the corresponding tuple position. I am really often running into having to coordinate two sequences that yield values at different intervals, one every other second and another every ten seconds, for example.
Merge can kind of achieve this, if you are able tell each element apart, for example by tagging them somehow. If each item has different type, then it is doable, but if each item is a dict for example, then it becomes not so easy to tell where each item came from, unless you wrap it with some metadata.
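The ziplast idea can be sketched with asyncio alone by tagging each item with the index of its source, which is the "wrap it with some metadata" approach done internally. Everything here is an assumption for illustration: `ziplast`, its `default` parameter, `timed` and `main` are hypothetical names, not aiostream operators.

```python
import asyncio


async def ziplast(*sources, default=None):
    """Yield a tuple of the latest value of every source on each new item."""
    queue = asyncio.Queue()
    done = object()

    async def pump(index, source):
        try:
            async for item in source:
                await queue.put((index, item))
        finally:
            # Always signal the end, even when the source raised.
            queue.put_nowait((index, done))

    tasks = [asyncio.ensure_future(pump(i, s)) for i, s in enumerate(sources)]
    latest = [default] * len(sources)
    active = len(sources)
    try:
        while active:
            index, item = await queue.get()
            if item is done:
                await tasks[index]  # re-raises a failure from that source
                active -= 1
                continue
            latest[index] = item
            yield tuple(latest)
    finally:
        for task in tasks:
            task.cancel()


async def timed(values):
    """Yield each value after sleeping for its delay (in seconds)."""
    for delay, value in values:
        await asyncio.sleep(delay)
        yield value


async def main():
    fast = timed([(0, "a0"), (0.05, "a1"), (0.1, "a2")])
    slow = timed([(0.1, "b0")])
    return [pair async for pair in ziplast(fast, slow)]
```

In this example the slow source stays at `None` until its first item arrives, after which every tuple carries its latest value.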
Also a ratelimit stream. It yields a value at most every n seconds. The rest of the items are discarded.
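A minimal sketch of such a ratelimit, written as a plain async generator rather than an aiostream operator. The names `ratelimit`, `numbers` and `demo` are hypothetical, and the `clock` parameter is an assumption added so the behaviour can be checked without real delays.

```python
import asyncio
import time


async def ratelimit(source, interval, clock=time.monotonic):
    """Yield an item only if `interval` seconds passed since the last yield."""
    last = None
    async for item in source:
        now = clock()
        if last is None or now - last >= interval:
            last = now
            yield item
        # Items arriving too early are discarded.


async def numbers():
    for i in range(5):
        yield i


async def demo():
    # Fake timestamps, one per item, instead of real waiting
    times = iter([0.0, 0.4, 1.0, 1.2, 2.5])
    return [x async for x in ratelimit(numbers(), 1.0, clock=lambda: next(times))]
```

With the fake timestamps above, the items seen at 0.4 and 1.2 seconds are dropped because less than one second has passed since the previous yield.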