Comments (19)
I'll answer here in a number of parts.
The first question was whether Alt could be compared to lazy. Alt<'x> would be slightly more accurately compared to a thunk, aka a function of type unit -> 'x. The reason for this is that lazy implies performing any effect only once and then memoizing the result, while, in general, alternatives make no such guarantee. The effects of an alternative, such as offering to give a message on a channel, are performed each time the alternative is "instantiated".
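The thunk-versus-lazy distinction can be seen without Hopac at all. The following is a minimal sketch in plain F#; the names effects, compute, thunk and memo are made up for the illustration, and nothing here is Hopac API. It only shows that a thunk repeats its effect on every call while a lazy value performs it once.

```fsharp
// Counts how many times the side effect has been performed.
let effects = ref 0

// A computation with an observable effect (hypothetical helper).
let compute () : int =
    effects.Value <- effects.Value + 1
    42

// A thunk: the effect is performed on every invocation.
let thunk : unit -> int = compute

// A lazy value: the effect is performed on the first Force only,
// and the result is memoized afterwards.
let memo : Lazy<int> = lazy (compute ())

thunk () |> ignore
thunk () |> ignore
let effectsFromThunk = effects.Value

memo.Force () |> ignore
memo.Force () |> ignore
let effectsFromLazy = effects.Value - effectsFromThunk
```

After running this, effectsFromThunk is 2 and effectsFromLazy is 1. An alternative behaves like the thunk in this respect: each instantiation performs its effects again.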
from hopac.
Regarding AutoResetEvent, here is what I think would be a CML-style design using Hopac.
    type AutoResetEvent (initialState: bool) =
      let waitCh = ch ()
      let setCh = ch ()
      let rec set () = (waitCh <-? () >>=? unset) <|> (setCh >>=? set)
      and unset () = setCh >>= set
      do start (if initialState then set () else unset ())
      member this.Set: Job<unit> = setCh <-- ()
      member this.Wait: Alt<unit> = upcast waitCh
Why would this be CML-style or idiomatic?
- We keep the state, in this case whether the event is set or not, in a lightweight thread that implements the logic of the concurrent abstraction. This way the state is nicely protected from most concurrency issues.
- We use channels to communicate with the lightweight thread, requesting state changes or responses according to state.
- We don't implement timeouts, but rather we provide a simple alternative for waiting for the event. It is now possible for any client of an are: AutoResetEvent to wait for the event without a timeout:
      are.Wait >>= // ...
  or with timeout:
      Alt.select [
        are.Wait >>=? // ...
        timeout >>=? // ...
      ]
  or as alternative to some completely different alternative:
      Alt.select [
        are.Wait >>=? // ...
        sendMissiles >>=? // ...
      ]
But, of course, there are many ways to implement this functionality.
from hopac.
Here is a ManualResetEvent implementation in similar style:
    type ManualResetEvent (initialState: bool) =
      let waitCh = ch ()
      let controlCh = ch ()
      do start << Job.iterate initialState <| function
          | false -> upcast controlCh
          | true -> (waitCh <-? () >>%? true) <|> asAlt controlCh
      member this.Set: Job<unit> = controlCh <-- true
      member this.Reset: Job<unit> = controlCh <-- false
      member this.Wait: Alt<unit> = upcast waitCh
This implementation similarly maintains state in a lightweight server thread. This time the server thread loop is implemented using the Job.iterate combinator. Rather than a plain setCh: Ch<unit> the state change requests are now given on a controlCh: Ch<bool>.
There is one "tricky" detail in the above implementation. In the case that the state is true, the server thread first tries the alternative of giving a message on the waitCh. This means that the server will satisfy all wait requests before accepting another control message.
Again, there are many possible ways to implement the same functionality.
from hopac.
Back to the AutoResetEvent. Here is a simplified version of the code you gave:
    type HopacAutoResetEvent (initialState : bool) =
      let setChannel : Ch<unit> = ch()
      do if initialState then start <| Ch.send setChannel ()
      member this.Wait: Alt<unit> = Ch.Alt.take setChannel
      member this.Set: Job<unit> = Ch.Try.take setChannel >>. Ch.send setChannel ()
There is a problem in the above AutoResetEvent implementation: it has a race condition. If multiple threads execute the Set operation concurrently, it is possible that more than one thread effectively simultaneously executes the Ch.Try.take operation before advancing to the Ch.send operation. This means that more than one message will be left in the channel and subsequently multiple waits will be satisfied.
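The interleaving can be replayed by hand with a small model. This is only a sketch under assumptions: a plain Queue<string> stands in for the messages pending on the channel, and tryTake and send are hypothetical helpers that mimic the two steps of Set. None of it is Hopac code, and the steps are interleaved manually rather than by real threads.

```fsharp
open System.Collections.Generic

// Stand-in for the messages pending on the channel (an assumption of this
// model; this is not the Hopac Ch type).
let pending = Queue<string> ()

// The two steps of Set, split apart so they can be interleaved by hand.
let tryTake () = if pending.Count > 0 then pending.Dequeue () |> ignore
let send () = pending.Enqueue "set"

// Sequential case: each setter runs both steps before the next one starts.
tryTake ()   // setter A: nothing to remove
send ()      // setter A
tryTake ()   // setter B: removes the message left by A
send ()      // setter B
let pendingSequential = pending.Count

// Racy case: both setters finish tryTake before either one sends.
pending.Clear ()
tryTake ()   // setter A: channel is empty, nothing is removed
tryTake ()   // setter B: channel is still empty, nothing is removed
send ()      // setter A
send ()      // setter B
let pendingAfterRace = pending.Count
```

In the sequential case one message remains, so one wait is satisfied. In the racy case two messages remain, so two waits would be satisfied by what was meant to be a single set state.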
I think that the timeout logic you presented was basically correct, so I just left it out for simplicity.
BTW, looking at the timeout logic, I realized that there is a kind of bug in the current timeout implementation of Hopac, because it doesn't support the special case of infinite timeouts. I'll fix that ASAP.
from hopac.
Thinking about the race condition, I think it is best to consider channels as stateless. Threads, on the other hand, can hold state and can then choose what to do with that state in response to communication with other threads.
BTW, thanks for the question! I hope my answers have helped. Feel free to ask further questions. I'll likely comment further on the HopacManualResetEvent implementation a bit later.
from hopac.
Thank you very much! This helps a lot. I need to digest your answers and play with the examples to understand the idioms and subtleties.
How does upcast work for channels? You return a channel as an Alt; is it the same as Ch.Alt.take? What happens with this Alt after a give if several threads are waiting?
from hopac.
Yes, upcast ch is the same as Ch.Alt.take ch. This is an optimization to reduce memory allocations. The inheritance hierarchy is Ch<'x> :> Alt<'x> :> Job<'x>.
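Why an upcast saves an allocation can be sketched with stand-in classes. The types JobStub, AltStub and ChStub below are hypothetical and mirror only the shape of the hierarchy; they are not the Hopac types. The point is that an upcast yields the very same object under a more general static type.

```fsharp
// Hypothetical stand-ins that mirror only the inheritance shape
// Ch :> Alt :> Job; they carry no behaviour.
type JobStub<'x> () =
    class end

type AltStub<'x> () =
    inherit JobStub<'x> ()

type ChStub<'x> () =
    inherit AltStub<'x> ()

let channel = ChStub<int> ()

// Upcasting changes the static type only; no wrapper object is created.
let asAlternative : AltStub<int> = upcast channel
let asJob : JobStub<int> = upcast channel

let sameObject =
    obj.ReferenceEquals (channel, asAlternative)
    && obj.ReferenceEquals (channel, asJob)
```

Here sameObject is true: all three bindings refer to the one object that was allocated for the channel.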
If multiple threads wait for a Ch.Alt.take ch (or the exact same upcast ch) alternative and some other thread performs Ch.give ch x then one of the threads waiting on the channel is given (or takes) the message. The other threads continue waiting on the channel.
from hopac.
So in the ManualResetEvent case, how many threads will go through after Set? Am I correct that if several threads are waiting, then:
- Before Set, the iterator is waiting on the control channel here (in the false case):
      do start << Job.iterate initialState <| function
          | false -> upcast controlCh // wait here before Set
          | true -> (waitCh <-? () >>%? true) <|> asAlt controlCh
- After Set, controlCh returns and then the iterator loops to the true case:
      do start << Job.iterate initialState <| function
          | false -> upcast controlCh // return from here
          | true -> (waitCh <-? () >>%? true) <|> asAlt controlCh // iterate to here
- If there is a waiter, the first Alt, (waitCh <-? () >>%? true), releases the waiter, and the iterator goes back to the same place. If there are many waiters, we will iterate until we have signalled each of them. When there are no more waiters, we block on asAlt controlCh in the true case.
What will happen if there are 10 threads waiting, then we iterate to release all of them, and right after releasing the 5th thread another thread calls Reset?
Here, in (waitCh <-? () >>%? true) <|> asAlt controlCh, we will signal the remaining 5 threads due to the order of the Alts. But what if, after releasing the 6th waiter, other threads call Wait before we release the 10th one? It looks like the new threads (11th, 12th, ...) will also be released even though they called Wait after Reset?
Reversing the order to asAlt controlCh <|> (waitCh <-? () >>%? true) won't help, since we would then block threads 7-10. We need a lock while iterating over the waiters after Set.
from hopac.
Your analysis is almost correct. Note that the Reset operation is synchronous. It will not be completed until the server thread responds to it. Let's say that some thread starts executing the Reset operation after the 5th thread is released. What happens is that the Reset operation blocks until all the threads waiting have been released.
This corresponds to the idea that Set is effectively an atomic operation: all threads waiting for the event will be released.
If the true case were written:
    | true -> asAlt controlCh <|> (waitCh <-? () >>%? true)
then an outside thread could Reset the event before all waiters have been released.
from hopac.
Here is an implementation of AutoResetEvent that doesn't use a server thread:
    type AutoResetEvent (initialState: bool) =
      let set = if initialState then MVar.Now.createFull () else mvar ()
      let unset = if initialState then mvar () else MVar.Now.createFull ()
      member this.Set: Job<unit> =
        (set <|> unset) >>= fun () -> set <<-= ()
      member this.Wait: Alt<unit> =
        set >>=? fun () -> unset <<-= ()
This implementation makes use of two MVars of which at most one holds a value at any time. This illustrates the idea of using MVars for passing a "permission token".
from hopac.
Just wow! It is so simple and powerful, and almost easy! (to both comments)
from hopac.
But wait, you wrote:
"Your analysis is almost correct. Note that the Reset operation is synchronous. It will not be completed until the server thread responds to it. Let's say that some thread starts executing the Reset operation after the 5th thread is released. What happens is that the Reset operation blocks until all the threads waiting have been released."
What if we have the same 10 waiters, and then we call Reset after releasing the 5th one? Reset will block. But then, after releasing the 6th waiter, the 11th, 12th, ... waiters arrive before we release the 10th and before we get to <|> asAlt controlCh. We won't block the 7th-10th, but the 11th and later ones will also be released. If waiters are added at the same speed as they are released, then many threads will go through. Am I missing something?
from hopac.
I was just in the middle of writing this reply... Yes, indeed. Thinking about this further, I think that my initial version of ManualResetEvent can be considered unfair, because it basically gives priority to releasing Wait operations. I think that could be used to contrive a program that livelocks.
Here is an implementation of ManualResetEvent using the MVar permission token passing idiom:
    type ManualResetEvent (initialState: bool) =
      let set = if initialState then MVar.Now.createFull () else mvar ()
      let unset = if initialState then mvar () else MVar.Now.createFull ()
      member this.Set = (set <|> unset) >>= MVar.fill set
      member this.Reset = (set <|> unset) >>= MVar.fill unset
      member this.Wait = MVar.Alt.read set
In this version none of the operations is given priority over other operations. The set variable effectively acts as a queue for the operations.
from hopac.
Here is a fair version of ManualResetEvent using the server loop idiom:
    type Msg =
      | Wait of IVar<Alt<unit>>
      | Set
      | Reset
    type ManualResetEvent (initialState: bool) =
      let reqCh = ch ()
      let rec isSet (v: Alt<unit>) =
        reqCh >>= function
          | Wait r -> r <-= v >>. isSet v
          | Set -> isSet v
          | Reset -> isUnset (ivar ())
      and isUnset (v: IVar<unit>) =
        reqCh >>= function
          | Wait r -> r <-= upcast v >>. isUnset v
          | Set -> v <-= () >>. isSet v
          | Reset -> isUnset v
      do start <| if initialState
                  then isSet (IVar.Now.createFull ())
                  else isUnset (ivar ())
      member this.Set = reqCh <-- Set
      member this.Reset = reqCh <-- Reset
      member this.Wait =
        Alt.guard << Job.delay <| fun () ->
          let reply = ivar ()
          reqCh <-+ Wait reply >>.
          reply
When isSet, the state of the server is an ivar that has been filled. Transitioning to isUnset, a new empty ivar is created. Transitioning back to isSet, the empty ivar is filled. And so on. The Wait operation requests the state from the server. This illustrates the use of Alt.guard to encapsulate an operation that sends a request to a server and then waits for the result.
from hopac.
Here is one more fair version of ManualResetEvent:
    type ManualResetEvent (initialState: bool) =
      let state = mvarFull <| if initialState then ivarFull () else ivar ()
      member this.Set =
        state >>= fun v -> IVar.tryFill v () >>. (state <<-= v)
      member this.Reset =
        state >>= fun v ->
          state <<-= if IVar.Now.isFull v then ivar () else v
      member this.Wait = Alt.guard (MVar.read state |>> asAlt)
This version uses an mvar to serialize access to the state, which is an ivar.
from hopac.
Thank you again! This is very helpful for getting a feel for Hopac.
Another question: how should I run synchronous blocking operations inside Hopac jobs, e.g. native non-async ARE/MREs that just block a thread? Should I always wrap them in Async/Task and then use Async.toJob or Task.awaitJob as in the example below? Otherwise I will block a worker thread of Hopac and effectively one core?
    let jumpOutOfWorkerThread (blockingCall: 'x -> 'y) (x: 'x) : Job<'y> =
      let asAsync : Async<'y> = async { return blockingCall x }
      Hopac.Extensions.Async.toJob asAsync
Or is using Job.lift safe in this respect?
from hopac.
BTW, one thing I didn't emphasize earlier is that Job<'x> and Alt<'x> values (and Async<'x> values) are reusable. If you care for performance (space and time), then it can sometimes make a big difference (either way) whether you reallocate or reuse (when possible) operations. So, for example, this would also be a valid implementation of AutoResetEvent:
    type AutoResetEvent =
      val Set: Job<unit>
      val Wait: Alt<unit>
      new (initialState: bool) =
        let set = if initialState then mvarFull () else mvar ()
        let unset = if initialState then mvar () else mvarFull ()
        {Set = (set <|> unset) >>= MVar.fill set
         Wait = set >>=? MVar.fill unset}
      new () = AutoResetEvent (false)
from hopac.
One should not block Hopac worker threads or equivalently one should also not use a Hopac job to run a thread that never waits. Such operations will interfere with the scheduling mechanisms of Hopac.
If you just need to run some simple CPU bound synchronous computation that takes a relatively long time, but will ultimately finish, then you may be just fine synchronously calling that computation from within a Hopac job. You may be fine even if that computation does some IO.
For more involved synchronous operations that take long enough time that they interfere with responsiveness or spend a lot of time waiting for IO you will need to do something more clever. Job.lift is not safe in this respect.
Note that such involved, non-cooperative, synchronous operations don't really work nicely from within asyncs or Tasks either. Usually async and Task use ThreadPool underneath and what happens is that more worker threads will be created, which is very expensive. Ultimately it would be better to rewrite CPU bound operations to take advantage of parallelism and IO bound operations to take advantage of non-blocking asynchronous IO.
If you really need to interface with such non-cooperative legacy APIs, then you could just as well use the legacy helper for that: ThreadPool.
Here is how one could wrap a WaitHandle as a job:
    type WaitHandle with
      member this.awaitJob (timeout: TimeSpan) : Job<bool> =
        Job.scheduler () >>= fun sr ->
          let rV = ivar ()
          ThreadPool.RegisterWaitForSingleObject
            (this, (fun _ r -> Scheduler.start sr (rV <-= r)), null, timeout, true)
          upcast rV
Here is how one could wrap an expensive, non-cooperative, synchronous computation as a job:
    type ThreadPool with
      static member runAsJob (op: unit -> 'x) : Job<'x> =
        Job.scheduler () >>= fun sr ->
          let rV = ivar ()
          ThreadPool.QueueUserWorkItem
            (fun _ -> Scheduler.start sr (try rV <-= op () with e -> IVar.fillFailure rV e))
          upcast rV
BTW, I'm currently in the process of rewriting/redesigning the scheduler mechanism of Hopac to allow for SynchronizationContext support. Once finished, the pattern you see above will change slightly.
from hopac.
Holy hell this thread was useful.
from hopac.