
Comments (53)

vxgmichel avatar vxgmichel commented on August 12, 2024 2

PR #58 (trio and curio support) has been merged on the v1 branch. However, version 1 is not out yet since there are a few things I want to change before releasing it (see PR #60).

Thanks everyone for sharing your thoughts, I'm happy to finally close this long-standing issue :)


agronholm avatar agronholm commented on August 12, 2024 1

Btw, regarding memory channels, I've already implemented them for AnyIO in the v2 branch. I call them "memory message streams" though.


vxgmichel avatar vxgmichel commented on August 12, 2024

Hi @andersea, thanks for creating this issue.

I'd very much like to support curio and trio, especially because a large part of the library is asyncio-agnostic (the core modules don't even import asyncio, for instance). I've listed below all the occurrences of asyncio functions within the lib:

Differentiate between sync and async functions:

  • asyncio.iscoroutinefunction -> can be replaced by inspect.iscoroutinefunction

Block forever:

  • asyncio.Future -> trio has trio.sleep_forever()

Get a time reference:

  • asyncio.get_event_loop().time() -> trio has trio.current_time()

Set timeout:

  • asyncio.wait_for -> trio has trio.fail_after

Sleep:

  • asyncio.sleep -> trio has trio.sleep

Yield control to the event loop:

  • asyncio.sleep(0) -> I guess trio.sleep(0) shows the same behavior

Test utils:

  • asyncio.test_utils -> it's probably not so urgent to generalize the test utils

Manage multiple streams (advanced and zip operator):

  • asyncio.ensure_future
  • asyncio.wait
  • asyncio.gather -> tricky stuff

As you've pointed out, there's also the question of detecting which loop is being used:

  • asyncio: asyncio.get_event_loop().is_running()
  • trio: ??
  • curio: ??

Related issue on trio tracker.
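For most of these one-to-one replacements, a thin compatibility layer would probably be enough. A rough sketch (helper names made up, and assuming the current framework has already been detected somehow):

import asyncio

try:
    import trio
except ImportError:
    trio = None


async def compat_sleep(framework, seconds):
    # Sleep using whichever framework is currently running.
    if framework == "trio":
        await trio.sleep(seconds)
    else:
        await asyncio.sleep(seconds)


def compat_time(framework):
    # Get a time reference from the currently running framework.
    if framework == "trio":
        return trio.current_time()
    return asyncio.get_event_loop().time()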


andersea avatar andersea commented on August 12, 2024

Nice summary. Some of these are easy. Would be a good place for a first-time contributor to start submitting a patch, I think. ;)


andersea avatar andersea commented on August 12, 2024

Interesting.

asyncio.iscoroutinefunction is not compatible with inspect.iscoroutinefunction

In python 3.6 asyncio, asyncio.sleep is defined like this:

@coroutine
def sleep(delay, result=None, *, loop=None):

asyncio.iscoroutinefunction considers this a coroutine function. The inspect version does not.

This means the following test fails:

# Asynchronous/simple/concurrent
with event_loop.assert_cleanup():
    print('ASYNC/SIMPLE/CONCURRENT')
    xs = stream.range(1, 4) | pipe.map(asyncio.sleep)
    expected = [None] * 3
    await assert_run(xs, expected)
    assert event_loop.steps == [1, 1, 1]

because if you replace the map operator check with the inspect version

if inspect.iscoroutinefunction(func):

then the wrong branch is taken.

I believe this is fixed in Python 3.7, since all of asyncio now uses the async def syntax.
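A quick way to see the mismatch on Python 3.6 (on 3.8 and later both calls agree, since asyncio.sleep is a native coroutine function there):

import asyncio
import inspect

# On 3.6, asyncio.sleep is a generator-based coroutine function decorated
# with @asyncio.coroutine, so the two checks disagree.
print(asyncio.iscoroutinefunction(asyncio.sleep))  # True
print(inspect.iscoroutinefunction(asyncio.sleep))  # False on 3.6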


vxgmichel avatar vxgmichel commented on August 12, 2024

Would be a good place for a first-time contributor to start submitting a patch, I think. ;)

Great thanks, that's much appreciated :)

asyncio.iscoroutinefunction is not compatible with inspect.iscoroutinefunction

Oh, right. Well I guess asyncio.iscoroutinefunction should also work fine for curio and trio anyway, right?


andersea avatar andersea commented on August 12, 2024

Yes that is what I was thinking as well. There is no way to reimplement it, since it uses a magic singleton to mark coroutines as such.


andersea avatar andersea commented on August 12, 2024

You could do something like this:

import inspect

def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function."""
    return (inspect.iscoroutinefunction(func) or
            isinstance(getattr(func, '_is_coroutine', None), object))

Although this provides a slightly weakened guarantee, since it doesn't compare to the actual magic singleton from asyncio.

Edit: Hmm.. this would actually just check whether the attribute exists, right? Since all Python objects descend from object? So it isn't really desirable.
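A stricter variant would have to compare against asyncio's actual marker object, which is a private implementation detail, so this is only a sketch for the Python versions discussed here:

import inspect

from asyncio.coroutines import _is_coroutine  # private marker used by asyncio


def iscoroutinefunction(func):
    """Return True for native coroutine functions and @coroutine-decorated ones."""
    return (inspect.iscoroutinefunction(func) or
            getattr(func, '_is_coroutine', None) is _is_coroutine)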


vxgmichel avatar vxgmichel commented on August 12, 2024

Although this provides a slightly weakened guarantee, since it doesn't compare to the actual magic singleton from asyncio.

Honestly I wouldn't bother. I think we should simply gather every compatibility function in a dedicated module and use:

iscoroutinefunction = asyncio.iscoroutinefunction 


vxgmichel avatar vxgmichel commented on August 12, 2024

Hi @andersea,

Out of curiosity, do you know how to detect at runtime whether the current context corresponds to a curio or a trio application?


andersea avatar andersea commented on August 12, 2024

I have been thinking about this. It looks like
trio.hazmat.current_trio_token may identify if a trio loop is running, but I think asking on trio gitter may be a good idea.

One thing that might be worth considering is whether it makes sense to explicitly select the event loop instead of autodetecting it, maybe during import: import aiostream.trio?


andersea avatar andersea commented on August 12, 2024

This will throw a RuntimeError when run under asyncio:

import asyncio
import trio

async def test():
    token = trio.hazmat.current_trio_token()
    print(token)
    
trio.run(test)
asyncio.get_event_loop().run_until_complete(test())


vxgmichel avatar vxgmichel commented on August 12, 2024

It looks like trio.hazmat.current_trio_token may identify if a trio loop is running, but I think asking on trio gitter may be a good idea.

Sounds good!

One thing that might be worth considering, is whether it makes sense to explicitly select the event loop instead of autodetecting it, maybe during import. import aiostream.trio?

I can think of three different approaches:

  • detect at runtime
    • pros: transparent interface
    • cons: performing the checks at every call may affect performance
  • separate functions in separate modules
    • pros: separate implementations
    • cons: a lot of duplication
  • have a global switch (e.g. aiostream.set_mode)
    • pros: explicit
    • cons: globals are annoying

So far I prefer the runtime detection approach. Also, PEP 567 might be used to limit the checks to one per task, AFAIU. What do you think?


andersea avatar andersea commented on August 12, 2024

The theelous3/multio event loop abstraction uses the third option, but it feels a bit clunky.

As long as the shim doesn't start to get crazy complicated, I like detecting at runtime. If it looks like it will become really convoluted, then explicit selection seems fine to me. I don't think it will be too bad in terms of code duplication.

Here is a naive loop-check function. It assumes all the event loop libraries are available, which usually wouldn't be the case: normally you'd have either Trio or Curio installed, but probably not both.

import asyncio
import curio
import trio

async def which_event_loop():
    try:
        trio.hazmat.current_task()
        print('Running in the Trio event loop.')
    except RuntimeError:
        try:
            await curio.current_task()
            print('Running in the Curio event loop.')
        except RuntimeError:
            print('Running in asyncio.')
    
trio.run(which_event_loop)
curio.run(which_event_loop)
asyncio.get_event_loop().run_until_complete(which_event_loop())

Output:

Running in the Trio event loop.
Running in the Curio event loop.
Running in asyncio.


vxgmichel avatar vxgmichel commented on August 12, 2024

Looks good! I like the idea of making this function a coroutine so there is no way to use it outside an asynchronous context.

What about something like this:

import asyncio

try:
    import curio
except ImportError:
    curio = None

try:
    import trio
except ImportError:
    trio = None


async def get_async_module():
    try:
        if trio and trio.hazmat.current_task():
            return trio
    except RuntimeError:
        pass
    try:
        if curio and (await curio.current_task()):
            return curio
    except RuntimeError:
        pass
    try:
        if asyncio.get_event_loop().is_running():
            return asyncio
    except RuntimeError:
        pass
    raise RuntimeError(
        'The current asynchronous framework is not supported')


async def sleep(seconds):
    module = await get_async_module()
    return await module.sleep(seconds)


# Testing

def test_trio():
    trio.run(sleep, 1)


def test_curio():
    curio.run(sleep(1))


def test_asyncio():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(sleep(1))


andersea avatar andersea commented on August 12, 2024

That could do it. Unless there is some sync way to detect Curio, it will have to be async. You could cache the result of get_async_module if you wanted to.


njsmith avatar njsmith commented on August 12, 2024

The existence of trio-asyncio makes this kind of detection more complicated – it lets trio and asyncio tasks cohabitate in the same thread. In particular, right now at least, this means synchronous trio functions and synchronous asyncio functions can both succeed, but only one flavor of async function can work in any given context. Twisted's asyncio compatibility layer is similar.

I feel like it would be good to put this "which loop is it?" functionality in a dedicated package, and get the different async libraries involved in maintaining it.

Do you need the detection to work from sync context, or is async good enough?

For async I'm imagining a mechanism we could use where we yield a special sentinel value, and update the various loops to detect this value and send back their name. And to be clever, we can pick the value so that asyncio does that without having to be updated...

For sync, it's a bit trickier, but we could possibly do something like, have trio-asyncio and twisted keep a contextvar updated recording which mode they're in at any given moment?

This seems like a question that multio cares about too so CC @theelous3 @Fuyukai


njsmith avatar njsmith commented on August 12, 2024

Here's what the async version of this might look like: https://gist.github.com/njsmith/a9f9d57b5665d0b49ae818e30a0a2863

The idea would be that this gives the right result for asyncio, and then libraries like trio would add some code where they handle incoming yields:

if yielded_value is sniffio.LIBRARY_QUERY_SENTINEL:
    ...  # send back the value "trio"
else:
    ...  # regular yield handling


smurfix avatar smurfix commented on August 12, 2024

trio_asyncio will go with the contextvar.
The idea is for trio-asyncio to export a "state" function that tells you whether a trio-asyncio loop is running and, if so, whether your current code is running under it. Then any2trio|asyncio / trio|asyncio2any wrappers (you need the latter for async callbacks) can examine the current state and do their thing if necessary.

@njsmith We'd still need special-case handling for trio_asyncio because it doesn't intercept your yield no matter which context you're in – so that would still return "trio".

I don't think you'd need this detection in sync context.


njsmith avatar njsmith commented on August 12, 2024

@smurfix the yield here is really an await, and trio-asyncio certainly has to intercept those or else await future wouldn't work :-)

It does seem like a synchronous function would be better though, and yeah that's probably most cleanly done as a contextvar. Maybe we should have a sniffio package whose only job is to define the ContextVar that all the other packages can import and set/check?
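The whole package could be little more than this (hypothetical sketch, names invented):

# A single shared ContextVar that each framework sets for the tasks it
# runs, plus a tiny accessor.
from contextvars import ContextVar

current_async_library_cvar = ContextVar("current_async_library", default=None)


def current_async_library():
    value = current_async_library_cvar.get()
    if value is None:
        raise RuntimeError("unknown async library, or not in an async context")
    return value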


andersea avatar andersea commented on August 12, 2024

The reason why aiostream needs this is that it has operators that space out events to happen at regular intervals or delay a stream by a certain amount of time. These operators need the sleep and timeout functionality of the current event loop.

This is only needed in async world as far as I know. I haven't looked at all the cases.

@njsmith Can you explain a bit more how your example works? I am not getting the idea of it I think.


njsmith avatar njsmith commented on August 12, 2024

Can you explain a bit more how your example works?

Do you mean the gist I posted, or my last comment about having a shared ContextVar?


andersea avatar andersea commented on August 12, 2024

The gist.

I want to say both. :) But I haven't read PEP 567, and I am not sure I understand why ContextVars is a good thing. (Well I read some of it which almost blew a fuse in my brain.) I guess I am coming at this from very much a user perspective and some of this just gets a bit technical.


njsmith avatar njsmith commented on August 12, 2024

The gist is very much not aimed at users :-). I mean, you're totally welcome to dig into it if you want, but the end user experience is supposed to be that you call await sniffio.current_async_library() and it returns "asyncio" or "trio" or whatever. How does it actually do that? There are two ideas: (1) all async libraries use await to send messages between their I/O primitives and their task scheduler. So we define a special message that you can send any async library, that is intended to mean "hello, who are you?". Then we go around to trio, curio, twisted, etc. and patch them all to recognize this message and respond appropriately. (2) However, we might not convince asyncio to implement this, and certainly won't convince them to implement this in already-released versions of python, so that gist has a "clever" hack where the magic message just happens to be a quirky object where if asyncio sees it, it immediately sends back the value None. Which we can detect, and report as being asyncio.

ContextVars are much simpler: the idea is that you have a special variable, that's kind of like a global -- you can set it, and then read it later -- but whatever value I set within one task, is only visible within that task (and its children). So it's like a global except that different tasks/threads/etc. each get their own copy, so they don't accidentally overwrite each other. And then the idea would be that each library would, in trio.run or asyncio.run or whatever, set this value to "trio" or "asyncio" or whatever makes sense for them, and then code could look at the value to see which library they were using.
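A toy illustration of that isolation, assuming Python 3.7 so that contextvars and asyncio.run are available:

import asyncio
import contextvars

library = contextvars.ContextVar("library", default="unset")


async def child(name):
    library.set(name)  # visible only within this task (and its children)
    await asyncio.sleep(0)
    print(name, "sees", library.get())


async def main():
    await asyncio.gather(child("vanilla"), child("chocolate"))
    print("main still sees", library.get())  # -> unset


asyncio.run(main())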


vxgmichel avatar vxgmichel commented on August 12, 2024

I like the contextvars approach, I was actually thinking of using it to cache the result of the loop detection.

@njsmith

And then the idea would be that each library would, in trio.run or asyncio.run or whatever, set this value to "trio" or "asyncio" or whatever makes sense for them, and then code could look at the value to see which library they were using.

If this context var lives in the sniffio module, I don't see how asyncio.run would set the value since it can't depend on a third party library. Am I missing something?


njsmith avatar njsmith commented on August 12, 2024

Yeah, I was simplifying... ideally the stdlib would host this variable, and would have been hosting it since the beginning. But of course it didn't, and now it won't until Python 3.8 at the earliest. (And I'm not entirely sure they'd go for it anyway, since Guido's official position has always been that asyncio is what you should be using.) So in reality we'll need to host it in a third party module, and then somehow special-case asyncio.


vxgmichel avatar vxgmichel commented on August 12, 2024

Right. Then what about going with the hybrid approach of keeping the detection logic and caching the result in a context var? Asynchronous frameworks can then decide (or not) to ease the detection by setting the context var earlier. For instance:

import contextvars

async_module = contextvars.ContextVar('async_module', default=None)

async def get_async_module():
    if async_module.get() is None:
        async_module.set(await _get_async_module())
    return async_module.get()

async def _get_async_module():
    ...  # Hackish loop detection

Note that this doesn't work at the moment since curio doesn't seem to create a new context for its tasks.


njsmith avatar njsmith commented on August 12, 2024

It's actually not clear to me whether there is any working detection logic though? (See the discussion above about issues with trio vs. trio-asyncio and twisted vs. asyncio, and also curio's requirement to use await.) But if we want to go ahead with this then it's quite easy for us to push a new release of at least trio and trio-asyncio to make them work directly without hacks. I think it would be easy for curio too, if they decide they want to join the party :-).

By the way, for fallback logic to detect asyncio, get_running_loop is probably a better bet than get_event_loop().is_running.


Fuyukai avatar Fuyukai commented on August 12, 2024

Depending on how often you call it, surely you could read back through the frame stack to detect the library? (curio has curio module calls before your entry points, trio has trio module calls, etc.) This is horribly slow if you call it constantly, but if you base it on the __module__ attributes of the frames before the main entry point, it should be relatively stable unless the library names change.

This would mean no (major) corner cases with detection, and no work required from the libraries.
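Something along these lines, as a rough sketch that just walks the caller frames:

import sys


def sniff_from_stack():
    # The frames above a running task belong to the framework's own machinery
    # (trio._core, curio.kernel, asyncio.base_events, ...), so their module
    # names give the library away.
    frame = sys._getframe(1)
    while frame is not None:
        top_module = frame.f_globals.get("__name__", "").split(".")[0]
        if top_module in ("trio", "curio", "asyncio"):
            return top_module
        frame = frame.f_back
    return None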


vxgmichel avatar vxgmichel commented on August 12, 2024

[...] curio's requirement to use await.

I don't mind the helper to be a coroutine since it doesn't really make sense to use it outside of an asynchronous context. And since it's typically called at the very beginning of compatibility coroutines, there should be no context switching issue.

But if we want to go ahead with this then it's quite easy for us to push a new release of at least trio and trio-asyncio to make them work directly without hacks. I think it would be easy for curio too, if they decide they want to join the party :-).

Cool :) I like the idea of getting started with a hackish but working solution and giving the frameworks some time to adapt.

By the way, for fallback logic to detect asyncio, get_running_loop is probably a better bet than get_event_loop().is_running.

Yes, but I wanted to support python 3.6.

By the way, 3.6 compatibility is also an issue for contextvars since it's not available before 3.7.


Fuyukai avatar Fuyukai commented on August 12, 2024

By the way, 3.6 compatibilty is also an issue for contextvars since it's not available before 3.7.

There's a backport for 3.6 on PyPI (which Trio uses in 3.5 and 3.6).


njsmith avatar njsmith commented on August 12, 2024

Yes, but I wanted to support python 3.6.

Oh right... we should in fact use asyncio._get_running_loop, which does exist on both :-). (And yes it has an underscore in it, but I've been assured that it's a documented/stable API nonetheless.)
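So a 3.6-friendly asyncio fallback could look something like this:

import asyncio


def asyncio_is_running():
    # _get_running_loop() returns None (rather than raising) when no loop
    # is running in the current thread.
    return asyncio._get_running_loop() is not None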


smurfix avatar smurfix commented on August 12, 2024

Right. Then what about going with the hybrid approach of keeping the detection logic and caching the result in a context var?

No caching allowed, because trio_asyncio needs to set that to "asyncio" whenever it's executing asyncio code.

Thus, we need one common library which we (i.e. trio/curio/twisted/whoever) then all use to actually set that contextvar, which also means that the "yield a sentinel and check what pops back up" method is superfluous.

No, introspecting stack frames and whatnot is not a solution.


smurfix avatar smurfix commented on August 12, 2024

On second thought, we could use that contextvar to store a couple of common entry points / types, instead of / in addition to a string. sleep() comes to mind. Or the exception that's thrown when cancelling something.
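For instance, something like this sketch (names invented):

from contextvars import ContextVar
from types import SimpleNamespace

# The running framework would set this when it starts a task, e.g. trio could
# do something along the lines of:
#   async_runtime.set(SimpleNamespace(name="trio", sleep=trio.sleep,
#                                     current_time=trio.current_time))
async_runtime = ContextVar("async_runtime")


async def sleep(seconds):
    # Framework-agnostic code then just goes through the namespace.
    await async_runtime.get().sleep(seconds)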


njsmith avatar njsmith commented on August 12, 2024

store a couple of common entry points / types

That takes you into deep waters very quickly... for example, in asyncio you must catch the exception that's thrown when cancelling something (though you never really know if you're the one who cancelled it), and in trio you must never catch that exception.

But sure, sleep and current_time, why not.


smurfix avatar smurfix commented on August 12, 2024

On second thought, the way trio_asyncio actually works (I have to admit that, not having looked at that stuff myself for a couple of months, I got confused), the yield-a-sentinel method would work perfectly well with trio_asyncio and it'd correctly distinguish between asyncio and trio modes. No special hooks, intercepts, etc. required, because the asyncio mainloop already is the intercept.

We'd still need a common library so that everybody uses the exact same sentinel, and using contextvars would of course be faster – but otherwise, no problem.


vxgmichel avatar vxgmichel commented on August 12, 2024

One issue with the sentinel approach is performance. I know that speed isn't the priority here, but yielding all the way to the event loop just to get this piece of information seems a bit heavy to me. And if caching is not allowed, this round trip will happen whenever a compatibility coroutine is awaited.

Also, I'd like to point out that caching is not an issue if trio_asyncio overwrites the contextvar value (i.e. the cache) whenever it switches from one mode to another.

There's a backport for 3.6 on PyPI (which Trio uses in 3.5 and 3.6).

AFAIU, the backport doesn't help that much if the corresponding event loop doesn't support PEP 567 (i.e. if each created task doesn't come with its own context).


smurfix avatar smurfix commented on August 12, 2024

Also, I'd like to point out that caching is not an issue if trio_asyncio overwrites the contextvar value (i.e. the cache) whenever it is switching from mode to another.

Exactly. This is why the contextvar that's to be used for caching needs to be visible – and if the runtime needs to set it anyway, then it could do so as soon as it starts up and creates its initial context: zero overhead and no need for adding support for a sentinel dance in the first place.

AFAIU, the backport doesn't help that much if the corresponding event loop doesn't support PEP 567 (i.e. if each created task doesn't come with its own context).

Well, if any still don't, then that should be added ASAP. Multiple loop implementations aren't the only reason why supporting contextvars is necessary.


smurfix avatar smurfix commented on August 12, 2024

I am not sure I understand why ContextVars is a good thing.

Suppose you have a numerical library and you need to set options for handling NaNs or rounding or precision or whatever. So you use a context for that. However, if your code is async and you await inside that context, you'd expect that context to be still there after you return – otherwise Bad Things happen.

So that's why you need contexts in general. And as soon as you have support for contexts, well, then you get the ability to use that to store yet more context (in this case: which kind of loop you're in) for free.


vxgmichel avatar vxgmichel commented on August 12, 2024

Well, if any still don't, then that should be added ASAP. Multiple loop implementations aren't the only reason why supporting contextvars is necessary.

Sure, but what about asyncio on python 3.6? It's most probably not going to support context vars.


njsmith avatar njsmith commented on August 12, 2024

@vxgmichel The contextvars support is only needed for libraries that want to help out and make it easy to detect them. Asyncio's not going to do that anyway, so for this purpose it doesn't matter whether it has contextvars support.


andersea avatar andersea commented on August 12, 2024

You know, a viable option is to just have the user import a specialized version of the library for the event loop they are using.

Explicitly having to say from aiostream.trio import xyz seems fine to me.


njsmith avatar njsmith commented on August 12, 2024

So I made this: https://github.com/python-trio/sniffio

I think it's basically complete and finished and ready for its 1.0 release. But I'd feel better about that if someone besides me would at least glance over it and point out if I made any gratuitously annoying decisions. (In particular, @vxgmichel, does this look like it does what you need it to? @smurfix, any reason I've missed why this won't work for trio-asyncio?)
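The intended usage is a single synchronous call from async context, something like:

import asyncio

import sniffio


async def report():
    # Returns "asyncio", "trio", ... depending on what is actually running.
    print("running under", sniffio.current_async_library())


asyncio.get_event_loop().run_until_complete(report())  # -> running under asyncio
# trio.run(report)                                      # -> running under trio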


vxgmichel avatar vxgmichel commented on August 12, 2024

@njsmith

does this look like it does what you need it to?

Looks good thanks! I just posted a quick review: python-trio/sniffio#1.


andersea avatar andersea commented on August 12, 2024

Hi @vxgmichel, I noticed there is an anyio branch now. I tried it out and it looks functional. How far along do you think it is?

Thanks again for your hard work on this library, it is really a fantastic piece of work!


vxgmichel avatar vxgmichel commented on August 12, 2024

Hi @andersea :)

Funny you noticed, I recently fixed it! This branch isn't merged because I'm not sure what to do with it 😅 It raised a lot of questions and I'm still unsure about the proper course of action here.

What's left to do

First, the AsyncIteratorContext class has changed a lot (it's the base class for stream objects). It mainly behaves the same as before but there are still a few differences:

  • A new task is created for each async iterator context, and I'm not sure how this would affect the performance of existing programs (I suspect not that much but still, something to investigate).
  • Errors are still propagated from the producer to the consumer but not the other way around. I don't think it was a useful feature, but still, it might break stuff.
  • Iterating a stream without entering its streaming context was simply discouraged before (a warning was emitted). With the new implementation, it would make a lot more sense to simply forbid it, but that would increase the entry cost for the library. Is that what we want? 🤔

Then, there is the compatibility layer. I'm quite happy with anyio but I still had to write a few things on top of it, mainly:

  • A re-implementation of the trio memory channel, which I wouldn't consider production-ready
  • A way to leave anyio land when it comes to timeout exceptions (more info starting here). Long story short, the branch raises the right exception for asyncio, curio and trio programs but it does not for anyio programs. Maybe something that could be fixed later when (or if) anyio decides to address this problem.

Finally, the branch is only tested against asyncio, so there is still quite a bit of work to parametrize all the tests so they run with all three libraries.

What's the gain

That being said, I'd like to see this moving forward since it brings a lot of good things to the project:

  • Obviously, it adds trio and curio support
  • It should be conceptually more correct regarding how async generators are managed. Without going into the details, context managers inside those generators are guaranteed to run in a single task with consistent nesting (which is a strong trio requirement, see this discussion).
  • It opens the way to proper buffering of elements between streams, e.g.:
    stream.produce_x() | pipe.buffer(n) | pipe.do_y()
    I really like this because it provides a clear and expressive way to control the back pressure.

The alternative

There's an alternative solution to all that: bring all those ideas into a new project, targeting trio alone. That would mean:

  • No anyio/compatibility issues, hence a simpler code base with simpler tests
  • No backward compatibility issue, hence the opportunity to re-design anything in the API.
    In particular, I'm thinking about dropping the pipe syntax for a rust-like dot syntax:
    stream.produce_x().buffer(n).do_y()

The problem then is that there would be two libraries to maintain, with similar but yet different APIs.

Conclusion

I'm glad you asked this question, since it gave me the opportunity to dump my thoughts into a single post. That being said, I'm still unsure about the proper course of action here. It's hard for me to estimate how much aiostream is used today, and in which ways. Pypistats reports quite a lot of downloads, and I suspect that aiostream is usually added as a dependency to avoid re-implementing a single operator such as stream.zip or stream.merge. I also think that the vast majority of users do not understand the point of wrapping the iteration with a context manager (e.g. see this funny workaround).

So yes, I'd be curious to hear about your use cases and please let me know if you have any feedback on this write-up. I think I have been thinking about it on my own for much too long and I could really use some external input :)


andersea avatar andersea commented on August 12, 2024

I just recently released bitmex-trio-websocket, which is a wrapper around the BitMEX crypto derivatives exchange websocket api. It uses async generators exclusively for message processing. I am, of course, using this along with private code for automated trading.

In my usage pattern, I often come across a sort of split-merge-split-merge problem. I may want to use the same channel for different things, so I need to split the data into two or more streams that do different calculations on the same data. I then often need to merge these streams, or a completely different stream, again further down the line, which is why the ziplatest stream is extremely useful for me.
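To illustrate with a toy version of that pattern (made-up numbers, using the current asyncio-based API):

import asyncio

from aiostream import stream


async def main():
    prices = stream.range(100, 104, interval=0.10)  # stand-in for price updates
    volumes = stream.range(1, 5, interval=0.15)     # stand-in for volume updates
    # ziplatest re-emits a tuple holding the latest value from each source
    # whenever any of them produces a new item.
    combined = stream.ziplatest(prices, volumes)
    async with combined.stream() as streamer:
        async for price, volume in streamer:
            print(price, volume)


asyncio.get_event_loop().run_until_complete(main())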

The way you describe it, it kind of makes me lean towards having a loop-specific version. Before I noticed the anyio branch, I was actually planning to pull the library apart, extracting the bits I needed and just converting those to trio. I also noticed someone else tried that a while back (#25).

For some units, the dependency on asyncio can be removed by not depending on asyncio.iscoroutinefunction. As the trio developers point out, doing stuff like that is most likely a bad idea. Isn't it better to be explicit about what is expected? 😊 If you need a version with an async callback, it would be fine to have this as a separate import, I think.

This next idea is probably a little far-fetched, but as I was reading that trio issue, I was wondering whether there was a way to avoid callbacks altogether? This is probably a stupid idea, but could you do something like this:

@accumulate
def add(a, b):
    return a + b

Nah.. Probably not though..

I prefer the dot syntax to the pipe syntax. The pipe syntax looks like it's trying to make Python do something that it isn't supposed to do. But heck, since we are throwing everything in the air, how about something like this:

async def main():
    # This stream computes 11² + 13² in 1.5 second
    xs = stream.count(interval=0.1, pipe=(      # Count from zero every 0.1 s
        skip(10),                 # Skip the first 10 numbers
        take(5),                 # Take the following 5
        filter(lambda x: x % 2),  # Keep odd numbers
        map(lambda x: x ** 2),    # Square the results
        accumulate())             # Add the numbers together
    )
    print('11² + 13² = ', await xs)

It's just a tuple of operators. (I realise this conflicts with reserved names, so we probably need some explicit module prefix still..)

Or maybe this is prettier:

xs = pipe(
    count(interval=0.1),
    skip(10), ... # etc

Finally, I think you are right in that a lot of people probably only use a couple of bits from the library. The stream context is pretty confusing. Async generators are somewhat of a footgun, sadly.


andersea avatar andersea commented on August 12, 2024

You can bypass the need to check iscoroutinefunction in some cases:

import operator as op

from aiostream.aiter_utils import anext
from aiostream.core import operator, streamcontext


@operator(pipable=True)
async def accumulate(source, func=op.add, initializer=None):
    """Generate a series of accumulated sums (or other binary function)
    from an asynchronous sequence.

    If ``initializer`` is present, it is placed before the items
    of the sequence in the calculation, and serves as a default
    when the sequence is empty.
    """
    iscorofunc = None
    async with streamcontext(source) as streamer:
        # Initialize
        if initializer is None:
            try:
                value = await anext(streamer)
            except StopAsyncIteration:
                return
        else:
            value = initializer
        # First value
        yield value
        # Iterate streamer
        async for item in streamer:
            value = func(value, item)
            if iscorofunc is None:
                # First application: probe whether func returned an awaitable
                try:
                    value = await value
                    iscorofunc = True
                except TypeError:
                    iscorofunc = False
                yield value
            elif iscorofunc:
                value = await value
                yield value
            else:
                yield value

Another example, which unfortunately takes a performance penalty if you don't supply a coroutine function, since catching exceptions is somewhat expensive:

@operator
async def call(func, *args, **kwargs):
    """Call the given function and generate a single value.

    Await if the provided function is asynchronous.
    """
    result = func(*args, **kwargs)
    try:
        yield await result
    except TypeError:
        yield result

This doesn't work for map though, since the signature is different for the sync and async version.


vxgmichel avatar vxgmichel commented on August 12, 2024

Thanks for taking the time to share your ideas @andersea :)

I suggest we move this discussion to the related issues:

  • Coroutine function detection produces false negatives: issue #53
  • Alternative to the pipe syntax: issue #52

About your other comments:

Finally, I think you are right, in that a lot of people probably only use a couple of bits from the library. The stream context is pretty confusing. async generators are somewhat of a footgun sadly.

I'm not sure the generators are to blame here. For instance, the same ideas could be implemented with memory channels and nurseries, but the problem would remain the same: you need a context manager somewhere to perform the cleanup if one breaks out of the async iteration at some point.

I think it's a difference between sync and async iteration that people simply don't see. I tried to make the streaming context in aiostream easy to use (although it could be done better, see issue #40), but it seems like users usually avoid it. I'm not sure what to do about that.
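For the record, the intended pattern only takes a couple of extra lines, e.g.:

from aiostream import stream


async def first_multiple_of_seven():
    xs = stream.count(1, interval=0.1)
    # The streaming context guarantees that the underlying generators get
    # cleaned up, even though we break out of the iteration early.
    async with xs.stream() as streamer:
        async for x in streamer:
            if x % 7 == 0:
                return x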

The way you describe it, it kind of makes me lean towards having a loop-specific version.

But then there are two code bases to maintain, and I'm not sure I want to get into that... Another course of action could be to simply release a version 2 that implements all the compatibility-breaking ideas at once. This is what major version bumps are for, but I wonder how users would react to that 🤔


parity3 avatar parity3 commented on August 12, 2024

I know this is an old topic discussed months ago (on this issue), but would there be a way to push auto-detection of the event loop out into "user space"? Maybe still have auto-detection but provide a way to disable or circumvent it entirely?

Either by importing a different module path, or by passing in a setup argument on initial use? It seems simpler, as users don't have to rely on the auto-detection and there are no caching or performance tradeoffs involved. Even for users running multiple event loops in the same process, they would probably still benefit from the explicitness of using different import namespaces or manually setting some setup/context var.

I tend to prefer being explicit in my usage of libs. A.k.a. learning the hard way. No magic, please!


agronholm avatar agronholm commented on August 12, 2024

A way to leave anyio land when it comes to timeout exceptions (more info starting here). Long story short, the branch raises the right exception for asyncio, curio and trio programs but it does not for anyio programs. Maybe something that could be fixed later when (or if) anyio decides to address this problem

Not sure what the problem here is with AnyIO's timeouts, but why not create an issue on the tracker then?


vxgmichel avatar vxgmichel commented on August 12, 2024

@parity3

would there be a way to push auto-detection of the event loop out into "user space"? Maybe still have auto-detection but provide a way to disable or circumvent it entirely? Either by importing a different module path, or by passing in a setup argument on initial use?

Well if we decide to move the trio support to a new project, then there will be no need for event loop detection. But if we choose to have a single implementation based on anyio, then we won't really care about loop detection either since anyio is doing it for us. Do you see a third way to go about it? Let me know if I missed your point :)

@agronholm

Not sure what the problem here is with AnyIO's timeouts, but why not create an issue on the tracker then?

You're right, I should definitely do that :) But my concerns with anyio are generally related to the fact that I'm not using it for its intended purpose. I'll try to explain in the corresponding issue.


parity3 avatar parity3 commented on August 12, 2024

@vxgmichel great, sounds like choosing/deciding/detecting an event loop is not going to be a decision this lib would have to make directly anyway. Makes sense.

