Code Monkey home page Code Monkey logo

fsharp.control.taskseq's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

fsharp.control.taskseq's Issues

Interoperating with CancellableTasks

While writing #77 I realised it would be logical to allow this TaskSeq thing to interoperate with a standard CancellableTask, that is

taskSeq {
    let! res = cancellableTask { ... }
    ...
}

which would pass the CancellationToken governing the iteration of the TaskSeq to the Cancellable task. This would give robust cancellation token passing through the overall computation.

Under the alternative long term strategy in #77 this would be the same as

asyncSeq {
    let! res = async2 { ... } // re-implemented F# async
    ...
}

because F# cancellableTask and async2 are more or less the same thing.

This would mean bringing a CancellableTask into this library from IcedTasks, which may be a good thing.

Warnings NU1605 when building test project

You may see the following warnings when running build.cmd or building from VS:

d:\Projects\OpenSource\Abel\TaskSeq\src\FSharp.Control.TaskSeq.Test\FSharp.Control.TaskSeq.Test.fsproj : warning NU1605: Detected package downgrade: FSharp.Core from 6.0.7 to 6.0.3. Reference the package directly from the project to select a different version.  [d:\Projects\OpenSource\Abel\TaskSeq\src\FSharp.Control.TaskSeq.sln]

TaskEx: parallelLimit

Replaces #129. TaskEx top level issue: #139

Async.Parallel's optional degree of parallelism parameter was added late in the game, but is critical - dumping an arbitrary unbounded number of work items onto the threadpool is not something that should be easy and/or the default thing to do without due consideration for how that will work under stess.

There are some other shortcomings, which frequently lead to various bespoke helpers proliferating:

  • pipelining is painful, necessitating an explicit argument name (e.g. fun computations -> Async.Parallel(computations, maxDegreeOfParallelism=dop) etc) (note this is not the case for Async.Sequential)
  • before v FSharp.Core v 6.0.6, [there was a stack overflow bug that can tear down the process](// dotnet/fsharp#13165) if >1200 items are started with a throttle and cancellation is triggered quickly (so having a layer between Async.Parallel and direct consumption within an app might be useful)

Current proposed APIs (will be updated inline based on any discussion below):

module Async =
    let parallelLimit maxDegreeOfParallelism computations =
        Async.Parallel(computations, maxDegreeOfParallelism = maxDegreeOfParallelism)

NOTES:

  • the naming aligns with that used in Node https://www.npmjs.com/package/run-parallel-limit
  • A common case is to use this to run but await failure of multiple Async<unit> tasks. Having to use |> Async.Ignore<unit[]> is ugly for that (and most people probably do |> Async.Ignore, which prevents the compiler from helping you if your computations start to return values where they previously returned unit)
  • How do you swap back/forth from that to Task, considering cancellation tokens (see #142) and unwrapping AggregateExceptions (see #141). Providing an equivalent of this that works well with task expressions should likely be prototyped alongside any permanent API for this. Example impl. Also, perhaps a Task.sequential might make sense
  • having Throttled in the name is pretty well established in multiple internal library suites, and in posts such as https://www.compositional-it.com/news-blog/improved-asynchronous-support-in-f-4-7

F# fails to determine which overload of `TaskSeqBuilder.Using` to use.

F# fails to determine which overload of TaskSeqBuilder.Using to use when type implements both IDisposable and IAsyncDisposable.

A real-world example with the NpgsqlDataReader type from Npgsql:

#r "nuget: Npgsql"
#r "nuget: FSharp.Control.TaskSeq"

open Npgsql
open FSharp.Control


let readRows (command: NpgsqlCommand) (ct: CancellationToken) f = taskSeq {
    use! reader = command.ExecuteReaderAsync ct // Fails to compile.
    
    let! reader = command.ExecuteReaderAsync ct
    use reader = reader // Also fails to compile.
}

Below is a set of test cases replicating the issue.

#r "nuget: FSharp.Control.TaskSeq"
#r "nuget: Xunit"
#r "nuget: FsUnit"

open System
open System.Threading.Tasks
open FSharp.Control
open FsUnit
open Xunit


type private OneGetter() =
    member _.Get1() = 1

type private Disposable() =
    inherit OneGetter()

    interface IDisposable with
        member _.Dispose() = ()

type private AsyncDisposable() =
    inherit OneGetter()

    interface IAsyncDisposable with
        member _.DisposeAsync() = ValueTask()

type private MultiDispose() =
    inherit OneGetter()

    interface IDisposable with
        member _.Dispose() =
            ()

    interface IAsyncDisposable with
        member _.DisposeAsync() =
            ValueTask()

let private check ts = task {
    let! length = ts |> TaskSeq.length
    length |> should equal 1
}

[<Fact>]
let ``CE task: Using when type implements IDisposable``() =
    let ts = taskSeq {
        use x = new Disposable()

        yield x.Get1()
    }

    check ts

[<Fact>]
let ``CE task: Using when type implements IAsyncDisposable``() =
    let ts = taskSeq {
        use x = AsyncDisposable()
        yield x.Get1()
    }

    check ts


[<Fact>]
let ``CE task: Using when type implements IDisposable and IAsyncDisposable``() =
    let ts = taskSeq {
        use x = new MultiDispose() // Fails to compile
        yield x.Get1()
    }

    check ts

[<Fact>]
let ``CE task: Using! when type implements IDisposable``() =
    let ts = taskSeq {
        use! x = task { return new Disposable() }
        yield x.Get1()
    }

    check ts


[<Fact>]
let ``CE task: Using! when type implements IAsyncDisposable``() =
    let ts = taskSeq {
        use! x = task { return AsyncDisposable() }
        yield x.Get1()
    }

    check ts


[<Fact>]
let ``CE task: Using! when type implements IDisposable and IAsyncDisposable``() =
    let ts = taskSeq {
        use! x = task { return new MultiDispose() } // Fails to compile
        yield x.Get1()
    }

    check ts

Unintended object inference

Binding task-likes in the taskSeq-builder will trigger the FS3559-warning. This seems to be due to the existence of the 'TOverall parameter here:

        [<NoEagerConstraintApplication>]
        member inline Bind< ^TaskLike, 'T, 'U, ^Awaiter, 'TOverall> :
            task: ^TaskLike * continuation: ('T -> ResumableTSC<'U>) -> ResumableTSC<'U>
                when ^TaskLike: (member GetAwaiter: unit -> ^Awaiter)
                and ^Awaiter :> ICriticalNotifyCompletion
                and ^Awaiter: (member get_IsCompleted: unit -> bool)
                and ^Awaiter: (member GetResult: unit -> 'T)

This parameter cannot be inferred, since it is not used in the method, and is probably only present due to copy-paste.

Move namespaces from FSharpy.TaskSeq to FSharp.Control

We should either target Microsoft.FSharp.Control or FSharp.Control. I noticed that these namespaces, or at least the first one, is auto-opened in F# projects. According to @dsyme that shouldn't happen, so that behavior may not stick, but FusionTasks does exhibit that behavior, see for instance this source file on Async extensions, which get auto-opened as soon as you project has a reference to that lib.

I'll have to toy with the options a bit. Maybe the above behavior is only that way because Async is a type in F# Core already, but TaskSeq isn't.

Just opening this to keep track of options. Suggestions welcome.

Release version 0.4.0-alpha.1

Releasing an alpha version as some of the features I want completed in 0.4.0 are not completed yet. These are the relevant release notes:

0.4.0-alpha.1

Some highlights, but see Release 0.4.0.-alpha.1 for details

  • fixes not calling Dispose for 'use!', 'use', or finally blocks #157 (by @bartelink)
  • BREAKING CHANGE: null args now raise ArgumentNullException instead of NullReferenceException, #127
  • adds TaskSeq.takeWhile, takeWhileAsync, takeWhileInclusive, takeWhileInclusiveAsync, #126 (by @bartelink)
  • adds AsyncSeq vs TaskSeq comparison chart, #131
  • removes release-notes.txt from file dependencies, but keep in the package, #138

Add `skipWhile`, `skipWhileInclusive` with async variants, and `takeUntil` etc

Similar to #122. The naming should be skipWhile, even though skipUntil was proposed as well in that thread. F# Core has skipWhile, so we should stick with that. Also, I think they are semantically different:

Skip while

  • skipWhile: as long as predicate is true, keep skipping, then stop
  • skipWhileAsync: same, but async
  • skipWhileInclusive: same, but yield first, then test predicate on yielded value
  • skipWhileInclusiveAsync: same, but async

Skip until

  • skipUntil: skip until predicate becomes true
  • skipUntilAsync: same, but async
  • skipUntilInclusive: same, but skip first, then test predicate
  • skipUntilInclusiveAsync:same, but asyncate

And the missing Take Until

We should include these if we do the skipUntilXXX functions:

  • takeUntil: as long as predicate is false, keep skipping, then stop
  • takeUntilAsync: same, but async
  • takeUntilInclusive: same, but yield first, then test predicate on yielded value
  • takeUntilInclusiveAsync: same, but async

Not sure we should have both until and while, as they are logically each other's opposites.

Add better support for cancellation tokens passing

Currently, it is not very clear how users can pass a CancellationToken through. While the CE has support for cancellation tokens, and the token is passed on to GetAsyncEnumerator(ct), unless users access the interface directly, there is currently no way to pass a cancellation token.

There are four, not necessarily exclusive ways, to implement this.

  1. Add TaskSeq.setCancellationToken, which will return a taskSeq with the given cancellation token
  2. Add a do! overload that allows writing taskSeq { do! myCancelToken } in your CE, otherwise behaving the same as (1)
  3. Add a custom operation, such that you can write taskSeq { cancellationToken myCancelToken } in your CE
  4. Add a parameter to the taskSeq CE constructor. However, if possible, this may not be easily discoverable

There are upsides and downsides to each of these approaches. I think the first option, together with a getCancellationToken should at least be supported.

However, there are other challenges as well. The helper functions in the TaskSeq module all use the CancellationToken() constructor (that is: no cancellation support). The taskSeq builder in that respect is a bit of a mixed beast. Yes, you can pass in a cancellation token, but you have to do it manually, and then, using any of the helpers will basically ignore this token.

That's not a good position to be in. Probably, code like source.GetAsyncEnumerator(CancellationToken()) should become something like source.GetAsyncEnumerator(TaskSeq.getCancellationToken source). Which runs into another issue: while the state machine has a property for the cancellation token, any other implementation of IAsyncEnumerable<'T> does not, which requires a type check to detect.

Adding all this magic comes at a cost. In AsyncSeq and Async this was resolved by adding overloads that take an optional cancellation token. Not ideal either.

Remove irrelevant internals from the public surface area

Certain functions, like Debug.xxx or the module TaskSeqInternals are publicly visible (though most of the actual functions are not). Likewise, types like TaskSeqResumptionFunc should probably be hidden.

Adding proper fsi files (currently only for the TaskSeq module), should do the trick.

Consider tail recursion

We've removed the tail recursion because it was hard to consolidate it into yield! (it was instead done with return!, which has no place in taskSeq).

However, today I helped someone with some code that he considered for taskSeq which had the following approach:

let getPoliciesAsync policyid =
    asyncSeq{
        use connection = new NpgsqlConnection(npgsqlConnectionStringBuilder.ConnectionString)
        use command = new NpgsqlCommand($"SELECT policy_data FROM policy.tbl_policy where policy_id = {Sql.uuid policyid};", connection)
        let! reader = command.ExecuteReaderAsync() |> Async.AwaitTask
        let rec someRec() = asyncSeq{
            let! rowExists = reader.ReadAsync() |> Async.AwaitTask
            if rowExists then
                yield {| dt1 = reader.GetString(0) |}
                yield! someRec()
        }
        yield! someRec()
    } |> AsyncSeq.toAsyncEnum

As you can see, it uses asyncSeq, but also: it is recursive. The same approach with taskSeq would likely be more performant, however, if there are a lot of rows, this becomes problematic. This code can be rewritten with a loop, though.

@dsyme, sharing this with you in case we want to revisit this at some point.

Allow use of `Task`, as opposed to `Task<'T>` in `do!` statements

Currently, this is not possible:

taskSeq {
   do! Task.Delay 220  // fails, because this returns a Task, not a Task<unit>
}

Add necessary overloads for allowing this, similarly to how the task CE works.

Of note is that allowing Task can lead to unexpected behavior, as Task<'T> inherits from Task. However, we should strive to have parity with task in F#. to avoid surprises for users. Just like with normal task, we would still disallow do! task { return 10 }. This should be OK, as the F# compiler does not (yet) allow that kind of conversion.

Implement `TaskSeq.length`, `allPairs`, `cache`, `cast`, `concat`, `findIndex`, `contains`, `exists`, `init`, `initInfinite`, `indexed` etc

While most of the above have trivial semantics and should sometimes take an Async overload (i.e., findIndexAsync and initAsync for the lambda), I'm not entirely sure of the usefulness of TaskSeq.cache (and we already have TaskSeq.toSeqCached), but perhaps for parity it should be included.

A note on TaskSeq.cast vs Seq.cast

While the docs on Seq.cast say that it is meant for a "loosely typed sequence", to be cast to a generically strongly typed sequence, it can be used just fine to cast an F# seq, because it also implements IEnumerable (i.e., the non-generic variant).

// this is fine
let x: seq<uint> = seq { 1 } |> Seq.cast

But F# also allows you to perform an illegal cast:

// this won't give any warnings, but will throw an exception
let x: seq<Guid> = seq { 1 } |> Seq.cast

Since IAsyncEnumerable<'T> only exists as a typed sequence, we should honor that and only allow valid casts. However, F# doesn't allow a constraint like 'T :> 'U, the rh-side must be a concrete type. This means that in the end, it'll effectively work the same way as for Seq.cast through boxing. Alternatively, if the type-relation is known ahead of time, users should probably just write TaskSeq.map (fun x -> x :> _) just as they would for seq<_>.

// this is fine
let x: seq<uint> = taskSeq { 1 } |> TaskSeq.cast
// this *should* raise a compile-time exception, but there's no way to enforce that
let x: seq<Guid> = taskSeq { 1 } |> TaskSeq.cast  // incompatible

For comparison, Linq.Enumerable.Cast works through the untyped Enumerable as well.


EDIT: to get better parity with Seq.cast and Linq.Enumerable.Cast, I decided to only allow it to work with untyped task sequences (that is: IAsyncEnumerable<obj>). I've updated the signature below. In addition, we'll be adding a box and unbox helper as well, the latter only for value types. If users want a reinterpret_cast style cast, they can use TaskSeq.box >> TaskSeq.cast. Due to the static way this is compiled, this won't add overhead.

TODO list:

  • length, see #53
  • lengthBy, see #53
  • lengthByAsync, see #53
  • allPairs // later, requires cache to be implemented
  • indexed, see #68
  • cache // later, see MBP approach of AsyncSeq
  • cast, see #67
  • box, see #67
  • unbox, see #67
  • concat, see #69
  • findIndex, see #68
  • findIndexAsync, see #68
  • tryFindIndex, see #68
  • tryFindIndexAsync, see #68
  • contains
  • exists
  • existsAsync
  • init, see #69
  • initAsync, see #69
  • initInifinite, see #69
  • initInifiniteAsync, see #69

Proposals of signatures:

module TaskSeq =
    val length: source: taskSeq<'T> -> Task<int>
    val lengthBy: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<int>
    val lengthByAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<int>

    val allPairs: source1: taskSeq<'T> -> source2: taskSeq<'U> -> taskSeq<'T * 'U>
    val indexed: source: taskSeq<'T> -> taskSeq<int * 'T>
    val cache: source: taskSeq<'T> -> taskSeq<'T> // later, see MBP approach of AsyncSeq
    val cast: source: taskSeq<obj> -> taskSeq<'U>
    val box: source: taskSeq<'T> -> taskSeq<obj>
    val unbox<'T when 'T: struct> : source: taskSeq<obj> -> taskSeq<'U>
    val concat: source1: taskSeq<'T> -> source2: taskSeq<'T> -> taskSeq<'T>

    val findIndex: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<int>
    val findIndexAsync: predicate: ('T -> Task<bool>) -> source: taskSeq<'T> -> Task<int>
    val tryFindIndex: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<int option>
    val tryFindIndexAsync: predicate: ('T -> Task<bool>) -> source: taskSeq<'T> -> Task<int option>
    val contains: value: 'T -> source: taskSeq<'T> -> Task<bool> (requires comparison)
    val exists: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<bool>
    val existsAsync: predicate: ('T -> Task<bool>) -> source: taskSeq<'T> -> Task<bool>

    val init: count: int -> initializer: (int -> 'T) -> taskSeq<'T>
    val initAsync: count: int -> initializer: (int -> Task<'T>) -> taskSeq<'T>
    val initInfinite: initializer: (int -> 'T) -> taskSeq<'T>
    val initInfiniteAsync: initializer: (int -> Task<'T>) -> taskSeq<'T>

Should we throw an exception for TaskSeq.zip when the sequences are of unequal length?

While writing tests for #31, I started pondering this โ˜๏ธ.

For comparison, F# throws an ArgumentException for Array.zip and List.zip, but not for Seq.zip. Most likely, this is because "getting the next item" can have an unwanted side effect, so instead, they choose to stop processing as soon as one of the sequences is exhausted.

Arguably, such side effect can still happen. I.e., for zip we have to call MoveNextAsync, which will give us a bool that tells us whether we reached the end of the sequence. But, if the first sequence still has items, but the second sequence is exhausted, the side effect of the first sequence for "one item past the last" has already happened.

In fact, I think the Seq.zip in F# should probably come with a warning. What if each item is "getting a web page of several megabytes"? AFAIK, there's no peek function that would potentially tell me, without side effects, that a sequence is exhausted.

Edit: linking the discussion in the F# repo: dotnet/fsharp#14121

Productize Task/ValueTask/Async.`ignore`

I've long felt icky about using Async.Ignore. Seeing the Async.ignore that's presently hiding in plain sight here makes me realise the absence of a Task.ignore and ValueTask.ignore from FSharp.Core is something I've also been grinning and bearing (sprinkling :> Task and all sorts of other such mindless hacks). Utils.fs presently has helpers that feel very tempting to use more broadly. However, binding to a set of support helpers on the fringe of this library's core feature set is obviously debatable.

(The other pair from here that I feel deserves a canonical implementation in a library that doesn't include a jungle of other less-related helpers is Async.ofTask/Task.toAsync, but with corrected AggregateException semantics, and honoring of Async.Cancellation when compared to Async.AwaitTask as per FSharp.Core. Will likely calve off a separate issue from this if placeholder FSharp.Core.TaskShims package/repo is deemed to be the best home for a set of ignore helpers)

related:

Implement functions `TaskSeq.skip`, `skipWhile`, `where`, `truncate`, `take`, `insertAt`, `updateAt`, `forall`, `concat` (seq)

Please add the following from the wish list of the front page:

TODO list:

  • skip, take (#209)
  • truncate and drop (as with Seq.truncate, these are the counterparts of take and skip that do not throw) (see #209)
  • skipWhile, skipWhileAsync, skipWhileInclusive, skipWhileInclusiveAsync (#219)
  • where and whereAsync (to mimic Seq, but this is just an alias for filter) (#217)
  • (takeWhile etc, see #122 / #126, already implemented)
    • improvements: #235
  • insertAt, removeAt and updateAt, plus insertManyAt, removeManyAt and updateManyAt (#236)
  • forall and forallAsync (#240)
  • max, maxBy, min, minBy, plus maxByAsync and minByAsync (#221)
  • concat overloads (for concatenating a taskseq of sequences/lists etc) (#237)

Full list of signatures:

val take: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>
val skip: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>
val drop: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>
val truncate: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>

val skipWhile: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val skipWhileAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>
val skipWhileInclusive: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val skipWhileInclusiveAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>

val where: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val whereAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>

// these four already implemented:
val takeWhile: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val takeWhileAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>
val takeWhileInclusive: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val takeWhileInclusiveAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>

val insertAt: position:int -> value:'T -> source: TaskSeq<'T> -> TaskSeq<'T>
val removeAt: position:int -> source: TaskSeq<'T> -> TaskSeq<'T>
val updateAt: position:int -> value:'T -> source: TaskSeq<'T> -> TaskSeq<'T>

val insertManyAt: position:int -> values:TaskSeq<'T> -> source: TaskSeq<'T> -> TaskSeq<'T>
val removeManyAt: position:int -> count:int -> source: TaskSeq<'T> -> TaskSeq<'T>

val forall: predicate: ('T -> bool) -> source: TaskSeq<'T> -> Task<bool>
val forallAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> Task<bool>

val max: source:TaskSeq<'T> -> Task<'T> (requires comparison)
val min: source:TaskSeq<'T> -> Task<'T> (requires comparison)
val maxBy: projection: ('T -> 'U) -> source:TaskSeq<'T> -> Task<'T> (requires comparison)
val minBy: projection: ('T -> 'U) -> source:TaskSeq<'T> -> Task<'T> (requires comparison)
val maxByAsync: projection: ('T -> #Task<'U>) -> source:TaskSeq<'T> -> Task<'T> (requires comparison)
val minByAsync: projection: ('T -> #Task<'U>) -> source:TaskSeq<'T> -> Task<'T> (requires comparison)

// overloads for existing concat (which takes nested task sequences)
val concat: sources: TaskSeq<'T seq> -> TaskSeq<'T>
val concat: sources: TaskSeq<'T list> -> TaskSeq<'T>
val concat: sources: TaskSeq<'T array> -> TaskSeq<'T>
val concat: sources: TaskSeq<ResizeArray<'T>> -> TaskSeq<'T>

CI tests appear to run indefinitely when parallelized, but only on Github, not locally

Attempt at fixing and more analysis: #27.

While working on PR #23, it turned out that the test runner never finished, leading to empty logs (while running or after canceling). The bad behavior of the Github reporting is explained here: actions/runner#1326 (and it was closed with no fix).

The cause appears to be a (potential) race condition. It is yet unclear whether that is caused by the resumable code or something else. The temporary solution in #23 is to disable parallel runs in CI and to improve reporting.

Unfortunately, xUnit reporting is quite useless for cases like this. It only reports when a test has finished, not when it started. While dotnet run has a --blame-hang-timeout, which at least solves the issue with GH action reporting, as at the very least, this results in a failed test run.

Opening this issue to investigate further, so that we can close #23 meanwhile, as the implementations there have nothing to do with this behavior (removing enough tests removes the issue, but it doesn't matter which ones you remove, ultimately, the race condition comes back).

Example of a run that went on for over 5 hours (!): https://github.com/abelbraaksma/TaskSeq/actions/runs/3250819154/jobs/5334999458 (note: after retention period is over, this log will be gone).

Performance smoke-tests seem to be broken

Running ``CE taskSeq with nested deeply yield!`` (permalink, goes to original from the time of writing this) takes forever, but based on the non-delaying performance, it should be "just fine" and take an average time.

This test creates nested IAsyncEnumerable<'T>, themselves based on unit -> task<'T> functions to prevent having hot-started tasks. However, the test takes over 30s to finish (with the commented code enabled). Note that all of these tests deliberately use a randomly generated delay though Task.Delay.

Similar tests with many tasks that are not delayed do not have an issue.

Disposal is not called with `use!` and the like

It appears that Dispose() or DisposeAsync() is not being called when use! is used. First mentioned offline by @bartelink. Logging here to link the PR to a tracking issue and for the changelog/release notes.

Turned out that this was inadvertently commented out in 27486f3, which shouldn't have happened. In a follow up I will create some tests to ensure this won't happen again.

Please implement `TaskSeq.tryItem`, `tryLast`, `tryHead`, `tryPick`, `tryFind` and the non-try counterparts

Signatures should be similar like for Seq, with overloads for async versions so that the picker or chooser function can return a Task.

Consider:

TaskSeq.isEmpty
TaskSeq.head
TaskSeq.tryHead
TaskSeq.last
TaskSeq.tryLast
TaskSeq.tryExactlyOne
TaskSeq.exactlyOne
TaskSeq.item
TaskSeq.tryItem
TaskSeq.find + async version
TaskSeq.tryFind + async version
TaskSeq.pick + async version
TaskSeq.tryPick + async version
TaskSeq.filter + async version
TaskSeq.choose + async version

As with Seq, the non-try variants will raise an exception if the precondition is not met.

CancellationToken not used/registered

Hi.

Thanks for working on this. I have been exploring this repository since I wanted to do something similar, and I noticed that even when manually passing in a cancellation token, it is not directly acted upon. So for example

let consumeManually (enumerable: IAsyncEnumerable<int>) (token: CancellationToken) = task {
    do! Task.Yield()
    let mutable hasRead = false
    use enumerator = enumerable.GetAsyncEnumerator(token)        
    let! canRead = enumerator.MoveNextAsync()
    hasRead <- canRead
    while hasRead do
        let _here = enumerator.Current
        let! canRead = enumerator.MoveNextAsync()
        hasRead <- canRead

    return ()
}

let infinite () = taskSeq {        
    while true do
        yield 1
}

[<Fact>]
let usesCancellationToken () = task {
    use src = new CancellationTokenSource()
    let readToEnd = consumeManually (infinite()) src.Token
    do! Task.Delay(100)
    src.Cancel()
    do! readToEnd 
}

will run forever. However, if something like Async.Sleep() is bound inside the taskSeq, it will end with an exception, since the token is actually passed to the async when it is bound.

Do you think it would make sense to set the promiseOfValueOrEnd to a cancelled exception (which seems to be the ideomatic dotnet way) or complete(false) when the token is cancelled? I.e, using the CancellationTokenRegistration to abort MoveNextAsync on cancellation?

TaskEx index

Replaces #128 #129

Within the F# ecosystem, there are a number of commonly used augmentations that typically get maintained as internal helpers within libraries and applications, manually copied around in various ways. Examples of such helpers:

While it can be argued that many of these concerns should be addressed in FSharp.Core, these are being logged here first in an attempt to narrow down the number of suggestions, with a view to getting stable naming (and potentially a shims library of some kind as a stopgap). NOTE that building a one stop shop for lots of arbitrary helpers is a non-goal of this effort.

NOTES:

Support `ConfiguredCancelableAsyncEnumerable<'T>`

When you call .WithCancellation(cancellationToken) on IAsyncEnumerable<'T> you get System.Runtime.CompilerServices. ConfiguredCancelableAsyncEnumerable<'T> which is not currently supported.

So that this code does not work

    static member ToFlatListAsync<'Source>(source: IQueryable<'Source>, [<Optional>] cancellationToken: CancellationToken) = task {
        let builder = ImmutableArray.CreateBuilder<'Source>()
        do!
            source.AsAsyncEnumerable().WithCancellation(cancellationToken)
            |> TaskSeq.iterAsync (fun x -> builder.Add(x))

        return builder.ToImmutable();
    }

Implement `^TaskLike` similar to F#'s `task`

To support the same types F# Core supports with task, specifically in let! and do!, we'll need to implement the ^TaskLike SRTP approach. This prevents ambiguities caused by Task<'T> :> Task. It also limits the needed overloads for ValueTask<'T> and ValueTask.

Related to #43, which can only be implemented by using this approach.

This change will unify the types supported between task and taskSeq, except for async, which should be added next (see #79).

Iterating multiple times over a taskSeq { ... } raises InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed.

As in the title. Any of these errors only happen when multiple iterations are attempted and the source is a taskSeq CE, not when the source is a user-defined or library defined IAsyncEnumerator<_>, used with the TaskSeq library functions.

I already started investigating this issue and it has to do with properly resetting state when "reaching the end" and when "getting an enumerator over the same resource". See #36 continued: #42.

Operation not valid error, MoveNextAsync()

// this throws: InvalidOperationException: Operation is not valid due to the current state of the object.
task {
    let tskSeq = taskSeq { yield 1; yield 2 }
    let enum = tskSeq.GetAsyncEnumerator()
    let! isNext = enum.MoveNextAsync()  // true
    let! isNext = enum.MoveNextAsync()  // true
    let! isNext = enum.MoveNextAsync()  // false
    let! isNext = enum.MoveNextAsync()  // error here
    ()
}

Operation not valid error, multiple GetAsyncEnumerator() with MoveNextAsync()

// throws: InvalidOperationException: Operation is not valid due to the current state of the object.
task {
    let tskSeq = getEmptyVariant variant
    use enumerator = tskSeq.GetAsyncEnumerator()
    let! isNext = enumerator.MoveNextAsync()
    use enumerator = tskSeq.GetAsyncEnumerator()
    let! isNext = enumerator.MoveNextAsync()  // throws here
    ()
}

Transition State error

// this throws: 
// InvalidOperationException: 
// An attempt was made to transition a task to a final state when it had already completed.
task {
    let tskSeq = taskSeq { yield 1; yield 2 }
    let ts1 = tskSeq |> TaskSeq.map (fun i -> i + 1)
    let result1 = TaskSeq.toArray ts1
    let ts2 = ts1 |> TaskSeq.map (fun i -> i + 1)
    let result2 = TaskSeq.toArray ts2 // error here
    ()
}

Implement dynamic versions of the resumable code

In rare scenarios, for instance, where the resumable code is invoked in a top-level function, the compiler may not be able to compile the resumable code statically. For these cases, we need a dynamic version of the resumable code (which exists, but currently simply raises an exception).

This issues serves as a placeholder for this.

TaskEx: AwaitTaskCorrect / Task.toAsync / Async.ofTask

Replaces #129. TaskEx top level issue: #139

The default implementation of the Async.AwaitTask methods in FSharp.Core have some key shortcomings:

  1. when the Task faults, yielding an exception, that exception is typically (always?) wrapped in an egregious AggregateException
  2. the default implementation does not abort/cancel when the ambient CancellationToken of the async expr within which Async.AwaitTask is triggered
  3. cancelling an Async computation should not just abort the processing, it should also propagate a TaskCancelledException to align with the behavior of Task

While it can be argued that the current behavior is 'wrong', it's also obvious that breaking it would be untenable, and the semantic differences are beyond what one might cover with subtle overloads and/or adding optional arguments etc.

Current proposed APIs (will be updated inline based on any discussion below):

module Async =
    let inline ofTask (t : Task<'t>) : Async<'t> = AwaitTaskCorrect t
    let inline ofUnitTask (t : Task) : Async<'t> = AwaitTaskCorrect t
module Task =
    let inline toAsync (t : Task<'t>) : Async<'t> = Async.ofTask t

NOTES:

  • the above naming is taken from the helpers within TaskSeq, which are exposed in the FSharp.Control namespace. NOTE the current implementations use Async.AwaitTask, but that was not as a conscious choice, and there is a desire` to fix at least some of the shortcomings noted
  • given the fact that task and async are now first class citizens of FSharp.Core, having an of/to pairing would seem to make sense. This is open to debate; not sure the degree to which the prior art is consistent wrt this beyond collection types/modules
  • There is an fslang-suggestion regarding this. The purpose of this issue is as a placeholder for potentially filling the gap until this issue can be more thoroughly resolved in FSharp.Core proper.
  • Equinox, Propulsion and FSharp.AWS.DynamoDB all have copies of the canonical impl in Eirik's fssnip. Not ruling out tweaks to the semantics, but ideally those libraries, and others, would all share the same semantics
  • if this is handled as a module, one also frequently needs an ofUnitTask alongside as above (Async.AwaitTask is a pair of overloaded methods, which often brings its own issues with intellisense and error messages etc)

chunkBySize

I see chunkBySize is supposed to be added. Is there any guidance or reference for patching this in myself? I've come across a use for it in a pipeline.

Thanks

Implement For on the built-in TaskBuilder

It would be useful to add an extension method For on TaskBuilder to be able to do for loops over IAsyncEnumerable in tasks.

let mySeq =
    taskSeq {
        for i in 0..9 do yield i
    }

let myTask =
    task {
        for x in mySeq do // <-- This is currently not possible.
            printfn $"{x}"
    }

TaskEx: Async.startImmediateAsTask

Replaces #129. TaskShims top level issue: #139

In contrast to its sibling, Async.StartAsTask (because it is not starting a thread), Async.StartImmediateAsTask does not have a taskCreationOptions optional parameter, so people often use it pipeline expressions.

However, that makes it easy to gloss over the fact that the computation will then not have an ambient cancellation token.

In order to resolve those forces (wanting to be able to execute an async via piping, without the risk of omitting to consider cancelation), it is proposed to have a common helper function (with a lower case name) that

  • takes a cancellation token (as its first argument)
  • passes that and the Async onto Async.StartImmediateAsTask

Current proposed APIs (will be updated inline based on any discussion below):

module Async =
    let inline startImmediateAsTask ct (a : Async<'t>) = Async.StartImmediateAsTask(a, cancellationToken = ct)
   // ALTERNATELY
    let inline executeAsTask ct (a : Async<'t>) = Async.StartImmediateAsTask(a, cancellationToken = ct)

NOTES:

Naming notes

I do notice the name "TaskSeq" is confusing some people - people thinking this is about "sequences of tasks". I'm not sure what to do about this.

While thinking about this I included some general notes on naming in this space, see below

  • F# IEnumerator

    • cold start
    • run once
    • no implicit cancellation token
    • no asynchronous waits
    • many results
    • state machines
    • = HotSynchronousFastEnumerator
    • = IEnumerable<T>
  • F# IEnumerable = Seq

    • cold start
    • run multiple
    • no implicit cancellation token
    • no asynchronous waits
    • many results
    • state machines
    • = ColdSynchronousFastEnumerable
    • ~= unit -> IEnumerable<T>
  • .NET/F#/C# Task = C# async/await

    • hot start
    • run once
    • no implicit cancellation token
    • asynchronous waits
    • one result
    • state machines
    • = HotAsynchronousFastValue
    • = Task<T>
  • IcedTask ColdTask

    • cold start
    • run many times
    • no implicit cancellation token
    • asynchronous waits
    • one result
    • state machines
    • = ColdAsynchronousFastValueFactory
    • ~= unit -> Task<T>
  • IcedTask CancellableTask

    • cold start
    • run many
    • implicit cancellation token
    • asynchronous waits
    • one result
    • state machines
    • = ColdAsynchronousFastCancellableValueFactory
    • ~= CancellationToken -> Task<T>
  • Async = F# async

    • cold start
    • run multiple
    • implicit cancellation token
    • asynchronous waits, one result
    • no state machines
    • = ColdAsynchronousCancellableValueFactory
    • ~= CancellationToken -> Task<T>
  • Current F# AsyncSeq

    • cold start
    • run multiple
    • implicit cancellation token
    • asynchronous waits
    • many results
    • no state machines
    • = ColdAsynchronousCancellableEnumerable
    • ~= CancellationToken -> IAsyncEnumerator<T>
  • Current F# TaskSeq

    • cold start
    • run multiple
    • implicit cancellation token governing iteration but not passed to each task along the way
    • asynchronous waits
    • many results
    • state machines
    • = ColdAsynchronousHalfCancellableEnumerable
    • ~= CancellationToken -> IAsyncEnumerator<T>

I'm leaving the question of tailcalls off the list, as much as I'd like to address that.

It's worth noting that at a high level there's no real logical difference between CancellableTask and F# Async<_>. Nor between F# TaskSeq and F# AsyncSeq.

The sweet spot for F# is really Cold+RunMany+Asynchronous+Fast+Cancellable+Tailcalls, which is what TaskSeq is close to being technically (except tailcalls, sadly).

Remove AggregateException wrapping in Async CE `for` extension and prevent threadpool transition

At present the current impl of for within async { expressions raises two concerns for me:

  • AwaitTaskCorrect semantics would be preferable to promulgating usage of Async.AwaitTask in a place that most people will not necessarily even infer that it's in play.
  • AIUI Async.StartAsTask includes an unnecessary transition to the Thread Pool (with a potential context switch?) which could instead safely be replaced with Async.StartImmediateAsTask in this context. EDIT: moved to separate thread #135
  • the starting of the child Async does not propagate the continuation token (not sure whether fixing that is possible/required) EDIT: separate issue, see #133

TaskEx: ignore

Replaces #128. TaskEx top level issue: #139

Async.Ignore has always been ugly and undiscoverable. While I tend to make ignoring explicit by using let! _ = <async stuff I want to ignore result of>, it's commonly the last expression in a function, and having to do let! _ = <thing I'm wrapping> in () is too much.

It is proposed that the module associated with any given builder should by convention have an ignore function that correctly observes completion of the work, dropping the result, but propagating exceptions, if any

Current proposed APIs that are not currently in FSharp.Core (will be updated inline based on discussion below):

module Async =
    let inline ignore (a : Async<'t>) = Async.Ignore
module Task =
    let inline ignore (t : Task<'t>) = ...
module ValueTask =
    let inline ignore (t : ValueTask<'t>) = ...

NOTES:

  1. Currently, TaskSeq (and other libs such as IcedTasks contain various bespoke implementations
  2. While this could live in a TaskEx lib, it could be argued that FSharp.Core is the obvious home for them. (However that would raise the issue of whether they need to go into the 6.x release line in order to align with minimal dependencies for various common libraries)

Why is 6.0.2 the min FSharp.Core version requirement?

Great to see this taking shape. Was able to port Equinox to use it in one impl without issues. Am intending to spread that to the other integrations in due course.

However, it does force a dep on FSharp.Core v 6.0.2 - is there an upstream requirement that triggers this ? Is this likely to remain stable?

(Reason I ask is that the Equinox deps are 6.0.0 and I was trying to hold the line on that. Should I up the requirement, Equinox will likely shift to depending on 6.0.6, as there's an ugly shim required for a StackOverflow in Async.Parallel that's not fixed until then)

Consider `StartImmediateAsTask` instead of `StartAsTask` to prevent a thread hop

In PR #114 (adding support for Async in Bind), @bartelink raised the question whether it would be better to use StartImmediateAsTask instead of StartAsTask.

Specifically, it is about this part:

let mutable awaiter =
    Async
        .StartAsTask(asyncSource, cancellationToken = sm.Data.cancellationToken)
        .GetAwaiter()

The current approach was taken to mimic the same approach taken for task in F# Core: https://github.com/dotnet/fsharp/blob/d91b6c5c97accf363d135d1f99816410da4ec341/src/FSharp.Core/tasks.fs#L462, basically:

member inline this.Bind(computation: Async<'TResult1>, continuation: ('TResult1 -> TaskCode<'TOverall, 'TResult2>)) : TaskCode<'TOverall, 'TResult2> =
    this.Bind(Async.StartAsTask computation, continuation)

For comparison, see what was done in async2, which was poised to be the new async based on resumable code, like task is, but unlike task cold-started like the current async.

So, we have two scenarios:

  • use StartAsTask to get similar semantics as the task CE, but this causes a thread hop
  • use StartImmediateAsTask, which uses the current context

I'm not aware of any downside of using the latter. It definitely saves a thread switch in the general case (note that the thread pool may not always cause a physical thread switch).

Move to static members and overloads, to support cancellation token passing, `ValueTask` and `Async` in the module members

It is currently quite a pain to implement cancellation passing explicitly. As has been said in several issues already, we need to support this on each member to do it properly. Plus we will need overloads:

  • Each method taking a TaskSeq should also seamlessly take a ConfiguredCancelableAsyncEnumerable<_>, which is available in .NET Standard 2.1. This would solve #167, reported by @xperiandri (PS: to prevent the surface area to blow up, I will check if ofCancelable/toCancelable functions would serve the same purpose).
  • Each method currently with the Async prefix should be able to take a Task (as they can, currently), an Async and a ValueTask. Writing functions like iterAsync, iterWithTask, iterWithValueTask etc is going to be too painful long-run.
  • Each method should have an optional cancellation token argument as their last argument (?), so that piping into the functions is still possible, but it will default to no cancellation token. Only members that consume the task sequence should get a cancellation token argument.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.