fsprojects / fsharp.control.taskseq
A computation expression and module for seamless working with IAsyncEnumerable<'T> as if it is just another sequence
License: MIT License
While writing #77 I realised it would be logical to allow this TaskSeq thing to interoperate with a standard CancellableTask, that is
taskSeq {
    let! res = cancellableTask { ... }
    ...
}
which would pass the CancellationToken governing the iteration of the TaskSeq to the Cancellable task. This would give robust cancellation token passing through the overall computation.
Under the alternative long term strategy in #77 this would be the same as
asyncSeq {
    let! res = async2 { ... } // re-implemented F# async
    ...
}
because F# cancellableTask and async2 are more or less the same thing.
This would mean bringing a CancellableTask into this library from IcedTasks, which may be a good thing.
Relevant discussion in this revert: #212.
Note: this update from v3 -> v4 is COMPLETELY INCOMPATIBLE (see: actions/upload-artifact#472 and dorny/test-reporter#343) and there's no clear guideline to be found on the horizon yet....
I'm talking of these two:
ValueTask.ofIValueTaskSource -> ValueTask.ofSource
ValueTask.FromResult -> ValueTask.fromResult
And we want to use proper naming for our types (PascalCase for the type, camelCase for the CE builder), so the type taskSeq<_> will be changed to TaskSeq<_>. This has already been done in #187.
You may see the following warnings when running build.cmd or building from VS:
d:\Projects\OpenSource\Abel\TaskSeq\src\FSharp.Control.TaskSeq.Test\FSharp.Control.TaskSeq.Test.fsproj : warning NU1605: Detected package downgrade: FSharp.Core from 6.0.7 to 6.0.3. Reference the package directly from the project to select a different version. [d:\Projects\OpenSource\Abel\TaskSeq\src\FSharp.Control.TaskSeq.sln]
We could allow binding to an existing F# Async, passing the cancellation token through.
Similar to this in the existing AsyncSeq: https://github.com/fsprojects/FSharp.Control.AsyncSeq/blob/main/src/FSharp.Control.AsyncSeq/AsyncSeq.fs#L425
Replaces #129. TaskEx top level issue: #139
Async.Parallel's optional degree of parallelism parameter was added late in the game, but is critical - dumping an arbitrary unbounded number of work items onto the thread pool is not something that should be easy and/or the default thing to do without due consideration for how that will work under stress.
There are some other shortcomings, which frequently lead to various bespoke helpers proliferating:
fun computations -> Async.Parallel(computations, maxDegreeOfParallelism=dop) etc) (note this is not the case for Async.Sequential)
Async.Parallel and direct consumption within an app might be useful)
Current proposed APIs (will be updated inline based on any discussion below):
module Async =
    let parallelLimit maxDegreeOfParallelism computations =
        Async.Parallel(computations, maxDegreeOfParallelism = maxDegreeOfParallelism)
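A usage sketch for the proposed helper may make the intent clearer. The workload (squaring five numbers) and the limit of 2 are made up for illustration; only `Async.Parallel`, `Async.Ignore` and `Async.RunSynchronously` come from FSharp.Core.

```fsharp
module Async =
    // Same shape as the proposed helper: the limit comes first so it pipes well.
    let parallelLimit maxDegreeOfParallelism computations =
        Async.Parallel(computations, maxDegreeOfParallelism = maxDegreeOfParallelism)

// Illustrative workload (an assumption): five trivial computations,
// of which at most two run at the same time.
let squares =
    [ for i in 1 .. 5 -> async { return i * i } ]
    |> Async.parallelLimit 2
    |> Async.RunSynchronously

// For unit-returning work, pinning the type argument keeps the compiler
// checking that the computations still return unit.
let runAll : Async<unit> =
    [ for _ in 1 .. 3 -> async { return () } ]
    |> Async.parallelLimit 2
    |> Async.Ignore<unit[]>
```

Results come back in input order regardless of the limit, so the helper only changes how much work is in flight at once.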
NOTES:
Async<unit> tasks. Having to use |> Async.Ignore<unit[]> is ugly for that (and most people probably do |> Async.Ignore, which prevents the compiler from helping you if your computations start to return values where they previously returned unit)
Task, considering cancellation tokens (see #142) and unwrapping AggregateExceptions (see #141). Providing an equivalent of this that works well with task expressions should likely be prototyped alongside any permanent API for this. Example impl. Also, perhaps a Task.sequential might make sense
Throttled in the name is pretty well established in multiple internal library suites, and in posts such as https://www.compositional-it.com/news-blog/improved-asynchronous-support-in-f-4-7
F# fails to determine which overload of TaskSeqBuilder.Using to use when type implements both IDisposable and IAsyncDisposable.
A real-world example with the NpgsqlDataReader type from Npgsql:
#r "nuget: Npgsql"
#r "nuget: FSharp.Control.TaskSeq"
open System.Threading // for CancellationToken
open Npgsql
open FSharp.Control
let readRows (command: NpgsqlCommand) (ct: CancellationToken) f = taskSeq {
    use! reader = command.ExecuteReaderAsync ct // Fails to compile.
    let! reader = command.ExecuteReaderAsync ct
    use reader = reader // Also fails to compile.
}
Below is a set of test cases replicating the issue.
#r "nuget: FSharp.Control.TaskSeq"
#r "nuget: Xunit"
#r "nuget: FsUnit"
open System
open System.Threading.Tasks
open FSharp.Control
open FsUnit
open Xunit
type private OneGetter() =
    member _.Get1() = 1
type private Disposable() =
    inherit OneGetter()
    interface IDisposable with
        member _.Dispose() = ()
type private AsyncDisposable() =
    inherit OneGetter()
    interface IAsyncDisposable with
        member _.DisposeAsync() = ValueTask()
type private MultiDispose() =
    inherit OneGetter()
    interface IDisposable with
        member _.Dispose() =
            ()
    interface IAsyncDisposable with
        member _.DisposeAsync() =
            ValueTask()
let private check ts = task {
    let! length = ts |> TaskSeq.length
    length |> should equal 1
}
[<Fact>]
let ``CE task: Using when type implements IDisposable``() =
    let ts = taskSeq {
        use x = new Disposable()
        yield x.Get1()
    }
    check ts
[<Fact>]
let ``CE task: Using when type implements IAsyncDisposable``() =
    let ts = taskSeq {
        use x = AsyncDisposable()
        yield x.Get1()
    }
    check ts
[<Fact>]
let ``CE task: Using when type implements IDisposable and IAsyncDisposable``() =
    let ts = taskSeq {
        use x = new MultiDispose() // Fails to compile
        yield x.Get1()
    }
    check ts
[<Fact>]
let ``CE task: Using! when type implements IDisposable``() =
    let ts = taskSeq {
        use! x = task { return new Disposable() }
        yield x.Get1()
    }
    check ts
[<Fact>]
let ``CE task: Using! when type implements IAsyncDisposable``() =
    let ts = taskSeq {
        use! x = task { return AsyncDisposable() }
        yield x.Get1()
    }
    check ts
[<Fact>]
let ``CE task: Using! when type implements IDisposable and IAsyncDisposable``() =
    let ts = taskSeq {
        use! x = task { return new MultiDispose() } // Fails to compile
        yield x.Get1()
    }
    check ts
Binding task-likes in the taskSeq builder will trigger the FS3559 warning. This seems to be due to the existence of the 'TOverall parameter here:
[<NoEagerConstraintApplication>]
member inline Bind< ^TaskLike, 'T, 'U, ^Awaiter, 'TOverall> :
    task: ^TaskLike * continuation: ('T -> ResumableTSC<'U>) -> ResumableTSC<'U>
        when ^TaskLike: (member GetAwaiter: unit -> ^Awaiter)
        and ^Awaiter :> ICriticalNotifyCompletion
        and ^Awaiter: (member get_IsCompleted: unit -> bool)
        and ^Awaiter: (member GetResult: unit -> 'T)
This parameter cannot be inferred, since it is not used in the method, and is probably only present due to copy-paste.
We should either target Microsoft.FSharp.Control or FSharp.Control. I noticed that these namespaces, or at least the first one, are auto-opened in F# projects. According to @dsyme that shouldn't happen, so that behavior may not stick, but FusionTasks does exhibit that behavior, see for instance this source file on Async extensions, which get auto-opened as soon as your project has a reference to that lib.
I'll have to toy with the options a bit. Maybe the above behavior is only that way because Async is a type in F# Core already, but TaskSeq isn't.
Just opening this to keep track of options. Suggestions welcome.
Releasing an alpha version as some of the features I want completed in 0.4.0 are not completed yet. These are the relevant release notes:
Some highlights, but see Release 0.4.0.-alpha.1 for details
finally blocks #157 (by @bartelink)
Similar to #122. The naming should be skipWhile, even though skipUntil was proposed as well in that thread. F# Core has skipWhile, so we should stick with that. Also, I think they are semantically different:
skipWhile: as long as predicate is true, keep skipping, then stop
skipWhileAsync: same, but async
skipWhileInclusive: same, but yield first, then test predicate on yielded value
skipWhileInclusiveAsync: same, but async
skipUntil: skip until predicate becomes true
skipUntilAsync: same, but async
skipUntilInclusive: same, but skip first, then test predicate
skipUntilInclusiveAsync: same, but async
We should include these if we do the skipUntilXXX functions:
takeUntil: as long as predicate is false, keep taking, then stop
takeUntilAsync: same, but async
takeUntilInclusive: same, but yield first, then test predicate on yielded value
takeUntilInclusiveAsync: same, but async
Not sure we should have both until and while, as they are logically each other's opposites.
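The "each other's opposites" point can be sketched with the synchronous `Seq` functions from F# Core. The `skipUntil` helper below is hypothetical and exists only for this illustration; it is simply `Seq.skipWhile` with the predicate negated.

```fsharp
// Hypothetical helper, not part of FSharp.Core: skipUntil as the negation of skipWhile.
let skipUntil predicate source =
    source |> Seq.skipWhile (predicate >> not)

let numbers = [ 1; 2; 3; 4; 1 ]

// Skips while items are < 3, then yields everything that follows,
// including the trailing 1 (the predicate is no longer consulted).
let afterWhile = numbers |> Seq.skipWhile (fun x -> x < 3) |> Seq.toList

// Skips until an item is >= 3: the same result with the predicate inverted.
let afterUntil = numbers |> skipUntil (fun x -> x >= 3) |> Seq.toList
```

Both calls produce the same list, which is the argument for offering only one of the two families.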
Currently, it is not very clear how users can pass a CancellationToken through. While the CE has support for cancellation tokens, and the token is passed on to GetAsyncEnumerator(ct), unless users access the interface directly, there is currently no way to pass a cancellation token.
There are four, not necessarily exclusive, ways to implement this.
1. TaskSeq.setCancellationToken, which will return a taskSeq with the given cancellation token
2. do! overload that allows writing taskSeq { do! myCancelToken } in your CE, otherwise behaving the same as (1)
3. taskSeq { cancellationToken myCancelToken } in your CE
4. taskSeq CE constructor. However, if possible, this may not be easily discoverable
There are upsides and downsides to each of these approaches. I think the first option, together with a getCancellationToken should at least be supported.
However, there are other challenges as well. The helper functions in the TaskSeq module all use the CancellationToken() constructor (that is: no cancellation support). The taskSeq builder in that respect is a bit of a mixed beast. Yes, you can pass in a cancellation token, but you have to do it manually, and then, using any of the helpers will basically ignore this token.
That's not a good position to be in. Probably, code like source.GetAsyncEnumerator(CancellationToken()) should become something like source.GetAsyncEnumerator(TaskSeq.getCancellationToken source). Which runs into another issue: while the state machine has a property for the cancellation token, any other implementation of IAsyncEnumerable<'T> does not, which requires a type check to detect.
Adding all this magic comes at a cost. In AsyncSeq and Async this was resolved by adding overloads that take an optional cancellation token. Not ideal either.
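To make the first option concrete, here is a minimal sketch of what a `setCancellationToken`-style function could look like. It is not the library's implementation: the module name `TaskSeqSketch` is made up, and the wrapper only guarantees that the given token is what the source's `GetAsyncEnumerator` receives. Whether the source then acts on that token is a separate question.

```fsharp
open System.Collections.Generic
open System.Threading

module TaskSeqSketch =
    /// Hypothetical stand-in for the proposed TaskSeq.setCancellationToken:
    /// the returned sequence ignores the token handed to GetAsyncEnumerator
    /// and always enumerates the source with `ct` instead.
    let setCancellationToken (ct: CancellationToken) (source: IAsyncEnumerable<'T>) =
        { new IAsyncEnumerable<'T> with
            member _.GetAsyncEnumerator(_callerToken) = source.GetAsyncEnumerator ct }
```

Because the wrapper is itself an `IAsyncEnumerable<'T>`, helpers that call `GetAsyncEnumerator(CancellationToken())` on it would still end up enumerating the source with the token supplied here.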
Certain functions, like Debug.xxx or the module TaskSeqInternals are publicly visible (though most of the actual functions are not). Likewise, types like TaskSeqResumptionFunc should probably be hidden.
Adding proper fsi files (currently only for the TaskSeq module), should do the trick.
We've removed the tail recursion because it was hard to consolidate it into yield! (it was instead done with return!, which has no place in taskSeq).
However, today I helped someone with some code that he considered for taskSeq which had the following approach:
let getPoliciesAsync policyid =
    asyncSeq {
        use connection = new NpgsqlConnection(npgsqlConnectionStringBuilder.ConnectionString)
        use command = new NpgsqlCommand($"SELECT policy_data FROM policy.tbl_policy where policy_id = {Sql.uuid policyid};", connection)
        let! reader = command.ExecuteReaderAsync() |> Async.AwaitTask
        let rec someRec() = asyncSeq {
            let! rowExists = reader.ReadAsync() |> Async.AwaitTask
            if rowExists then
                yield {| dt1 = reader.GetString(0) |}
                yield! someRec()
        }
        yield! someRec()
    } |> AsyncSeq.toAsyncEnum
As you can see, it uses asyncSeq, but also: it is recursive. The same approach with taskSeq would likely be more performant, however, if there are a lot of rows, this becomes problematic. This code can be rewritten with a loop, though.
@dsyme, sharing this with you in case we want to revisit this at some point.
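For reference, the loop-based rewrite mentioned above might look roughly like this. It is a sketch under assumptions: the function name `readPolicies` is invented, and it takes an already-open `DbDataReader` (the base class of `NpgsqlDataReader`) so that the example does not depend on Npgsql or on how the reader is disposed.

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open System.Data.Common
open FSharp.Control

/// Loop-based counterpart of someRec. Taking a DbDataReader parameter is an
/// assumption made to keep the sketch independent of Npgsql.
let readPolicies (reader: DbDataReader) = taskSeq {
    let mutable rowExists = true
    while rowExists do
        // ReadAsync returns Task<bool>, which taskSeq can bind directly.
        let! hasRow = reader.ReadAsync()
        rowExists <- hasRow
        if hasRow then
            yield {| dt1 = reader.GetString 0 |}
}
```

The loop keeps a single enumerator frame alive no matter how many rows are read, which is what the recursive version cannot promise once tail calls through yield! are gone.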
Currently, this is not possible:
taskSeq {
    do! Task.Delay 220 // fails, because this returns a Task, not a Task<unit>
}
Add necessary overloads for allowing this, similarly to how the task CE works.
Of note is that allowing Task can lead to unexpected behavior, as Task<'T> inherits from Task. However, we should strive to have parity with task in F#, to avoid surprises for users. Just like with normal task, we would still disallow do! task { return 10 }. This should be OK, as the F# compiler does not (yet) allow that kind of conversion.
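For comparison, this is the behavior of the built-in `task` builder that the request asks `taskSeq` to match. The function name and the delay value are arbitrary.

```fsharp
open System.Threading.Tasks

// The built-in task builder accepts a non-generic Task in do!,
// so no conversion to Task<unit> is needed here.
let waitThenAnswer () = task {
    do! Task.Delay 10
    return 42
}
```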
While most of the above have trivial semantics and should sometimes take an Async overload (i.e., findIndexAsync and initAsync for the lambda), I'm not entirely sure of the usefulness of TaskSeq.cache (and we already have TaskSeq.toSeqCached), but perhaps for parity it should be included.
TaskSeq.cast vs Seq.cast
While the docs on Seq.cast say that it is meant for a "loosely typed sequence", to be cast to a generically strongly typed sequence, it can be used just fine to cast an F# seq, because it also implements IEnumerable (i.e., the non-generic variant).
// this is fine
let x: seq<uint> = seq { 1 } |> Seq.cast
But F# also allows you to perform an illegal cast:
// this won't give any warnings, but will throw an exception
let x: seq<Guid> = seq { 1 } |> Seq.cast
Since IAsyncEnumerable<'T> only exists as a typed sequence, we should honor that and only allow valid casts. However, F# doesn't allow a constraint like 'T :> 'U, the rh-side must be a concrete type. This means that in the end, it'll effectively work the same way as for Seq.cast through boxing. Alternatively, if the type-relation is known ahead of time, users should probably just write TaskSeq.map (fun x -> x :> _) just as they would for seq<_>.
// this is fine
let x: seq<uint> = taskSeq { 1 } |> TaskSeq.cast
// this *should* raise a compile-time exception, but there's no way to enforce that
let x: seq<Guid> = taskSeq { 1 } |> TaskSeq.cast // incompatible
For comparison, Linq.Enumerable.Cast works through the untyped Enumerable as well.
EDIT: to get better parity with Seq.cast and Linq.Enumerable.Cast, I decided to only allow it to work with untyped task sequences (that is: IAsyncEnumerable<obj>). I've updated the signature below. In addition, we'll be adding a box and unbox helper as well, the latter only for value types. If users want a reinterpret_cast style cast, they can use TaskSeq.box >> TaskSeq.cast. Due to the static way this is compiled, this won't add overhead.
length, see #53
lengthBy, see #53
lengthByAsync, see #53
allPairs // later, requires cache to be implemented
indexed, see #68
cache // later, see MBP approach of AsyncSeq
cast, see #67
box, see #67
unbox, see #67
concat, see #69
findIndex, see #68
findIndexAsync, see #68
tryFindIndex, see #68
tryFindIndexAsync, see #68
contains
exists
existsAsync
init, see #69
initAsync, see #69
initInfinite, see #69
initInfiniteAsync, see #69
module TaskSeq =
    val length: source: taskSeq<'T> -> Task<int>
    val lengthBy: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<int>
    val lengthByAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<int>
    val allPairs: source1: taskSeq<'T> -> source2: taskSeq<'U> -> taskSeq<'T * 'U>
    val indexed: source: taskSeq<'T> -> taskSeq<int * 'T>
    val cache: source: taskSeq<'T> -> taskSeq<'T> // later, see MBP approach of AsyncSeq
    val cast: source: taskSeq<obj> -> taskSeq<'U>
    val box: source: taskSeq<'T> -> taskSeq<obj>
    val unbox<'T when 'T: struct> : source: taskSeq<obj> -> taskSeq<'T>
    val concat: source1: taskSeq<'T> -> source2: taskSeq<'T> -> taskSeq<'T>
    val findIndex: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<int>
    val findIndexAsync: predicate: ('T -> Task<bool>) -> source: taskSeq<'T> -> Task<int>
    val tryFindIndex: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<int option>
    val tryFindIndexAsync: predicate: ('T -> Task<bool>) -> source: taskSeq<'T> -> Task<int option>
    val contains: value: 'T -> source: taskSeq<'T> -> Task<bool> (requires comparison)
    val exists: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<bool>
    val existsAsync: predicate: ('T -> Task<bool>) -> source: taskSeq<'T> -> Task<bool>
    val init: count: int -> initializer: (int -> 'T) -> taskSeq<'T>
    val initAsync: count: int -> initializer: (int -> Task<'T>) -> taskSeq<'T>
    val initInfinite: initializer: (int -> 'T) -> taskSeq<'T>
    val initInfiniteAsync: initializer: (int -> Task<'T>) -> taskSeq<'T>
This is probably not supposed to happen, thanks for reporting offline to me, @bartelink. The file release-notes.txt is part of the package build, but should not show up as a dependency.
While writing tests for #31, I started pondering this.
For comparison, F# throws an ArgumentException for Array.zip and List.zip, but not for Seq.zip. Most likely, this is because "getting the next item" can have an unwanted side effect, so instead, they choose to stop processing as soon as one of the sequences is exhausted.
Arguably, such a side effect can still happen. I.e., for zip we have to call MoveNextAsync, which will give us a bool that tells us whether we reached the end of the sequence. But, if the first sequence still has items, but the second sequence is exhausted, the side effect of the first sequence for "one item past the last" has already happened.
In fact, I think the Seq.zip in F# should probably come with a warning. What if each item is "getting a web page of several megabytes"? AFAIK, there's no peek function that would potentially tell me, without side effects, that a sequence is exhausted.
Edit: linking the discussion in the F# repo: dotnet/fsharp#14121
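The "one item past the last" effect is easy to observe with the synchronous `Seq.zip`. In this sketch the `pulled` list is a stand-in for an expensive side effect such as downloading a page; the sequence contents are arbitrary.

```fsharp
// Record every item the first sequence is asked to produce.
let pulled = ResizeArray<int>()

let first = seq {
    for i in 1 .. 5 do
        pulled.Add i // stands in for an expensive side effect
        yield i
}

// The second sequence is shorter and runs out after two items.
let second = [ 10; 20 ]

let zipped = Seq.zip first second |> Seq.toList
let pulledItems = List.ofSeq pulled
```

`zipped` contains two pairs, yet `pulledItems` holds three entries: the first sequence had to produce its third item before the zip could discover that the second one was exhausted.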
I've long felt icky about using Async.Ignore. Seeing the Async.ignore that's presently hiding in plain sight here makes me realise the absence of a Task.ignore and ValueTask.ignore from FSharp.Core is something I've also been grinning and bearing (sprinkling :> Task and all sorts of other such mindless hacks). Utils.fs presently has helpers that feel very tempting to use more broadly. However, binding to a set of support helpers on the fringe of this library's core feature set is obviously debatable.
(The other pair from here that I feel deserves a canonical implementation in a library that doesn't include a jungle of other less-related helpers is Async.ofTask/Task.toAsync, but with corrected AggregateException semantics, and honoring of Async.Cancellation when compared to Async.AwaitTask as per FSharp.Core. Will likely calve off a separate issue from this if placeholder FSharp.Core.TaskShims package/repo is deemed to be the best home for a set of ignore helpers)
related:
Please add the following from the wish list of the front page:
TODO list:
skip, take (#209)
truncate and drop (as with Seq.truncate, these are the counterparts of take and skip that do not throw) (see #209)
skipWhile, skipWhileAsync, skipWhileInclusive, skipWhileInclusiveAsync (#219)
where and whereAsync (to mimic Seq, but this is just an alias for filter) (#217)
(takeWhile etc, see #122 / #126, already implemented)
insertAt, removeAt and updateAt, plus insertManyAt, removeManyAt, updateManyAt
forall and forallAsync (#240)
max, maxBy, min, minBy, plus maxByAsync and minByAsync (#221)
concat overloads (for concatenating a taskseq of sequences/lists etc) (#237)
Full list of signatures:
val take: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>
val skip: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>
val drop: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>
val truncate: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>
val skipWhile: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val skipWhileAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>
val skipWhileInclusive: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val skipWhileInclusiveAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>
val where: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val whereAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>
// these four already implemented:
val takeWhile: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val takeWhileAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>
val takeWhileInclusive: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val takeWhileInclusiveAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>
val insertAt: position:int -> value:'T -> source: TaskSeq<'T> -> TaskSeq<'T>
val removeAt: position:int -> source: TaskSeq<'T> -> TaskSeq<'T>
val updateAt: position:int -> value:'T -> source: TaskSeq<'T> -> TaskSeq<'T>
val insertManyAt: position:int -> values:TaskSeq<'T> -> source: TaskSeq<'T> -> TaskSeq<'T>
val removeManyAt: position:int -> count:int -> source: TaskSeq<'T> -> TaskSeq<'T>
val forall: predicate: ('T -> bool) -> source: TaskSeq<'T> -> Task<bool>
val forallAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> Task<bool>
val max: source:TaskSeq<'T> -> Task<'T> (requires comparison)
val min: source:TaskSeq<'T> -> Task<'T> (requires comparison)
val maxBy: projection: ('T -> 'U) -> source:TaskSeq<'T> -> Task<'T> (requires comparison)
val minBy: projection: ('T -> 'U) -> source:TaskSeq<'T> -> Task<'T> (requires comparison)
val maxByAsync: projection: ('T -> #Task<'U>) -> source:TaskSeq<'T> -> Task<'T> (requires comparison)
val minByAsync: projection: ('T -> #Task<'U>) -> source:TaskSeq<'T> -> Task<'T> (requires comparison)
// overloads for existing concat (which takes nested task sequences)
val concat: sources: TaskSeq<'T seq> -> TaskSeq<'T>
val concat: sources: TaskSeq<'T list> -> TaskSeq<'T>
val concat: sources: TaskSeq<'T array> -> TaskSeq<'T>
val concat: sources: TaskSeq<ResizeArray<'T>> -> TaskSeq<'T>
Attempt at fixing and more analysis: #27.
While working on PR #23, it turned out that the test runner never finished, leading to empty logs (while running or after canceling). The bad behavior of the Github reporting is explained here: actions/runner#1326 (and it was closed with no fix).
The cause appears to be a (potential) race condition. It is yet unclear whether that is caused by the resumable code or something else. The temporary solution in #23 is to disable parallel runs in CI and to improve reporting.
Unfortunately, xUnit reporting is quite useless for cases like this. It only reports when a test has finished, not when it started. dotnet test does have a --blame-hang-timeout option, which at least solves the issue with GH action reporting, since a hang then results in a failed test run.
Opening this issue to investigate further, so that we can close #23 meanwhile, as the implementations there have nothing to do with this behavior (removing enough tests removes the issue, but it doesn't matter which ones you remove, ultimately, the race condition comes back).
Example of a run that went on for over 5 hours (!): https://github.com/abelbraaksma/TaskSeq/actions/runs/3250819154/jobs/5334999458 (note: after retention period is over, this log will be gone).
Running ``CE taskSeq with nested deeply yield!`` (permalink, goes to original from the time of writing this) takes forever, but based on the non-delaying performance, it should be "just fine" and take an average time.
This test creates nested IAsyncEnumerable<'T>, themselves based on unit -> task<'T> functions to prevent having hot-started tasks. However, the test takes over 30s to finish (with the commented code enabled). Note that all of these tests deliberately use a randomly generated delay through Task.Delay.
Similar tests with many tasks that are not delayed do not have an issue.
It appears that Dispose() or DisposeAsync() is not being called when use! is used. First mentioned offline by @bartelink. Logging here to link the PR to a tracking issue and for the changelog/release notes.
Turned out that this was inadvertently commented out in 27486f3, which shouldn't have happened. In a follow up I will create some tests to ensure this won't happen again.
Signatures should be similar to those for Seq, with overloads for async versions so that the picker or chooser function can return a Task.
Consider:
TaskSeq.isEmpty
TaskSeq.head
TaskSeq.tryHead
TaskSeq.last
TaskSeq.tryLast
TaskSeq.tryExactlyOne
TaskSeq.exactlyOne
TaskSeq.item
TaskSeq.tryItem
TaskSeq.find + async version
TaskSeq.tryFind + async version
TaskSeq.pick + async version
TaskSeq.tryPick + async version
TaskSeq.filter + async version
TaskSeq.choose + async version
As with Seq, the non-try variants will raise an exception if the precondition is not met.
Hi.
Thanks for working on this. I have been exploring this repository since I wanted to do something similar, and I noticed that even when manually passing in a cancellation token, it is not directly acted upon. So for example
let consumeManually (enumerable: IAsyncEnumerable<int>) (token: CancellationToken) = task {
    do! Task.Yield()
    let mutable hasRead = false
    use enumerator = enumerable.GetAsyncEnumerator(token)
    let! canRead = enumerator.MoveNextAsync()
    hasRead <- canRead
    while hasRead do
        let _here = enumerator.Current
        let! canRead = enumerator.MoveNextAsync()
        hasRead <- canRead
    return ()
}
let infinite () = taskSeq {
    while true do
        yield 1
}
[<Fact>]
let usesCancellationToken () = task {
    use src = new CancellationTokenSource()
    let readToEnd = consumeManually (infinite()) src.Token
    do! Task.Delay(100)
    src.Cancel()
    do! readToEnd
}
will run forever. However, if something like Async.Sleep() is bound inside the taskSeq, it will end with an exception, since the token is actually passed to the async when it is bound.
Do you think it would make sense to set the promiseOfValueOrEnd to a cancelled exception (which seems to be the idiomatic dotnet way) or complete(false) when the token is cancelled? I.e., using the CancellationTokenRegistration to abort MoveNextAsync on cancellation?
I'd like to see takeWhileInclusive (it has uses in Equinox) be ported from AsyncSeq.
Will post a PR to add it to the TODO list (and then one to implement it) in due course unless someone beats me to it, or there are objections to adding it...
Can we implement this function? To prevent having a dependency on AsyncSeq, we probably need to create this function using SRTP, but it should be possible for users to convert to/from AsyncSeq.
Within the F# ecosystem, there are a number of commonly used augmentations that typically get maintained as internal helpers within libraries and applications, manually copied around in various ways. Examples of such helpers:
While it can be argued that many of these concerns should be addressed in FSharp.Core, these are being logged here first in an attempt to narrow down the number of suggestions, with a view to getting stable naming (and potentially a shims library of some kind as a stopgap). NOTE that building a one stop shop for lots of arbitrary helpers is a non-goal of this effort.
NOTES:
FSharp.Core
When you call .WithCancellation(cancellationToken) on IAsyncEnumerable<'T> you get System.Runtime.CompilerServices.ConfiguredCancelableAsyncEnumerable<'T>, which is not currently supported.
So this code does not work:
static member ToFlatListAsync<'Source>(source: IQueryable<'Source>, [<Optional>] cancellationToken: CancellationToken) = task {
    let builder = ImmutableArray.CreateBuilder<'Source>()
    do!
        source.AsAsyncEnumerable().WithCancellation(cancellationToken)
        |> TaskSeq.iterAsync (fun x -> builder.Add(x))
    return builder.ToImmutable();
}
To support the same types F# Core supports with task, specifically in let! and do!, we'll need to implement the ^TaskLike SRTP approach. This prevents ambiguities caused by Task<'T> :> Task. It also limits the needed overloads for ValueTask<'T> and ValueTask.
Related to #43, which can only be implemented by using this approach.
This change will unify the types supported between task and taskSeq, except for async, which should be added next (see #79).
As in the title. Any of these errors only happen when multiple iterations are attempted and the source is a taskSeq CE, not when the source is a user-defined or library defined IAsyncEnumerator<_>, used with the TaskSeq library functions.
I already started investigating this issue and it has to do with properly resetting state when "reaching the end" and when "getting an enumerator over the same resource". See #36 continued: #42.
MoveNextAsync()
// this throws: InvalidOperationException: Operation is not valid due to the current state of the object.
task {
    let tskSeq = taskSeq { yield 1; yield 2 }
    let enum = tskSeq.GetAsyncEnumerator()
    let! isNext = enum.MoveNextAsync() // true
    let! isNext = enum.MoveNextAsync() // true
    let! isNext = enum.MoveNextAsync() // false
    let! isNext = enum.MoveNextAsync() // error here
    ()
}
GetAsyncEnumerator() with MoveNextAsync()
// throws: InvalidOperationException: Operation is not valid due to the current state of the object.
task {
    let tskSeq = getEmptyVariant variant
    use enumerator = tskSeq.GetAsyncEnumerator()
    let! isNext = enumerator.MoveNextAsync()
    use enumerator = tskSeq.GetAsyncEnumerator()
    let! isNext = enumerator.MoveNextAsync() // throws here
    ()
}
// this throws:
// InvalidOperationException:
// An attempt was made to transition a task to a final state when it had already completed.
task {
    let tskSeq = taskSeq { yield 1; yield 2 }
    let ts1 = tskSeq |> TaskSeq.map (fun i -> i + 1)
    let result1 = TaskSeq.toArray ts1
    let ts2 = ts1 |> TaskSeq.map (fun i -> i + 1)
    let result2 = TaskSeq.toArray ts2 // error here
    ()
}
Consider this:
task {
    let mutable i = 0
    let ts = taskSeq {
        i <- i + 1
        yield 1
        i <- i + 1
        yield 2
        i <- i + 1 // we should never get here, if we pick idx 1
    }
    let! _ = ts |> TaskSeq.item 1 // item returns a Task<int>, so bind and discard instead of do!
    printfn "%i" i
}
This prints "3", not "2". That's wrong. Side effects beyond the item found should not execute.
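The expected behavior matches what the synchronous `Seq.item` already does. The sketch below uses a plain `seq` and a list of strings (both chosen only for illustration) to record which side effects ran.

```fsharp
// Record which side effects the sequence has executed.
let effects = ResizeArray<string>()

let source = seq {
    effects.Add "before 1"
    yield 1
    effects.Add "before 2"
    yield 2
    effects.Add "after 2" // should not run when only index 1 is requested
}

// Seq.item stops advancing the enumerator once the requested index is reached.
let picked = source |> Seq.item 1
let observed = List.ofSeq effects
```

Only the two effects preceding the picked item are recorded, which is the behavior `TaskSeq.item` should mirror.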
In rare scenarios, for instance, where the resumable code is invoked in a top-level function, the compiler may not be able to compile the resumable code statically. For these cases, we need a dynamic version of the resumable code (which exists, but currently simply raises an exception).
This issue serves as a placeholder for this.
Replaces #129. TaskEx top level issue: #139
The default implementation of the Async.AwaitTask methods in FSharp.Core has some key shortcomings:
Task faults, yielding an exception, that exception is typically (always?) wrapped in an egregious AggregateException
CancellationToken of the async expr within which Async.AwaitTask is triggered
TaskCancelledException to align with the behavior of Task
While it can be argued that the current behavior is 'wrong', it's also obvious that breaking it would be untenable, and the semantic differences are beyond what one might cover with subtle overloads and/or adding optional arguments etc.
Current proposed APIs (will be updated inline based on any discussion below):
module Async =
    let inline ofTask (t : Task<'t>) : Async<'t> = AwaitTaskCorrect t
    let inline ofUnitTask (t : Task) : Async<unit> = AwaitTaskCorrect t
module Task =
    let inline toAsync (t : Task<'t>) : Async<'t> = Async.ofTask t
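The proposal refers to AwaitTaskCorrect without showing it. The following is one possible shape for the generic overload, written here as a plain function named `awaitTaskCorrect`; the name and body are assumptions, not the code the issue has in mind. It addresses the AggregateException wrapping and maps task cancellation to a TaskCanceledException, but it does not attempt to observe the ambient cancellation token of the surrounding async.

```fsharp
open System.Threading.Tasks

/// Sketch of an AwaitTaskCorrect-style helper (an assumption, since the issue
/// does not show its body): a single inner exception is surfaced directly
/// instead of being wrapped in an AggregateException.
let awaitTaskCorrect (source: Task<'T>) : Async<'T> =
    Async.FromContinuations(fun (onSuccess, onError, _onCancel) ->
        source.ContinueWith(fun (t: Task<'T>) ->
            if t.IsFaulted then
                let e = t.Exception
                // Unwrap when there is exactly one inner exception.
                if e.InnerExceptions.Count = 1 then onError e.InnerExceptions.[0]
                else onError e
            elif t.IsCanceled then onError (TaskCanceledException())
            else onSuccess t.Result)
        |> ignore)
```

With this in place, a faulted task that threw an InvalidOperationException surfaces that exception type to the awaiting async, so a plain `try ... with :? InvalidOperationException` handler works as expected.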
NOTES:
TaskSeq, which are exposed in the FSharp.Control namespace. NOTE the current implementations use Async.AwaitTask, but that was not as a conscious choice, and there is a desire to fix at least some of the shortcomings noted
task and async are now first class citizens of FSharp.Core, having an of/to pairing would seem to make sense. This is open to debate; not sure the degree to which the prior art is consistent wrt this beyond collection types/modules
fslang-suggestion regarding this. The purpose of this issue is as a placeholder for potentially filling the gap until this issue can be more thoroughly resolved in FSharp.Core proper.
module, one also frequently needs an ofUnitTask alongside as above (Async.AwaitTask is a pair of overloaded methods, which often brings its own issues with intellisense and error messages etc)
I see chunkBySize is supposed to be added. Is there any guidance or reference for patching this in myself? I've come across a use for it in a pipeline.
Thanks
This post has instructions: https://dev.to/j_sakamoto/writing-a-nuget-package-release-notes-in-an-outside-of-a-csproj-file-3f94.
Using that approach allows a bit more control over release notes and, more importantly, doesn't require updating the fsproj each time we issue a new version, which in itself requires updating the release notes.
Just opening as an issue so I won't forget doing this ;).
It would be useful to add an extension method For on TaskBuilder to be able to do for loops over IAsyncEnumerable in tasks.
let mySeq =
    taskSeq {
        for i in 0..9 do yield i
    }
let myTask =
    task {
        for x in mySeq do // <-- This is currently not possible.
            printfn $"{x}"
    }
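Until such a For extension exists, the loop can be written by driving the enumerator by hand. The following is only a sketch of that workaround: iterAsyncEnumerable is a hypothetical helper name, not part of this library, and it assumes the F# 6 task builder, which can bind the ValueTask<bool> returned by MoveNextAsync and dispose an IAsyncDisposable via use.

```fsharp
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

/// Hypothetical helper: run `action` on every item of an IAsyncEnumerable inside a task.
let iterAsyncEnumerable (action : 'T -> unit) (source : IAsyncEnumerable<'T>) : Task<unit> =
    task {
        // IAsyncEnumerator<'T> is IAsyncDisposable, so `use` disposes it asynchronously.
        use enumerator = source.GetAsyncEnumerator CancellationToken.None
        let mutable hasNext = true
        while hasNext do
            let! moved = enumerator.MoveNextAsync ()
            if moved then action enumerator.Current
            else hasNext <- false
    }
```

A call such as `mySeq |> iterAsyncEnumerable (fun x -> printfn $"{x}")` would then stand in for the for loop shown above.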
Replaces #129. TaskShims top level issue: #139
In contrast to its sibling, Async.StartAsTask, Async.StartImmediateAsTask does not have a taskCreationOptions optional parameter (because it is not starting a thread), so people often use it in pipeline expressions.
However, that makes it easy to gloss over the fact that the computation will then not have an ambient cancellation token.
In order to resolve those forces (wanting to be able to execute an async via piping, without the risk of omitting to consider cancellation), it is proposed to have a common helper function (with a lower case name) that wraps Async.StartImmediateAsTask
Current proposed APIs (will be updated inline based on any discussion below):
module Async =
    let inline startImmediateAsTask ct (a : Async<'t>) = Async.StartImmediateAsTask(a, cancellationToken = ct)
    // ALTERNATELY
    let inline executeAsTask ct (a : Async<'t>) = Async.StartImmediateAsTask(a, cancellationToken = ct)
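To illustrate the intent, here is a small sketch of how such a helper would be used in a pipeline. The compute function is a hypothetical example, and the explicit type annotations are added for clarity only. Because the token is a mandatory first argument, the caller cannot pipe into the helper without deciding which token applies.

```fsharp
open System.Threading
open System.Threading.Tasks

module Async =
    /// Proposed helper: the cancellation token is required, not optional.
    let inline startImmediateAsTask (ct : CancellationToken) (a : Async<'t>) : Task<'t> =
        Async.StartImmediateAsTask (a, cancellationToken = ct)

/// Hypothetical caller: exposes an async workflow as a Task, flowing the caller's token.
let compute (ct : CancellationToken) : Task<int> =
    async {
        do! Async.Sleep 10
        return 42
    }
    |> Async.startImmediateAsTask ct
```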
NOTES:
I do notice the name "TaskSeq" is confusing some people - people thinking this is about "sequences of tasks". I'm not sure what to do about this.
While thinking about this I included some general notes on naming in this space, see below
F# IEnumerator
    HotSynchronousFastEnumerator: IEnumerable<T>
F# IEnumerable = Seq
    ColdSynchronousFastEnumerable: unit -> IEnumerable<T>
.NET/F#/C# Task = C# async/await
    HotAsynchronousFastValue: Task<T>
    ColdAsynchronousFastValueFactory: unit -> Task<T>
    ColdAsynchronousFastCancellableValueFactory: CancellationToken -> Task<T>
Async = F# async
    ColdAsynchronousCancellableValueFactory: CancellationToken -> Task<T>
Current F# AsyncSeq
    ColdAsynchronousCancellableEnumerable: CancellationToken -> IAsyncEnumerator<T>
Current F# TaskSeq
    ColdAsynchronousHalfCancellableEnumerable: CancellationToken -> IAsyncEnumerator<T>
I'm leaving the question of tailcalls off the list, as much as I'd like to address that.
It's worth noting that at a high level there's no real logical difference between CancellableTask and F# Async<_>. Nor between F# TaskSeq and F# AsyncSeq.
The sweet spot for F# is really Cold+RunMany+Asynchronous+Fast+Cancellable+Tailcalls, which is what TaskSeq is close to being technically (except tailcalls, sadly).
At present the current impl of for within async { expressions raises two concerns for me:
AwaitTaskCorrect semantics would be preferable to promulgating usage of Async.AwaitTask in a place where most people will not necessarily even infer that it's in play.
Async.StartAsTask includes an unnecessary transition to the Thread Pool (with a potential context switch?) which could instead safely be replaced with Async.StartImmediateAsTask in this context. EDIT: moved to separate thread #135
As in the title, we see:
Warning NU1608: Detected package version outside of dependency constraint: FsUnit.xUnit 5.5.0 requires xunit (>= 2.5.3 && < 2.6.0) but version xunit 2.6.1 was resolved.
Reported here: fsprojects/FsUnit#253
Replaces #128. TaskEx top level issue: #139
Async.Ignore has always been ugly and undiscoverable. While I tend to make ignoring explicit by using let! _ = <async stuff I want to ignore result of>, it's commonly the last expression in a function, and having to do let! _ = <thing I'm wrapping> in () is too much.
It is proposed that the module associated with any given builder should by convention have an ignore function that correctly observes completion of the work, dropping the result, but propagating exceptions, if any.
Current proposed APIs that are not currently in FSharp.Core (will be updated inline based on discussion below):
module Async =
    let inline ignore (a : Async<'t>) = Async.Ignore a
module Task =
    let inline ignore (t : Task<'t>) = ...
module ValueTask =
    let inline ignore (t : ValueTask<'t>) = ...
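The bodies of the Task and ValueTask variants are elided above. As one possible reading of "observes completion, drops the result, propagates exceptions", a Task version could be sketched as follows. The Task<unit> return type is an assumption made for this sketch, not something the proposal specifies.

```fsharp
open System.Threading.Tasks

module Task =
    /// Sketch (assumption): await the task, drop its result, let any exception propagate.
    let ignore (t : Task<'t>) : Task<unit> =
        task {
            let! _ = t
            return ()
        }
```

This lets a task be the last expression in a pipeline, for example `someTask |> Task.ignore`, without an explicit `let! _ = ... in ()`.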
NOTES:
TaskSeq (and other libs such as IcedTasks) contain various bespoke implementations
FSharp.Core is the obvious home for them. (However that would raise the issue of whether they need to go into the 6.x release line in order to align with minimal dependencies for various common libraries)
Great to see this taking shape. Was able to port Equinox to use it in one impl without issues. Am intending to spread that to the other integrations in due course.
However, it does force a dep on FSharp.Core v 6.0.2 - is there an upstream requirement that triggers this? Is this likely to remain stable?
(Reason I ask is that the Equinox deps are 6.0.0 and I was trying to hold the line on that. Should I up the requirement, Equinox will likely shift to depending on 6.0.6, as there's an ugly shim required for a StackOverflow in Async.Parallel that's not fixed until then)
In PR #114 (adding support for Async in Bind), @bartelink raised the question whether it would be better to use StartImmediateAsTask instead of StartAsTask.
Specifically, it is about this part:
let mutable awaiter =
    Async
        .StartAsTask(asyncSource, cancellationToken = sm.Data.cancellationToken)
        .GetAwaiter()
The current approach was taken to mimic the same approach taken for task in F# Core: https://github.com/dotnet/fsharp/blob/d91b6c5c97accf363d135d1f99816410da4ec341/src/FSharp.Core/tasks.fs#L462, basically:
member inline this.Bind(computation: Async<'TResult1>, continuation: ('TResult1 -> TaskCode<'TOverall, 'TResult2>)) : TaskCode<'TOverall, 'TResult2> =
    this.Bind(Async.StartAsTask computation, continuation)
For comparison, see what was done in async2, which was poised to be the new async based on resumable code, like task is, but unlike task cold-started like the current async.
So, we have two scenarios:
StartAsTask to get similar semantics as the task CE, but this causes a thread hop
StartImmediateAsTask, which uses the current context
I'm not aware of any downside of using the latter. It definitely saves a thread switch in the general case (note that the thread pool may not always cause a physical thread switch).
It is currently quite a pain to implement cancellation passing explicitly. As has been said in several issues already, we need to support this on each member to do it properly. Plus we will need overloads:
TaskSeq should also seamlessly take a ConfiguredCancelableAsyncEnumerable<_>, which is available in .NET Standard 2.1. This would solve #167, reported by @xperiandri (PS: to keep the surface area from blowing up, I will check if ofCancelable/toCancelable functions would serve the same purpose).
Async prefix should be able to take a Task (as they can, currently), an Async and a ValueTask. Writing functions like iterAsync, iterWithTask, iterWithValueTask etc is going to be too painful long-run.