
fsharp.control.asyncseq's Introduction

FSharp.Control.AsyncSeq

FSharp.Control.AsyncSeq is a collection of asynchronous programming utilities for F#.

See the home page for details. The home page can be edited, forked or cloned. Please contribute to this project. Don't ask for permission; just fork the repository and send pull requests.

Maintainer(s)

The default maintainer account for projects under "fsprojects" is @fsprojectsgit - F# Community Project Incubation Space (repo management)

fsharp.control.asyncseq's People

Contributors

beauvankirk, chrsteinert, cjliberty, deneuxj, dependabot[bot], dnauck, dsyme, eiriktsarpalis, enricosada, eulerfx, forki, fsprojectsgit, gusty, johlrich, levibotelho, mattjohnsonpint, mattstermiller, mavnn, mlaily, njlr, panesofglass, rhwy, sergey-tihon, shmew, tachyus-ryan, thednaz, tpetricek, vchekan, wilsoncg, xperiandri


fsharp.control.asyncseq's Issues

GitHub link in documentation points to wrong line number

Description

In the documentation for map, the link to the GitHub source is incorrect. It points to line 950. Instead, it should point to line 956.

Repro steps


  1. Visit https://fsprojects.github.io/FSharp.Control.AsyncSeq/reference/fsharp-control-asyncseq.html
  2. Scroll down to the documentation for map
  3. Click on the link to the GitHub source code

Expected behavior

The link should have a line anchor to map in the source code

Actual behavior

The link has a line anchor to a few lines above map

Known workarounds

N/A

Related information

N/A

Rename toList/toArray to toListSynchronously/toArraySynchronously

Calling Async.RunSynchronously in libraries is error-prone and can cause deadlocks (e.g. when running on the main interactive thread). I would just remove the two functions toList and toArray altogether to prevent library users from hitting hard-to-debug issues.

let toList (source:AsyncSeq<'T>) = toListAsync source |> Async.RunSynchronously

let toArray (source:AsyncSeq<'T>) = toArrayAsync source |> Async.RunSynchronously
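For comparison, here is a minimal sketch of the style the issue points toward. Library code composes AsyncSeq.toListAsync inside an async workflow. Only the application's entry point calls Async.RunSynchronously, and it does so once.

The sketch makes two assumptions:

- It is an F# script run with dotnet fsi that pulls the package via a `#r "nuget: ..."` directive.
- sumAsync is a hypothetical name.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

let numbers : AsyncSeq<int> =
    asyncSeq {
        yield 1
        yield 2
        yield 3
    }

// Library-style code stays asynchronous: no Async.RunSynchronously here.
let sumAsync (source: AsyncSeq<int>) : Async<int> =
    async {
        let! items = AsyncSeq.toListAsync source
        return List.sum items
    }

// Only the application's entry point decides to block, and only once.
let total = sumAsync numbers |> Async.RunSynchronously
printfn "total = %d" total
```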

AsyncSeq.toBlockingSeq doesn't cancel the task

AsyncSeq.toBlockingSeq doesn't cancel the underlying task.

Also, it starts the task immediately when the combinator is applied. This isn't right: the task should be started each time the sequence is iterated. We wouldn't expect applying the combinator to have an immediate side effect.
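Below is a sketch of the behaviour described above. It is written as a hypothetical toBlockingSeqLazy and is not the library's implementation.

It assumes the 2.x/3.x enumerator shape visible in the groupBy repro further down this page. In that shape, src.GetEnumerator() returns a disposable enumerator whose MoveNext() is an Async<'T option>.

What the sketch does:

- Nothing runs until the seq is enumerated.
- Every enumeration starts the source afresh.
- The async enumerator is disposed when the consumer stops early.

It does not cancel an in-flight MoveNext, so it covers only part of the issue.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

/// Hypothetical replacement for AsyncSeq.toBlockingSeq (not library code).
let toBlockingSeqLazy (source: AsyncSeq<'T>) : seq<'T> =
    seq {
        // Runs only when this seq is enumerated, once per enumeration;
        // `use` disposes the async enumerator when the consumer stops,
        // including on early exit (Seq.head, Seq.take, break out of a loop).
        use e = source.GetEnumerator()
        let next () = e.MoveNext() |> Async.RunSynchronously
        let current = ref (next ())
        while current.Value.IsSome do
            yield current.Value.Value
            current.Value <- next ()
    }
```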

StrongNaming for nuget package

Description

The latest nuget package (v2.0.21) contains the FSharp.Control.AsyncSeq.dll which is not strongly named. Having a project that is strongly named that depends on this library will fail, except if a third party (such as StrongNamer) is used.

Repro steps

Create a project that is signed, have this project depend on this library.
Execute the project. It will fail at runtime.

Expected behavior

Signed assemblies should be able to use this library without a third-party tool such as StrongNamer.

Actual behavior

Runtime will fail if a signed project tries to load the library.

Known workarounds

A third-party tool such as StrongNamer is needed for strong-named projects that depend on this library.

Related information

N/A

AsyncSeq.cache needs work

The implementation of AsyncSeq.cache seems to create a chain of agents (a recursive call to cache). At first glance I don't think this is needed, or at least it is not going to be performant. Surely we should write results into a shared concurrent data structure.

Indeed, agents seem overused in the implementation; combinators that use an appropriate shared-memory concurrent data structure seem better.

Resources, AsyncSeq, Try Finally and possible alternative definitions

In general we expect sensible behaviour for sub-sequences of AsyncSeq objects (those returned by tail, skip etc.)

When I look at the definition of something like ofObservableUsingAgent, I see an AsyncSeq which has internal resources (an agent). If you replayed the "tail" of the sequence multiple times, or if you didn't iterate all the way through the sequence, you'd get some odd behaviour: each time you replay it, you're communicating with the agent and presumably getting different replies. The agent allocated at the start represents shared state, shared between all the various sub-sequences returned. Also, the agent only gets collected if the sequence gets iterated all the way to the end.

let internal ofObservableUsingAgent (input : System.IObservable<_>) f =
  asyncSeq {
    use agent = AutoCancelAgent.Start(f)
    ...
    yield! loop() }

All this stems from the fact that the definition of AsyncSeq is currently much like a lazy list, just asynchronous (indeed its name could almost be AsyncList). The "tail" operation drops the head, a "cons" would put a different head on, and so on. Attempts to use shared mutable resources in LazyList-like combinators usually result in them being shared by all "tail" instances, with odd results.

Basically, LazyList-like structures can't hold shared-mutable or disposable resources during iteration. That's one advantage of IEnumerable<'T> over LazyList<'T>, because IEnumerable separates iteration from data, and iterators can hold state.

To give another example, the current "algebraic" definition is problematic with respect to the behaviour of "use". As things stand today, when you use "use" in an asyncSeq { ... }:

(a) The compensation functions are only executed if the consuming iteration completes to the end of the sequence - and not if an early exit occurs. (In contrast, IEnumerator objects can be disposed at any point during the iteration.)

(b) The compensation functions will be executed every time the "tail" of an async seq is iterated to its end. (In contrast, a separate IEnumerator object is created for each iteration.)

Basically, it's probably not a good idea to try to have both an algebraic definition of AsyncSeq and support for using/try-finally.

All these problems lead to a possible alternative definition of AsyncSeq as an async version of iterators:

  type IAsyncEnumerator<'T> = 
      abstract MoveNext : Async<bool>
      abstract Current : 'T
      abstract Dispose : unit -> unit  (* or Async<unit> *)

  type IAsyncEnumerable<'T> = 
      abstract GetEnumerator : unit -> IAsyncEnumerator<'T>

  type AsyncSeq<'T> = IAsyncEnumerable<'T>

I actually think we need to do a comparison between the current "algebraic" definition of AsyncSeq and alternative "iterator" definitions. The iterator definition is in many ways the "natural" definition of asynchronous sequences for F#, at least if you take "sequence" = "iterable".

In particular, the "iterator" definition allows a sensible notion of "Dispose" on the iterators, which allows them to hold resources, very much as in the case above.
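To make the difference concrete, here is a small, library-free sketch of the iterator definition proposed above. The interfaces are declared locally exactly as in the issue; ofList and tryFirst are hypothetical helpers.

- Each GetEnumerator call gets its own private cursor, so repeated iterations start from the head instead of sharing state.
- A consumer that stops after one element can still call Dispose.

```fsharp
// The "iterator" shape from the issue, declared locally (FSharp.Control is not opened).
type IAsyncEnumerator<'T> =
    abstract MoveNext : Async<bool>
    abstract Current : 'T
    abstract Dispose : unit -> unit

type IAsyncEnumerable<'T> =
    abstract GetEnumerator : unit -> IAsyncEnumerator<'T>

/// Hypothetical helper: every enumerator gets its own cursor over `items`.
let ofList (items: 'T list) (onDispose: unit -> unit) : IAsyncEnumerable<'T> =
    { new IAsyncEnumerable<'T> with
        member _.GetEnumerator() =
            // Per-enumerator state: nothing here is shared between iterations
            let remaining = ref items
            let current = ref Unchecked.defaultof<'T>
            { new IAsyncEnumerator<'T> with
                member _.MoveNext =
                    async {
                        match remaining.Value with
                        | x :: rest ->
                            current.Value <- x
                            remaining.Value <- rest
                            return true
                        | [] -> return false
                    }
                member _.Current = current.Value
                member _.Dispose() = onDispose () } }

/// Hypothetical consumer: reads at most one element, then always disposes.
let tryFirst (source: IAsyncEnumerable<'T>) : Async<'T option> =
    async {
        let e = source.GetEnumerator()
        try
            let! hasValue = e.MoveNext
            return (if hasValue then Some e.Current else None)
        finally
            e.Dispose()
    }
```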

Note that APIs can often have problems like this without them being observed as a significant problem in practice because in most cases AsyncSeq objects are one-shot - they are only ever iterated once, or iterations of the outermost object are independent as for Seq.

Cheers
Don

3.x fails with fable

Description

When I use the library with fable I am getting the below error:
Module build failed (from ./node_modules/fable-loader/index.js): Error: Could not find file '/home/onur/projects/infill2/.fable/FSharp.Control.AsyncSeq.3.0.1/AsyncSeq.fsi'. at /home/onur/projects/infill2/node_modules/fable-loader/index.js:98:22

Downgrading to 2.x solves the issue.

Simple aggregation

Hello Lev,

I was trying to perform some simple aggregation using AsyncSeq, and seems like I am facing an issue. My code first:

let inputStream = asyncSeq {
    do! Async.Sleep(1000)
    yield 1
    do! Async.Sleep(1000)
    yield 2
    do! Async.Sleep(1000)
    yield 3
    do! Async.Sleep(1000)
    yield 4
    do! Async.Sleep(1000)
    yield 5
}

let res =
    inputStream
    |> AsyncSeq.groupBy(fun x -> x % 3)
    |> AsyncSeq.mapAsync(snd >> AsyncSeq.lastOrDefault 0)

res
|> AsyncSeq.iter(fun x -> printfn "--> %A" x)
|> Async.Start

I am trying to group the data and aggregate it using the last value in each group, e.g. the daily close price from intraday bars. I am using the lastOrDefault function for this. The code above doesn't produce the expected aggregated values, and I think there should be a simpler way to do this. Can you please advise?

Best regards,
Pavel
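If only the last value per key is needed, one possible approach avoids groupBy entirely and folds the stream into a Map with AsyncSeq.fold. This is a sketch, not an official recommendation.

Separately, each groupBy sub-sequence only completes once the whole source has been read. Consuming the groups one at a time with mapAsync can therefore stall; the deadlock report further down this page describes the same combination.

The sleeps are shortened to 10 ms here.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

let inputStream =
    asyncSeq {
        for x in [1; 2; 3; 4; 5] do
            do! Async.Sleep 10
            yield x
    }

// Fold the stream into a Map, overwriting the entry for each key (x % 3)
// so that only the most recent value per key is kept.
let lastPerKey : Async<Map<int, int>> =
    inputStream
    |> AsyncSeq.fold (fun acc x -> Map.add (x % 3) x acc) Map.empty

let result = lastPerKey |> Async.RunSynchronously
printfn "%A" result
```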

AsyncSeq.groupBy produces sequence which behaves like "cold" even if source is "hot"

Description

I use AsyncSeq.groupBy, and inside each group, when the group is created, I get the first message of the sequence (AsyncSeq.tryFirst) to initialize business logic (I am merging 2 Kafka streams, and the offsets for stream2 are in stream1). Surprisingly, when I get the first message and continue, I get the "first" message again. In the code below, the groupBy test fails with the message

   Expected: not equal to <Some(1)>
  But was:  <Some(1)>

Repro steps

dotnet test --filter "AsyncSeq.groupBy should not restart"

[<Test>]    
let ``AsyncSeq.groupBy should not restart sequence but continue``() =
  //
  // "cold source will repeat 1st element: this is expected"
  //
  let coldSrc: AsyncSeq<int> = 
    asyncSeq {
      yield 1
      yield 2
      yield 3
      yield 4
    }
  async {
    let! first = coldSrc |> AsyncSeq.tryFirst 
    let! second = coldSrc |> AsyncSeq.tryFirst 
    Assert.AreEqual(Some(1), first)
    // Seq is restarted, so we get "1" again
    Assert.AreEqual(Some(1), second)
  } |> Async.RunSynchronously


  //
  // Implement "hot" source
  //
  let hotSource (src: AsyncSeq<int>): AsyncSeq<int> =
    use enum = src.GetEnumerator()
    let rec loop(): AsyncSeq<int> = asyncSeq {
      let! next = enum.MoveNext()
      match next with
        | Some next' ->
          yield next'
          yield! loop()
        | None -> ()
    }
    loop()

  let hotSrc: AsyncSeq<int> = 
    asyncSeq {
      yield 1
      yield 2
      yield 3
      yield 4
    }
    |> hotSource

  async {
    let! first = hotSrc |> AsyncSeq.tryFirst 
    let! second = hotSrc |> AsyncSeq.tryFirst 
    Assert.AreEqual(Some(1), first)
    // With "hot" source, we get 2nd element
    Assert.AreEqual(Some(2), second)
  } |> Async.RunSynchronously

  //
  // Test behaviour of sequence inside groupBy
  //
  let groupBySource: AsyncSeq<int> = 
    asyncSeq {
      yield 1
      yield 2
      yield 3
      yield 4
    }
    |> hotSource

  let res = 
    groupBySource
    |> AsyncSeq.groupBy(fun s' -> s' % 2)
    |> AsyncSeq.mapAsyncParallel(fun (_group, subseq) -> async {
      let! first = 
        subseq
        |> AsyncSeq.tryFirst
      let! second =       
        subseq
        |> AsyncSeq.tryFirst
      Assert.AreNotEqual(first, second)
      return! subseq |> AsyncSeq.toListAsync
    }

    )
    |> AsyncSeq.toList
    |> List.sortBy(fun a -> a.[0])
  
  let expect = 
    [
      [1; 3]
      [2; 4]
    ]

  Assert.AreEqual((expect |> sprintf "%A"), (res |> sprintf "%A"))

Expected behavior

If the source sequence is "hot", I would expect groupBy not to change that.

Related information

  • Operating system: win10
  • Branch: master
  • .NET Runtime, CoreCLR or Mono Version: dotnet --version 2.1.201

nuget publish

@eulerfx / @dsyme
How to bump the nuget package version?
I'd also like to re-publish the API documentation at the same time (as now the links to line numbers in master are out of whack because of commits).

Request new function

Hi!

In my work I sometimes need a function that takes asyncs and runs them in parallel, with the number running at the same time capped below the total number of asyncs passed in, and that returns the results as an AsyncSeq so the consumer can process each result as soon as it appears.

My explanation may be confusing, so I will try to express it in code:

let getJobs () : AsyncSeq<int> =
    // code that retrieves the jobs goes here
    [1;2;3;4] |> AsyncSeq.ofSeq

let download job = async {
    // some download code here
    return 42
}

let downloadAll parallelCount =
    let jobs = getJobs() // overall jobs count far greater than parallelCount
    // AsyncSeq.requestedFunc stands for the requested function; it does not exist
    jobs
    |> AsyncSeq.map download
    |> AsyncSeq.requestedFunc parallelCount

// consumer code
let maxParallel = 5
downloadAll maxParallel
|> AsyncSeq.toBlockingSeq
|> Seq.iter (fun result -> printfn "%A" result)

Here is my attempt to do so:

let downloadAllBad parallelCount = asyncSeq {
    let jobs = getJobs()
    let batches = jobs |> AsyncSeq.bufferByCount parallelCount

    for batch in batches do
        let! results =
            batch
            |> Seq.map download
            |> Async.Parallel       // bad: wait time of results = wait time of longer async in batch
        for result in results do
            yield result
}

Here I am forced to wait for the results of all asyncs in a batch, but I want to receive results as they appear.

In the past I used MailboxProcessors for this: one agent distributes tasks to a few worker agents.
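Below is one way to get the requested behaviour without new library support. It assumes all jobs are available up front as a seq.

- A hypothetical runThrottled helper uses a SemaphoreSlim to cap the number of jobs in flight.
- It hands each result to a callback as soon as that job completes, so one slow job no longer holds back the rest of its batch.
- Results arrive in completion order, not input order.

```fsharp
open System.Threading

/// Hypothetical helper (not part of AsyncSeq): runs `work` on every job with at
/// most `maxParallel` jobs in flight, passing each result to `onResult` as soon
/// as that job finishes.
let runThrottled (maxParallel: int) (work: 'T -> Async<'R>) (onResult: 'R -> unit) (jobs: seq<'T>) : Async<unit> =
    async {
        use gate = new SemaphoreSlim(maxParallel)
        let resultLock = obj ()
        let runOne job =
            async {
                // Wait for a free slot before starting this job
                do! gate.WaitAsync() |> Async.AwaitTask
                try
                    let! result = work job
                    // Serialize callbacks so onResult need not be thread-safe
                    lock resultLock (fun () -> onResult result)
                finally
                    gate.Release() |> ignore
            }
        do! jobs |> Seq.map runOne |> Async.Parallel |> Async.Ignore
    }
```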

the `return` of `asyncSeq` builder is non intuitive

The return of the asyncSeq computation ignores the value passed.
I found it bug prone when upgrading a library from asyncseq v1 to v2 types.

let a = asyncSeq {
    yield 1
    yield 2
    return 3
}
  • no errors
  • the a is AsyncSeq<int>
  • as list, it's [1; 2]
  • the 3 is ignored, no warning, no errors

Same when trying to return another asyncSeq like

let b = asyncSeq {
   yield 4
   return a
}

I think that should be return! a to work correctly (not yet supported).

workaround

Always use yield and yield!:

let a = asyncSeq {
    yield 1
    yield 2
    yield 3
}

let b = asyncSeq {
   yield 4
   yield! a
}

Can a new release with asyncseq support be published?

Description

With the release of .NET Core 3.0 and C# 8, async enumerables (IAsyncEnumerable) are becoming more prevalent. The AsyncSeq support for them has not yet been released, and I see questions about it fairly often on the FSSF Slack. Could a new version of this library be published with those additions?

AsyncSeq.cache fails with SynchronizationLockException

The exception SynchronizationLockException is thrown when using AsyncSeq.cache

Build info:

  • .NET45.
  • FSharp.Control.AsyncSeq v2.0.1
  • FSharp.Core 4.3.0.0

Please note that the issue reproduces with v2.0.0 but it doesn't reproduce with v1.15.0

Exception details:

System.Threading.SynchronizationLockException was unhandled by user code
  HResult=-2146233064
  Message=Object synchronization method was called from an unsynchronized block of code.
  Source=FSharp.Control.AsyncSeq
  StackTrace:
       at [email protected](Unit unitVar) in C:\GitHub\dsyme\FSharp.Control.AsyncSeq\src\FSharp.Control.AsyncSeq\AsyncSeq.fs:line 817
       at FSharp.Control.AsyncSeq.tryFinally@326-1.System-IDisposable-Dispose() in C:\GitHub\dsyme\FSharp.Control.AsyncSeq\src\FSharp.Control.AsyncSeq\AsyncSeq.fs:line 351
       at [email protected](FSharpOption`1 _arg1) in C:\GitHub\dsyme\FSharp.Control.AsyncSeq\src\FSharp.Control.AsyncSeq\AsyncSeq.fs:line 340
       at [email protected](a a)
  InnerException: 

This question was also raised on Stack Overflow.

The sample code is listed below.

open FSharp.Control

[<EntryPoint>]
let main argv = 
    let asyncOp x =
        async {
            do! Async.Sleep x
            return x
        }

    let chooser x = if x > 1000 then Some x else None

    let execute (input : AsyncSeq<int>) =
        async {
            printfn "First result... Expect 3s sleep"
            let! r1 = AsyncSeq.tryPick chooser input
            printfn "Computing second result. Expect no sleep"
            let! r2 = AsyncSeq.tryPick chooser input
            printfn "Done"
            return (r1,r2)
        }

    printfn "Starting..."

    let v =
        [1000;2000;3000]
        |> AsyncSeq.ofSeq
        |> AsyncSeq.mapAsync asyncOp
        |> AsyncSeq.cache
        |> execute
        |> Async.RunSynchronously


    printfn "results: = %A" v
    do System.Console.ReadKey() |> ignore
    0

IAsyncEnumerator<Map<string,int>>' is not a valid enumerator type

tables is of type: AsyncSeq<Map<string,int>>

A simple bind

let x = tables |> AsyncSeq.take 200

gets the following error:

The type IAsyncEnumerator<Map<string,int>> is not a valid enumerator type , i.e. does not have a 'MoveNext()' method returning a bool, and a 'Current' property

Parallel sequence runs sequentially (non-parallel)

Description

I've tried to implement some of my work with asyncSeq.
I have a big job list which I want to process in parallel with some throttling (32 threads maximum, for example) and then feed the results as a stream to other async tasks (saving to disk, etc.).
Async.Parallel is not good for me because it doesn't have a max-thread setting.
I've also tried slicing the job list myself, but it was very memory-inefficient, so that's when I switched to AsyncSeq.

Repro steps

Just run the code below


#r @"../../packages/FSharp.Control.AsyncSeq/lib/net45/FSharp.Control.AsyncSeq.dll"

open FSharp.Control

//simulating long IO work
let longWork x = async {    
    do! Async.Sleep 3000
    return x * 2
}

//simulating another async task, saving to HDD for example
let log x = async {
    let! x = x    
    printfn "%A" x
    return x
}

let doWorkAndSaveInParallel() =
    async {
        do! seq {1..100}                                          //original data
            |> AsyncSeq.ofSeq
            |> AsyncSeq.mapAsync (longWork >> log)                //my async workflow
            |> AsyncSeq.indexed
            |> AsyncSeq.groupBy (fun (i, _) -> i % 32L)           //slicing by 32 threads
            |> AsyncSeq.mapAsyncParallel 
                (snd >> AsyncSeq.map snd >> AsyncSeq.iter ignore) //parallel
            |> AsyncSeq.iter ignore                               //iteration
    } |> Async.RunSynchronously

Expected behavior

Expected to see numbers 2,4,6...64 after 3 seconds almost simultaneously (in any order), after 3 more seconds there should be numbers 66,68,70..128 etc...

Actual behavior

Numbers appear one by one at 3-second intervals (no parallelism at all).

Known workarounds

Doing the parallelism myself is the only known workaround.
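A note on the repro: AsyncSeq.mapAsync awaits each element before pulling the next. In the pipeline above, longWork therefore runs sequentially before groupBy and mapAsyncParallel ever see the values.

On the point that Async.Parallel has no throttle: FSharp.Core 4.7 and later add an optional maxDegreeOfParallelism argument.

Below is a sketch using that argument, with the sleep shortened to 300 ms. log prints each result as soon as its own job finishes, and at most 32 jobs are in flight at once.

```fsharp
// Simulated long IO work (3000 ms in the issue, 300 ms here)
let longWork x =
    async {
        do! Async.Sleep 300
        return x * 2
    }

// Simulated follow-up task, e.g. saving to disk: prints each result
// as soon as its own job has finished
let log (work: Async<int>) =
    async {
        let! x = work
        printfn "%A" x
        return x
    }

let doWorkAndSaveInParallel () : int[] =
    let jobs = seq { 1 .. 100 } |> Seq.map (longWork >> log)
    // At most 32 jobs run at the same time (overload added in FSharp.Core 4.7);
    // the result array keeps the input order
    Async.Parallel(jobs, maxDegreeOfParallelism = 32)
    |> Async.RunSynchronously

let results = doWorkAndSaveInParallel ()
```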

Related information

Win10
FSharp.Control.AsyncSeq (2.0.16)

Memory leak in AsyncSeq.bufferByCountAndTime

Leave this running for a few minutes and you'll get an OutOfMemoryException.

open FSharp.Control


[<EntryPoint>]
let main argv = 

  let rec nums () =
    asyncSeq {
      yield Array.zeroCreate 100000
      yield! nums ()
    }

  let numbers = nums ()

  let buffered = numbers |> AsyncSeq.bufferByCountAndTime 1000 (int 10) |> AsyncSeq.toObservable

  buffered.Subscribe(fun _ -> System.GC.Collect()) |> ignore

  System.Console.ReadKey() |> ignore

  printfn "%A" argv
  0 // return an integer exit code

Proposing AsyncSeq.ofSeqAsync and AsyncSeq.concat

Description

Would the maintainers consider including the following two helpers:

  • We already have AsyncSeq.init and AsyncSeq.unfoldAsync to generate new
    AsyncSeq, but I could not find any helper to create one from an
    existing sequence of asynchronous computations (seq<Async<_>>).
    The following helper ofSeqAsync would help with that:
module AsyncSeq =
    let ofSeqAsync (s:seq<Async<'t>>) : AsyncSeq<'t> =
        asyncSeq {
            for asyncElement in s do
                let! v =  asyncElement
                yield v
        }
  • Using AsyncSeq.concatSeq, it's currently possible to concatenate sequences of AsyncSeq (as in seq<AsyncSeq<_>>) but not an AsyncSeq of AsyncSeqs.
    I propose adding the following function.
module AsyncSeq =
    let concat (s:AsyncSeq<AsyncSeq<'t>>) : AsyncSeq<'t> =
        asyncSeq {
            for innerSeq in s do
                for e in innerSeq do
                    yield e
        }

Example use case

Suppose you are given:

  • enumerateFiles : string -> Async<AsyncSeq<string>> that asynchronously returns
    an asynchronous enumeration of all files under a given directory;
  • and enumerateDirectories : string -> AsyncSeq<string> that
    asynchronously enumerates all sub-directories under a given directory;

and you want to enumerate all files under all directories under c:\
then with the two helpers you could write it as follows:

enumerateDirectories @"c:\"      // : Async<AsyncSeq<string>>
|> Seq.map enumerateFiles       // : seq<Async<AsyncSeq<string>>>
|> AsyncSeq.ofSeqAsync          // : AsyncSeq<AsyncSeq<string>>
|> AsyncSeq.concat              // : AsyncSeq<string> 

I understand that AsyncSeq.mergeAll could be used to achieve the same goal, except that it would require first enumerating all directories and converting them into a list before doing the concatenation.

Or perhaps there is a better way to do this?
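Here is a runnable sketch of the two proposed helpers composed as in the example. It assumes a hypothetical in-memory enumerateFiles that returns two files per directory.

The helpers are placed in a module named AsyncSeqExtras (a hypothetical name). That way they cannot collide with any same-named functions in the installed package version.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

module AsyncSeqExtras =
    // The two helpers as proposed in the issue
    let ofSeqAsync (s: seq<Async<'t>>) : AsyncSeq<'t> =
        asyncSeq {
            for asyncElement in s do
                let! v = asyncElement
                yield v
        }

    let concat (s: AsyncSeq<AsyncSeq<'t>>) : AsyncSeq<'t> =
        asyncSeq {
            for innerSeq in s do
                for e in innerSeq do
                    yield e
        }

// Hypothetical stand-in: pretend each directory holds a.txt and b.txt
let enumerateFiles (dir: string) : Async<AsyncSeq<string>> =
    async { return AsyncSeq.ofSeq [ dir + "/a.txt"; dir + "/b.txt" ] }

let allFiles =
    [ "docs"; "src" ]                    // : string list
    |> Seq.map enumerateFiles            // : seq<Async<AsyncSeq<string>>>
    |> AsyncSeqExtras.ofSeqAsync         // : AsyncSeq<AsyncSeq<string>>
    |> AsyncSeqExtras.concat             // : AsyncSeq<string>
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously
```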

Integrate with IAsyncEnumerable<'t>

Description

This library should offer integration points with IAsyncEnumerable, to paper over the task->async translations in one place.

That might look like a binding API like AsyncSeq.ofAsyncEnumerable, AsyncSeq.toAsyncEnumerable, extra computation builder overloads for the type, etc.

AsyncSeq.append called enormous number of times

The following code is running for about 4 seconds, which is extremely slow:

open System
open System.Diagnostics
open System.IO
open FSharp.Control

let linesRead = ref 0

let lines (file: string) =
    asyncSeq {
        use reader = new StreamReader(file)
        while not reader.EndOfStream do
            let! line = reader.ReadLineAsync() |> Async.AwaitTask
            incr linesRead
            yield line
    }

let sw = Stopwatch.StartNew()
let res =
    lines @"e:\docs\big.txt"
    |> AsyncSeq.take 3000
    |> AsyncSeq.map (fun line -> line.Split([|' '|], StringSplitOptions.RemoveEmptyEntries).Length)
    |> AsyncSeq.sum
    |> Async.RunSynchronously
sw.Stop()
printfn "result = %d, %O, number of actully read line: %d" res sw.Elapsed !linesRead
Console.ReadKey() |> ignore
0 

The output is result = 10273, 00:00:04.2336427, number of actully read line: 3000

I profiled it and AsyncSeq.append was called about 13 million times (!):


Missing XmlTransform library on Linux/Mono

Using the build.sh script on Linux/Mono, you will most likely see this:

generate.fsx(27,1): error FS0078: Unable to find the file 'Microsoft.Web.XmlTransform.dll'

I think this is the same issue we had here: fsprojects/FAKE#587.

Any thoughts on this?

Add YieldFrom overload for seq<'a>?

I'd like to turn an Async<seq<'T>> into an AsyncSeq<'T>.

While this works

asyncSeq {
    for item in items -> item
}

The yield! version does not:

asyncSeq {
    yield! items
}

I need this to get all pages from the endpoint in one go. Reading a single page yields Async<seq<Result>>, and I'd like the result to be presented as an AsyncSeq.
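Here is a sketch of two workarounds, assuming (as the issue reports) that the builder's yield! accepts only AsyncSeq values:

- Wrap the inner seq with AsyncSeq.ofSeq after let!.
- For paging, unfold page numbers with AsyncSeq.unfoldAsync and flatten each page with nested for loops.

ofAsyncOfSeq and fetchPage are hypothetical. The endpoint is faked with three non-empty pages.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Hypothetical helper: await the inner seq, then wrap it so yield! accepts it.
let ofAsyncOfSeq (items: Async<seq<'T>>) : AsyncSeq<'T> =
    asyncSeq {
        let! xs = items
        yield! AsyncSeq.ofSeq xs
    }

// Hypothetical paged endpoint: pages 0..2 hold two items each, page 3 is empty.
let fetchPage (page: int) : Async<seq<string>> =
    async {
        if page < 3 then
            return seq { yield sprintf "p%d-a" page; yield sprintf "p%d-b" page }
        else
            return Seq.empty
    }

// One element per page; unfolding stops at the first empty page.
let pages : AsyncSeq<seq<string>> =
    0
    |> AsyncSeq.unfoldAsync (fun page ->
        async {
            let! items = fetchPage page
            return (if Seq.isEmpty items then None else Some (items, page + 1))
        })

// Flatten the pages into one AsyncSeq of items.
let allItems : AsyncSeq<string> =
    asyncSeq {
        for page in pages do
            for item in page do
                yield item
    }

let single = ofAsyncOfSeq (fetchPage 0) |> AsyncSeq.toListAsync |> Async.RunSynchronously
let everything = allItems |> AsyncSeq.toListAsync |> Async.RunSynchronously
```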

Project needs a new logo

This project currently uses the unadorned FSSF Logo for F#. It really needs a logo of its own more in line with other projects.

Suggestions?

Presence of return is misleading

Given the following expression asyncSeq { return [1] }.
I would expect the type of it to be AsyncSeq<int>, unfortunately it is AsyncSeq<'a>.

I understand that my expectation is wrong to begin with: as this is a sequence workflow, return shouldn't be there at all.
By looking at the source code, I know that the presence of return is caused by the wish to support the do! action functionality, as it's translated by the F# compiler into Bind(action, fun () -> Return()).
Could this be an F# compiler problem? If the compiler transformed it into Bind(action, Zero()), then return could be dropped, right?

05.12.2015: corrected identifier

Question on caching

Hello,

Thank you for such a great library. I have a question about sequence caching. I want to have one sequence generator function, that then mapped into two different data streams (independent), and then zipped at some point. I have the following sample code:

let oneTwo = asyncSeq {
    do! Async.Sleep(1000)
    yield 1
    do! Async.Sleep(1000)
    yield 2 }

let cached = oneTwo |> AsyncSeq.cache

let first = cached |> AsyncSeq.map (fun x -> -x)
let second = cached |> AsyncSeq.map (fun x -> x * 4)

let zipped = (first, second) ||> AsyncSeq.zip

zipped
|> AsyncSeq.iter(fun (f,s) -> printf "--> First %A Second %A \r\n" f s)
|> Async.Start

The output is the following:

--> First -1 Second 4
--> First -2 Second 4

Seems like second sequence is not mapped correctly. Can you please clarify what would be the idiomatic way of doing this?

Thank you,
Pavel
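When both streams are projections of the same element, one alternative is to compute both values in a single AsyncSeq.map. The source is then enumerated only once and no zip is needed. This sketch sidesteps AsyncSeq.cache rather than explaining its output. Sleeps are shortened to 10 ms.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

let oneTwo =
    asyncSeq {
        do! Async.Sleep 10
        yield 1
        do! Async.Sleep 10
        yield 2
    }

// Both projections are computed from the same element in one pass,
// so the source is enumerated once and neither cache nor zip is needed.
let zipped = oneTwo |> AsyncSeq.map (fun x -> (-x, x * 4))

let pairs = zipped |> AsyncSeq.toListAsync |> Async.RunSynchronously
for (f, s) in pairs do
    printfn "--> First %d Second %d" f s
```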

Combination of groupBy and mapAsync involving access to the group causes deadlock.

Description

My code deadlocks when working with AsyncSeq pipeline.

Repro steps

Code example:

[1;2;3] 
|> AsyncSeq.ofSeq 
|> AsyncSeq.groupBy (fun x -> x) 
|> AsyncSeq.mapAsync (fun (key, gr) -> async {
    let! length = AsyncSeq.length gr
    return (key, length)
})
|> AsyncSeq.toBlockingSeq

Expected behavior

No deadlock, or some xml-doc remarks with the warnings of such possibility, if this is by design.

Known workarounds

It looks like I have to eagerly enumerate the sequence, so all groups get resolved
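Another option, sketched here, is to consume the groups concurrently with AsyncSeq.mapAsyncParallel instead of mapAsync. A group only completes once the whole source has been read, and mapAsyncParallel keeps pulling new groups while earlier ones are still being counted.

The count is passed through int because the exact integer type returned by AsyncSeq.length is an assumption here.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

let counts =
    [1; 2; 3]
    |> AsyncSeq.ofSeq
    |> AsyncSeq.groupBy id
    // Each group is counted concurrently; the outer sequence keeps being
    // pulled meanwhile, so the source can finish and every group can complete.
    |> AsyncSeq.mapAsyncParallel (fun (key, gr) ->
        async {
            let! length = AsyncSeq.length gr
            return (key, int length)
        })
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously
```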

Related information

  • Operating system Win 10
  • .NET Runtime: 5.0.100

BufferByCountAndTime impact to Async.StartWithContinuations

Description

The Async.StartWithContinuations call blocks when AsyncSeq.bufferByCountAndTime is used in the async operation.

Repro steps

let op =
    asyncSeq {
        while true do
            do! Async.Sleep 1000
            yield 0
    }
    |> AsyncSeq.bufferByCountAndTime 10 1000
    |> AsyncSeq.iter (printf "%A ")

let cts = new System.Threading.CancellationTokenSource()
Async.StartWithContinuations(op, ignore, ignore, ignore, cts.Token)

Expected behavior

The Async.StartWithContinuations call returns.

Actual behavior

The Async.StartWithContinuations call blocks the calling thread.

Known workarounds
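No workaround was listed. One untested possibility assumes the blocking comes from synchronous work at the start of the computation.

Async.StartWithContinuations begins running on the calling thread. Hopping to the thread pool first with Async.SwitchToThreadPool lets the call return immediately.

startOffCallingThread is a hypothetical helper. If the continuations are not needed, Async.Start(op, cts.Token), which schedules on the thread pool, may also be enough.

```fsharp
open System.Threading

/// Hypothetical wrapper: move to the thread pool before running `op`, so any
/// synchronous work at the start of `op` no longer runs on the thread that
/// called Async.StartWithContinuations.
let startOffCallingThread (op: Async<unit>) (onDone: unit -> unit) (ct: CancellationToken) =
    let wrapped =
        async {
            do! Async.SwitchToThreadPool()
            return! op
        }
    Async.StartWithContinuations(wrapped, onDone, ignore, ignore, ct)
```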

Related information

  • Windows 10
  • FSharp.Control.AsyncSeq (2.0.11)

Lack of parallelism in AsyncSeq.cache

Description

I noticed that when multiple consumers use a cached AsyncSeq, the late ones delay reading cached items until the lead consumer receives a new item.

Repro steps

See the change to AsyncSeqTests.fs at
https://github.com/fsprojects/FSharp.Control.AsyncSeq/pull/94/files

Expected behavior

Second consumer should take close to no time to read all 5 cached items

Actual behavior

Second consumer can't proceed and read cached items until the first consumer has received the next, most recent item.

Known workarounds

None

Related information

Problem observed on .net 4.5 and .net core 2.0 on master. Also observed with NuGet package 2.0.21 with .net 4.5

See #94 for a suggested fix and test case.

Memory leak when unfolding

We're currently trying to use AsyncSeq with an unfold that does repeated HTTP requests that get paged data; each unfold step produces a page of data until we run out of pages, in which case the sequence completes. It looks like AsyncSeq has a memory leak, because we expected that we'd be able to async-enumerate over this sequence consuming and discarding each page of data without constantly increasing memory usage.

Instead, what we've discovered is that AsyncSeq seems to hold onto each page of data in a massive chain of append function closures that grows as we enumerate over the async seq. Over time the application's memory usage grows until we finally get an out of memory exception and crash.

Here's a screenshot of a dotMemory profiling session on some similar sample code (below) where I've tracked down the offending chain of objects that's holding everything in memory at once. I've put red boxes around each of the nested append closures; the data they hold is of type TrackingContainer. You can see each of the TrackingContainer items is a different instance, because I've added a different flag next to each one. This chain continues on further offscreen, further than I was going to follow through the tree view :)


The sample code that reproduces this issue (and was used for the above profiling session) is:

open FSharp.Control

type TrackingContainer = TrackingContainer of Data : string list

let generator state =
    async {
        if state < 10000 then
            //In reality this is where we do an HTTP request to get a chunk of data
            //but let's just generate some data inline and wrap it 
            //in a special class we can use to easily identify the chunks of data
            //in a profiler
            return Some (TrackingContainer (Seq.init 100 (fun i -> i.ToString()) |> Seq.toList), state + 1)
        else
            return None
    }

let processAsyncSeq (aSeq : _ AsyncSeq) =
    async {
        for TrackingContainer data in aSeq do
            for d in data do
                sprintf "%s" d |> ignore //Busywork with the data
    }

[<EntryPoint>]
let main argv = 
    AsyncSeq.unfoldAsync generator 0
    |> processAsyncSeq
    |> Async.RunSynchronously

    0 // return an integer exit code

Given the profiling session I did, I think the issue is with append, but I'm struggling to fully grok what's going on in the AsyncSeq + Async internals enough to be able to pinpoint exactly why this is happening. :(

Fable support?

As it stands, this library does not seem to support Fable. According to the Fable docs, support is quite simple to add by modifying the project file:

<!-- Add source files to "fable" folder in Nuget package -->
<ItemGroup>
    <Content Include="*.fsproj; **\*.fs" PackagePath="fable\" />
</ItemGroup>

Is Fable support something that would be accepted in a PR?

Add ofIQueryable method

Actual behavior

Currently, to produce an AsyncSeq from an IQueryable, I have to write

(query {
    for biblioteko in context.Bibliotekoj do
        where (biblioteko.ID = bibliotekoId)
        select biblioteko
 }).AsAsyncEnumerable()
|> AsyncSeq.ofAsyncEnum
|> AsyncSeq.tryFirst

Expected behavior

But I could write

(query {
    for biblioteko in context.Bibliotekoj do
        where (biblioteko.ID = bibliotekoId)
        select biblioteko
 })
|> AsyncSeq.ofIQueryable
|> AsyncSeq.tryFirst

Related information

  • FSharp.Control.AsyncSeq 2.0.23

Unexpected iteration performance drop when recursive loops are used.

Description

The problem appears when a recursive asyncSeq loop yields a number of elements and at least one async bind is inside the loop.

Repro steps

The performance drop can be reproduced with the following test case:

open System.Diagnostics
open FSharp.Control

let rec generate cnt = asyncSeq {
    if cnt = 0 then () else
    let! v = async.Return 1
    yield v
    yield! generate (cnt-1)
}

[1000;2000;4000]
|> List.iter (fun numbers ->
    let sw = Stopwatch.StartNew()
    generate numbers
    |> AsyncSeq.iter ignore
    |> Async.RunSynchronously
    printfn "%d: %A" numbers sw.Elapsed)

The function generate above yields cnt numbers (all 1s), and it produces the following output on my machine:

1000: 00:00:01.1934439
2000: 00:00:04.7116588
4000: 00:00:18.9604031

Expected behavior

The time it takes to iterate the asynchronous sequence should be proportional to the number of elements it generates.

Actual behavior

Doubling the element count roughly quadruples the running time (about 1.19 s, 4.71 s and 18.96 s above), so iteration appears to be O(n^2) instead of O(n).

Known workarounds

Try to avoid recursion in all asyncSeq scenarios that may return more than a few hundred values.
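One way to follow this workaround without giving up the async bind is to drive the state explicitly with AsyncSeq.unfoldAsync instead of recursing with yield!. In the sketch below, generateUnfold is a hypothetical name; it produces the same cnt ones as generate and reuses the timing harness from the repro. It avoids the nested yield! chain, but its timings have not been measured here, so compare them on your own machine.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open System.Diagnostics
open FSharp.Control

// Same output as generate (cnt ones, one async bind per element), expressed
// as an explicit state machine: the state is the number of elements left.
let generateUnfold cnt =
    AsyncSeq.unfoldAsync
        (fun remaining -> async {
            if remaining = 0 then
                return None
            else
                let! v = async.Return 1
                return Some (v, remaining - 1) })
        cnt

[1000; 2000; 4000]
|> List.iter (fun numbers ->
    let sw = Stopwatch.StartNew()
    generateUnfold numbers
    |> AsyncSeq.iter ignore
    |> Async.RunSynchronously
    printfn "%d: %A" numbers sw.Elapsed)
```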

Related information

I've tested Release builds of master and @eulerfx's perf branch and saw no noticeable difference. A number of functions in AsyncSeq.fs use a similar looping pattern and might be affected too.

Best way to convert System.Collections.Generic.IAsyncEnumerable<'t> to AsyncSeq<'t>?

The Azure EventHub API returns an IAsyncEnumerable<'t> (from the BCL).

I would like to convert it to an AsyncSeq (or FSharp.Control.IAsyncEnumerable<'t>), for obvious reasons.

The following code type checks but there might be a better way that I am missing. Any suggestions would be appreciated.

    let toAsyncSeq<'t> (xs:System.Collections.Generic.IAsyncEnumerable<'t>) =
        let aeEnum = xs.GetAsyncEnumerator()
        let rec loop() = 
            asyncSeq {
                let! haveData = aeEnum.MoveNextAsync().AsTask() |> Async.AwaitTask
                if haveData then 
                    yield aeEnum.Current
                    yield! loop()
            }
        loop()
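Where AsyncSeq.ofAsyncEnum is available (it is used in the ofIQueryable issue on this page), it already performs this conversion. Where it is not, one possible refinement of the code above is sketched below, assuming the asyncSeq builder's while and try/finally support. The version in the question calls GetAsyncEnumerator once, eagerly, so the resulting AsyncSeq can only be enumerated once, and it never disposes the enumerator. This sketch creates the enumerator inside the asyncSeq block, passes the caller's cancellation token through, and disposes the enumerator when iteration ends. The BCL interface is fully qualified because FSharp.Control defines its own IAsyncEnumerable<'T>.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

/// Sketch: wraps a BCL IAsyncEnumerable<'T> as an AsyncSeq<'T>.
/// Each enumeration of the result gets its own BCL enumerator.
let toAsyncSeq (xs: System.Collections.Generic.IAsyncEnumerable<'T>) : AsyncSeq<'T> =
    asyncSeq {
        // Flow the caller's cancellation token into the BCL enumerator.
        let! ct = Async.CancellationToken
        let e = xs.GetAsyncEnumerator(ct)
        try
            let hasMore = ref true
            while hasMore.Value do
                let! moved = e.MoveNextAsync().AsTask() |> Async.AwaitTask
                hasMore.Value <- moved
                if moved then yield e.Current
        finally
            // DisposeAsync returns a ValueTask, but the asyncSeq finally block
            // is synchronous, so this sketch blocks until disposal completes.
            e.DisposeAsync().AsTask().Wait()
    }

// Usage: round-trip an AsyncSeq through the BCL interface and back.
let roundTrip =
    AsyncSeq.ofSeq [1; 2; 3]
    |> AsyncSeq.toAsyncEnum
    |> toAsyncSeq
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously
```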

Update to use github actions

  1. I've removed travis support

  2. I've enabled github actions

We still have an AppVeyor build that is useful because it builds and tests the Fable target. I think we should move this over to GitHub Actions as well:

environment:
  CLI_VERSION: 3.1.301
image: Visual Studio 2019
init:
  - git config --global core.autocrlf input
install:
  - ps: Install-Product node 14
  - yarn --cwd tests/fable --cache-folder=.cache/yarn --no-progress --pure-lockfile
cache:
  - tests/fable/.cache/yarn
build_script:
  - cmd: build.cmd
test: off
test_script:
  - yarn --cwd tests/fable test
version: 0.0.1.{build}
artifacts:
  - path: bin
    name: bin

choose does not work for async seq generated using AsyncSeq.unfold

Description

When an async sequence is generated with AsyncSeq.unfold, AsyncSeq.choose returns only the elements before the first excluded one (the first None).

Repro steps

let chooser x = if x % 3 = 0 then None else Some x
let inputLst = [ 1 .. 9 ]
let expected = inputLst |> Seq.choose chooser |> Seq.toList
// expected is [1; 2; 4; 5; 7; 8]
let inputASeq = AsyncSeq.unfold (fun i -> if i <= 9 then Some (i,i+1) else None) 1
let actual = inputASeq |> AsyncSeq.choose chooser |> AsyncSeq.toList
// actual is [1; 2]

Expected behavior

Both should return [1; 2; 4; 5; 7; 8]

Actual behavior

AsyncSeq.choose returns [1; 2]

Known workarounds

Use AsyncSeq.map and AsyncSeq.filter instead of AsyncSeq.choose for sequences generated with AsyncSeq.unfold.
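The map-and-filter workaround can be written as follows. This sketch redefines chooser and inputASeq from the repro so that it runs on its own; it maps each element through chooser, keeps the Some results and unwraps them, which is what choose is expected to do.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

let chooser x = if x % 3 = 0 then None else Some x
let inputASeq = AsyncSeq.unfold (fun i -> if i <= 9 then Some (i, i + 1) else None) 1

// choose expressed as map + filter + map.
let viaMapFilter =
    inputASeq
    |> AsyncSeq.map chooser
    |> AsyncSeq.filter Option.isSome
    |> AsyncSeq.map Option.get
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously
```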

Related information

  • Operating system
    Windows 7
  • Branch
    master
  • Database versions and sample databases being used
  • .NET Runtime, CoreCLR or Mono Version
    VS2015 F# interactive

iterAsyncParallel and iterAsyncParallelThrottled may fail to cancel

Description

iterAsyncParallel and iterAsyncParallelThrottled may fail to cancel; this can be observed when exceptions are thrown from another Async in Async.Parallel.

Repro steps

Execute the following code snippet:

open System
open FSharp.Control

let r = Random()

let handle x = async {
    do! Async.Sleep (r.Next(200))
    printfn "%A" x
}

let fakeAsync = async {    
    do! Async.Sleep 500
    return "hello"
}

let makeAsyncSeqBatch () =
    let rec loop() = asyncSeq {            
        let! batch =  fakeAsync |> Async.Catch
        match batch with
        | Choice1Of2 batch ->
          if (Seq.isEmpty batch) then
            do! Async.Sleep 500
            yield! loop()
          else
            yield batch
            yield! loop() 
        | Choice2Of2 err ->
             printfn "Problem getting batch: %A" err
    }
    
    loop()

let x = makeAsyncSeqBatch () |> AsyncSeq.concatSeq |> AsyncSeq.iterAsyncParallel handle
let exAsync = async {
    do! Async.Sleep 2000
    failwith "error"
}

[x; exAsync] |> Async.Parallel |> Async.Ignore |> Async.RunSynchronously

When exAsync throws, Async.Parallel attempts to cancel the iteration of the AsyncSeq before it returns. iterAsyncParallel and iterAsyncParallelThrottled sometimes never return and keep running forever. Switching to iterAsync stops the iteration of the AsyncSeq reliably.

Known workarounds

Don't use iterAsyncParallel or iterAsyncParallelThrottled; use iterAsync instead.
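The workaround can be checked with a smaller, self-contained program. In this sketch, ticks, consume and failing are hypothetical names standing in for the batch loop, the iteration and exAsync from the repro. Because the iteration uses iterAsync, Async.Parallel should be able to cancel it when failing throws, so the whole computation is expected to finish with the exception instead of hanging; the result is wrapped with Async.Catch so the exception can be inspected.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// An infinite source, standing in for the batch loop in the repro.
let ticks : AsyncSeq<int> =
    AsyncSeq.unfoldAsync
        (fun i -> async {
            do! Async.Sleep 50
            return Some (i, i + 1) })
        0

// Sequential iteration, which the issue reports as cancelling reliably.
let consume = ticks |> AsyncSeq.iterAsync (fun i -> async { printfn "%d" i })

// Fails after a short delay, like exAsync in the repro.
let failing = async {
    do! Async.Sleep 500
    failwith "error"
}

// Choice2Of2 carries the exception once the cancelled iteration has stopped.
let outcome =
    [consume; failing]
    |> Async.Parallel
    |> Async.Ignore
    |> Async.Catch
    |> Async.RunSynchronously
```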

Related information

  • Operating system
    Windows
  • Branch
    2.0.21
  • .NET Runtime, CoreCLR or Mono Version
    .NET Core 3.1
