Code Monkey home page Code Monkey logo

Comments (19)

polytypic avatar polytypic commented on June 14, 2024

I'll answer here in a number of parts.

The first question was whether Alt could be compared to lazy. Alt<'x> would be slightly more accurately compared to a thunk aka a function of type unit -> 'x. The reason for this is that lazy implies performing any effect only once and then memoizing the result, while, in general, alternatives make no such guarantee. The effects of an alternative, such as offering to give a message on a channel, are performed each time the alternative is "instantiated".

from hopac.

polytypic avatar polytypic commented on June 14, 2024

Regarding AutoResetEvent, here is what I think would be a CML-style design using Hopac.

type AutoResetEvent (initialState: bool) =
  let waitCh = ch ()
  let setCh = ch ()

  let rec set () = (waitCh <-? () >>=? unset) <|> (setCh >>=? set)
  and unset () = setCh >>= set
  do start (if initialState then set () else unset ())

  member this.Set: Job<unit> = setCh <-- ()

  member this.Wait: Alt<unit> = upcast waitCh

Why would this be CML-style or idiomatic?

We keep the state, in this case whether the event is set or not, in a lightweight thread that implements the logic of the concurrent abstraction. This way the state is nicely protected from most concurrency issues.

We use channels to communicate with the lightweight thread, requesting state changes or responses according to state.

We don't implement timeouts, but rather we provide a simple alternative for waiting for the event. It is now possible for any client of the are: AutoResetEvent to wait for the event without:

are.Wait >>= // ...

or with timeout:

Alt.select [
  are.Wait >>=? // ...
  timeout >>=? // ...
] 

or as alternative to some completely different alternative:

Alt.select [
  are.Wait >>=? // ...
  sendMissiles >>=? // ...
]

But, of course, there are many ways to implement this functionality.

from hopac.

polytypic avatar polytypic commented on June 14, 2024

Here is a ManualResetEvent implementation in similar style:

type ManualResetEvent (initialState: bool) =
  let waitCh = ch ()
  let controlCh = ch ()

  do start << Job.iterate initialState <| function
      | false -> upcast controlCh
      | true  -> (waitCh <-? () >>%? true) <|> asAlt controlCh

  member this.Set: Job<unit> = controlCh <-- true
  member this.Reset: Job<unit> = controlCh <-- false

  member this.Wait: Alt<unit> = upcast waitCh

This implementation similarly maintains state in a lightweight server thread. This time the server thread loop is implemented using the Job.iterate combinator. Rather than a plain setCh: Ch<unit> the state change requests are now given on a controlCh: Ch<bool>.

There is one "tricky" detail in the above implementation. In the case that the state is true, the server thread first tries the alternative of giving a message on the waitCh. This means that the server will satisfy all wait requests before accepting another control message.

Again, there are many possible ways to implement the same functionality.

from hopac.

polytypic avatar polytypic commented on June 14, 2024

Back to the AutoResetEvent. Here is a simplified version of the code you gave:

type HopacAutoResetEvent (initialState : bool) =
  let setChannel : Ch<unit> = ch()
  do if initialState then start <| Ch.send setChannel ()
  member this.Wait: Alt<unit> = Ch.Alt.take setChannel
  member this.Set: Job<unit> = Ch.Try.take setChannel >>. Ch.send setChannel ()

There is a problem in the above AutoResetEvent implementation. The problem is that there is a race condition. If multiple threads execute the Set operation concurrently, it is possible that more than one thread effectively simultaneously executes the Ch.Try.take operation before advancing to the Ch.sendoperation. This means that more than one messages will be left in the channel and subsequently multiple waits will be satisfied.

I think that the timeout logic you presented was basically correct, so I just left it out for simplicity.

BTW, looking at the timeout logic, I realized that there is a kind of bug in the current timeout implementation of Hopac, because it doesn't support the special case of infinite timeouts. I'll fix that ASAP.

from hopac.

polytypic avatar polytypic commented on June 14, 2024

Thinking about the race condition, I think it is best to consider channels as stateless. Threads, on the other hand, can hold state and can then choose what to do with that state in response to communication with other threads.

BTW, thanks for the question! I hope my answers have helped. Feel free to ask further questions. I'll likely comment further on the HopacManualResetEvent implementation a bit later.

from hopac.

buybackoff avatar buybackoff commented on June 14, 2024

Thank you very much! This helps a lot. I need to digest your answers and play with the examples to understand the idioms and subtleties.

How upcast for channels works? You return a channel as Alt - is it the same as Ch.Alt.take?. What happens with this Alt after give if several threads are waiting?

from hopac.

polytypic avatar polytypic commented on June 14, 2024

Yes, upcast ch is the same as Ch.Alt.take ch. This is an optimization to reduce memory allocations. The inheritance hierarchy is Ch<'x> :> Alt<'x> :> Job<'x>.

If multiple threads wait for a Ch.Alt.take ch (or the exact same upcast ch) alternative and some other thread performs Ch.give ch x then one of the threads waiting on the channel is given (or takes) the message. The other threads continue waiting on the channel.

from hopac.

buybackoff avatar buybackoff commented on June 14, 2024

So in ManualResetEvent case how many thread will go through after Set? Am I correct that if several threads are waiting, then:

  • Before Set, iterator is waiting on the control channel here (after false case):
  do start << Job.iterate initialState <| function
      | false -> upcast controlCh                                                       //wait here before set
      | true  -> (waitCh <-? () >>%? true) <|> asAlt controlCh
  • After Set, controlCh returns and then loops to the true case. Th
  do start << Job.iterate initialState <| function
      | false -> upcast controlCh                                                       // return from here
      | true  -> (waitCh <-? () >>%? true) <|> asAlt controlCh          // iterate to here
  • If there is a waiter, first Alt (waitCh <-? () >>%? true) releases the waiter, and iterator goes back to the same place. If there are many waiters, we will iterate until we signal each them. When there are no more waiters, we block on asAlt controlCh in the true case.

What will happen if there are 10 threads waiting, then we iterate to release all of them, and right after releasing the 5th thread another thread calls Reset?

Here (waitCh <-? () >>%? true) <|> asAlt controlCh we will signal remaining 5 threads due to the order of Alts. But what if after releasing the 6th waiter other threads will call Wait before we release the 10th one? It looks like the new threads (11th, 12th,...) will also be released even though they called Wait after Reset?

Reversing the order asAlt controlCh <|> (waitCh <-? () >>%? true) won't help, since we will block threads 7-10. We need a lock while iterating on many waiter after set.

from hopac.

polytypic avatar polytypic commented on June 14, 2024

Your analysis is almost correct. Note that the Reset operation is synchronous. It will not be completed until the server thread responds to it. Let's say that some thread starts executing the Reset operation after the 5th thread is released. What happens is that the Reset operation blocks until all the threads waiting have been released.

This corresponds to the idea that Set is effectively an atomic operation: all threads waiting for the event will be released.

If the true case were written:

      | true  -> asAlt controlCh <|> (waitCh <-? () >>%? true)

Then an outside thread could Reset the event before all waiters have been released.

from hopac.

polytypic avatar polytypic commented on June 14, 2024

Here is an implementation of AutoResetEvent that doesn't use a server thread:

type AutoResetEvent (initialState: bool) =
  let set = if initialState then MVar.Now.createFull () else mvar ()
  let unset = if initialState then mvar () else MVar.Now.createFull ()

  member this.Set: Job<unit> =
    (set <|> unset) >>= fun () -> set <<-= ()

  member this.Wait: Alt<unit> =
    set >>=? fun () -> unset <<-= ()

This implementation makes use of two MVars of which at most one holds a value at any time. This illustrates the idea of using MVars for passing a "permission token".

from hopac.

buybackoff avatar buybackoff commented on June 14, 2024

Just wow! It is so simple and powerful, and almost easy! (to both comments)

from hopac.

buybackoff avatar buybackoff commented on June 14, 2024

But wait,

Your analysis is almost correct. Note that the Reset operation is synchronous. It will not be completed until the server thread responds to it. Let's say that some thread starts executing the Reset operation after the 5th thread is released. What happens is that the Reset operation blocks until all the threads waiting have been released.

what if we have the same 10 waiters, then we call Reset after releasing the 5th one. Reset will block. But then after releasing 6th waiter 11th, 12th... waiter come before we release 10th and before we go to <|> asAlt controlCh. We won't block 7th-10th, but 11+th ones will be released. If waiters are added at the same speed as they are released then many threads will go through. Am I missing something?

from hopac.

polytypic avatar polytypic commented on June 14, 2024

I was just in the middle of writing this reply... Yes, indeed. Thinking about this further, I think that my initial version of ManualResetEvent can be considered unfair, because it basically gives priority to releasing Wait operations. I think that could be used to contrive a program that live locks.

Here is an implementation on ManualResetEvent using the MVar permission token passing idiom:

type ManualResetEvent (initialState: bool) =
  let set = if initialState then MVar.Now.createFull () else mvar ()
  let unset = if initialState then mvar () else MVar.Now.createFull ()
  member this.Set = (set <|> unset) >>= MVar.fill set
  member this.Reset = (set <|> unset) >>= MVar.fill unset
  member this.Wait = MVar.Alt.read set

In this version none of the operations is given priority over other operations. The set variable effectively acts as a queue for the operations.

from hopac.

polytypic avatar polytypic commented on June 14, 2024

Here is a fair version of ManualResetEvent using the server loop idiom:

type Msg =
  | Wait of IVar<Alt<unit>>
  | Set
  | Reset

type ManualResetEvent (initialState: bool) =
  let reqCh = ch ()

  let rec isSet (v: Alt<unit>) =
    reqCh >>= function
     | Wait r -> r <-= v >>. isSet v
     | Set -> isSet v
     | Reset -> isUnset (ivar ())
  and isUnset (v: IVar<unit>) =
    reqCh >>= function
     | Wait r -> r <-= upcast v >>. isUnset v
     | Set -> v <-= () >>. isSet v
     | Reset -> isUnset v

  do start <| if initialState
              then isSet (IVar.Now.createFull ())
              else isUnset (ivar ())

  member this.Set = reqCh <-- Set
  member this.Reset = reqCh <-- Reset

  member this.Wait =
    Alt.guard << Job.delay <| fun () ->
    let reply = ivar ()
    reqCh <-+ Wait reply >>.
    reply

When isSet, the state of the server is an ivar that has been filled. Transitioning to isUnset, a new empty ivar is created. Transitioning back to isSet, the empty ivar is filled. And so on. Wait operation requests the state from the server. This illustrates the use of Alt.guard to encapsulate an operation that sends a request to a server and then waits for the result.

from hopac.

polytypic avatar polytypic commented on June 14, 2024

Here is one more fair version of ManualResetEvent:

type ManualResetEvent (initialState: bool) =
  let state = mvarFull <| if initialState then ivarFull () else ivar ()
  member this.Set =
    state >>= fun v -> IVar.tryFill v () >>. (state <<-= v)
  member this.Reset =
    state >>= fun v ->
    state <<-= if IVar.Now.isFull v then ivar () else v
  member this.Wait = Alt.guard (MVar.read state |>> asAlt)

This version uses a mvar to serialize access to the state which is an ivar.

from hopac.

buybackoff avatar buybackoff commented on June 14, 2024

Thank you again! This is very helpful to feel Hopac.

Another question - how to run a synchronous blocking operations inside Hopac jobs, e.g. native non-async ARE/MREs that just block a thread? Should I always wrap them in Async/Task and then use Async.toJob or Task.awaitJob as in an example below? Otherwise I will block a worker thread of Hopac and effectively one core?

let jumpOutOfWorkerThread (blockingCall:'x->'y) (x:'x) : Job<'y> =
        let asAsync : Async<'y> = async { return blockingCall(x)}
        Hopac.Extensions.Async.toJob asAsync

Or using Job.lift is safe in this respect?

from hopac.

polytypic avatar polytypic commented on June 14, 2024

BTW, one thing I didn't emphasize earlier is that Job<'x> and Alt<'x> values (and Async<'x> values) are reusable. If you care for performance (space and time), then it can sometimes make a big difference (either way) whether you reallocate or reuse (when possible) operations. So, for example, this would also be a valid implementation of AutoResetEvent:

type AutoResetEvent =
  val Set: Job<unit>
  val Wait: Alt<unit>
  new (initialState: bool) =
    let set = if initialState then mvarFull () else mvar ()
    let unset = if initialState then mvar () else mvarFull ()
    {Set = (set <|> unset) >>= MVar.fill set
     Wait = set >>=? MVar.fill unset}
  new () = AutoResetEvent (false)

from hopac.

polytypic avatar polytypic commented on June 14, 2024

One should not block Hopac worker threads or equivalently one should also not use a Hopac job to run a thread that never waits. Such operations will interfere with the scheduling mechanisms of Hopac.

If you just need to run some simple CPU bound synchronous computation that takes a relatively long time, but will ultimately finish, then you may be just fine synchronously calling that computation from within a Hopac job. You may be fine even if that computation does some IO.

For more involved synchronous operations that take long enough time that they interfere with responsiveness or spend a lot of time waiting for IO you will need to do something more clever. Job.lift is not safe in this respect.

Note that such involved, non-cooperative, synchronous operations don't really work nicely from within asyncs or Tasks either. Usually async and Task use ThreadPool underneath and what happens is that more worker threads will be created which is very expensive. Ultimately it would be better to rewrite CPU bound operations to take advantage of parallelism and IO bound operations to take advantage of non-blocking asynchronous IO.

If you really need to interface with such non-cooperative legacy APIs, then you could just as well use the legacy helper for that: ThreadPool.

Here is how one could wrap a WaitHandle as a job:

type WaitHandle with
  member this.awaitJob (timeout: TimeSpan) : Job<bool> =
    Job.scheduler () >>= fun sr ->
    let rV = ivar ()
    ThreadPool.RegisterWaitForSingleObject
     (this, (fun _ r -> Scheduler.start sr (rV <-= r)), null, timeout, true)
    upcast rV

Here is how one could wrap an expensive, non-cooperative, synchronous computation as a job:

type ThreadPool with
  static member runAsJob (op: unit -> 'x) : Job<'x> =
    Job.scheduler () >>= fun sr ->
    let rV = ivar ()
    ThreadPool.QueueUserWorkItem
     (fun _ -> Scheduler.start sr (try rV <-= op () with e -> IVar.fillFailure rV e))
    upcast rV

BTW, I'm currently in the process of rewriting/redesigning the scheduler mechanism of Hopac to allow for SynchronizationContext support. Once finished, the pattern you see above will change slightly.

from hopac.

dustinlacewell-wk avatar dustinlacewell-wk commented on June 14, 2024

Holy hell this thread was useful.

from hopac.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.