polytypic commented on June 3, 2024

The problem is essentially here:

get() <|>? (dopChanged >>.? get())

The way guarded and wrapped alternatives work in Hopac is that, within such an alternative, there is a single alternative that is the commit point. Look up the term "commit point" in the index of the CML book (Reppy's Concurrent Programming in ML).

In the above, on the right-hand side, the dopChanged alternative is the commit point. When the above alternative is combined with other alternatives using choose and, at some point, dopChanged is committed to, the job returned by the rightmost get () becomes the entire continuation of the server. That continuation is never, and never, when it is the only alternative (as is the case when an alternative is used as a job), is equivalent to abort. This means that the server thread aborts (commits suicide).

The way to structure Hopac servers is like this:

let rec loop state =
  match state with
  | variant1 -> choose [op11 ... op1M_1]
  | ...
  | variantN -> choose [opN1 ... opNM_N]

In other words, based entirely on the internal state of the server, decide which synchronous operations should be offered and then synchronize on those.
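To make the template concrete without depending on Hopac, here is a minimal stdlib-only F# model of the decision step for the pool below. The state is (degree, usage), and the result is the list of operations to offer on this iteration. The ServerOp type and the offeredOps function are hypothetical names. In real Hopac code, each case would correspond to an alternative passed to choose.

```fsharp
// Stdlib-only model of the server template (no Hopac involved).
// ServerOp and offeredOps are hypothetical names for illustration.
type ServerOp =
    | SetDegree   // always offered: the degree may change at any time
    | WorkDone    // always offered: running workers may report completion
    | TakeSource  // offered only while there is spare capacity

/// Decide, purely from the internal state (degree, usage), which
/// synchronous operations the server offers on this iteration.
let offeredOps (degree: int) (usage: int) : ServerOp list =
    if usage < degree then [ SetDegree; WorkDone; TakeSource ]
    else [ SetDegree; WorkDone ]

printfn "%A" (offeredOps 2 1)  // [SetDegree; WorkDone; TakeSource]
printfn "%A" (offeredOps 2 2)  // [SetDegree; WorkDone]
```

Because the offered set is a function of the state alone, no alternative ever needs to "wait inside" another one. That is exactly the situation that caused the abort described above.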

Here is a somewhat simplified pool design, which I've structured fairly explicitly according to the above template:

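type Pool<'i, 'o>(degree: int, source: Alt<'i>, worker: 'i -> Job<'o>) =
  let setDegree = ch ()  // requests to change the degree of parallelism
  let workDone = ch ()   // caught result (value or exception) of each finished worker

  // degree: maximum number of concurrent workers; usage: workers currently running
  let rec loop degree usage =
    let setDegreeAlt () =
      setDegree >>=? fun degree ->
        loop degree usage
    let workDoneAlt () =
      workDone >>=? fun result ->
        printfn "Work done: %A" result
        loop degree (usage - 1)
    let sourceAlt () =
      source >>=? fun input ->
        // Queue the worker as a separate job so the server loop is not blocked;
        // Job.catch turns an exception into a result that is sent to workDone.
        Job.delayWith worker input
        |> Job.catch
        >>= fun r -> workDone <-- r
        |> Job.queue >>= fun () ->
        loop degree (usage + 1)
    // Offer the source only while there is spare capacity.
    if usage < degree
    then setDegreeAlt () <|>? workDoneAlt () <|>? sourceAlt ()
    else setDegreeAlt () <|>? workDoneAlt ()

  do loop degree 0 |> server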

  member this.SetDegree degree =
    setDegree <-- degree

Note that while the above catches exceptions raised by a misbehaving worker, it does not guard against a misbehaving source.

Using Alt.never, one could also write an equivalent loop like this:

  let rec loop degree usage =
    (setDegree >>=? fun degree -> loop degree usage) <|>?
    (workDone >>=? fun result ->
       printfn "Work done: %A" result
       loop degree (usage - 1)) <|>?
    (if usage < degree then
       source >>=? fun input ->
         Job.delayWith worker input
         |> Job.catch
         >>= fun r -> workDone <-- r
         |> Job.queue >>= fun () ->
         loop degree (usage + 1)
     else
       Alt.never ())

Yet another way to write an equivalent loop is to use choose with a list construction:

  let rec loop degree usage =
    (setDegree >>=? fun degree -> loop degree usage) ::
    (workDone >>=? fun result ->
       printfn "Work done: %A" result
       loop degree (usage - 1)) ::
    (if usage < degree then
      [source >>=? fun input ->
         Job.delayWith worker input
         |> Job.catch
         >>= fun r -> workDone <-- r
         |> Job.queue >>= fun () ->
         loop degree (usage + 1)]
     else
       [])
    |> Alt.choose

from hopac.

vasily-kirichenko commented on June 3, 2024

Thanks!

However, I changed how errors are handled: you just enqueue a new job if an error occurs, which may result in infinite looping on a malicious message if degree = 1. So I brought back my failedMessages mailbox, and the pool pulls messages from it when source is empty:

type Pool<'msg, 'res, 'error>(degree: int, source: Alt<'msg>, worker: 'msg -> Job<Choice<'res, 'msg * 'error>>) =
    let setDegree = ch<int>() 
    let workDone = ch<Choice<'res, 'msg * 'error>>()
    let failedMessages = mb()

    let pool = Job.iterateServer (degree, 0)  <| fun (degree, usage) ->
        (setDegree |>>? fun dop -> (dop, usage)) <|>? 
        (workDone |>>? fun _ -> (degree, usage - 1)) <|>?
        (if usage < degree then
            (source <|>? failedMessages) >>=? fun x -> 
            Job.delayWith worker x >>= (fun r ->
                match r with
                | Choice1Of2 _ -> Job.result r
                | Choice2Of2 (msg, _) -> failedMessages <<-+ msg >>% r) >>= fun r ->
                workDone <-- r
            |> Job.queue
            >>% (degree, usage + 1)
         else 
            Alt.never()) 
    do start pool
    member __.SetDegree value = setDegree <-+ value |> run

Actually, I want Hopac to choose non-deterministically on source <|>? failedMessages (AFAIK, unlike CML, Hopac always chooses source). Is that possible? The idea is to mix failed messages with normal ones.

polytypic commented on June 3, 2024

Is there some reason why source needs to be an alternative? (I do see that it can be useful in a variety of ways, but is that an essential feature?) If it could be a channel, then rather than having a separate mailbox for failed messages, you could send failed messages to the source channel. This way the failed messages and messages from outside would be naturally FIFO queued.
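Here is a minimal model of that suggestion in stdlib-only, single-threaded F# (it does not use Hopac). A single FIFO queue stands in for the source channel, and a failed message goes back into that same queue. processAll, tryProcess and the "fails once" behaviour are hypothetical. As with the degree = 1 concern above, a message that always fails would be retried forever in this model.

```fsharp
open System.Collections.Generic

// One FIFO queue plays the role of the source channel; a failed message is
// re-enqueued there instead of going to a separate failedMessages mailbox.
// Returns the order in which processing attempts were made.
let processAll (tryProcess: string -> bool) (initial: string list) : string list =
    let queue = Queue<string>(initial)
    let attempts = ResizeArray<string>()
    while queue.Count > 0 do
        let msg = queue.Dequeue()
        attempts.Add msg
        if not (tryProcess msg) then
            // The retry goes to the tail, behind everything already queued.
            queue.Enqueue msg
    List.ofSeq attempts

// Hypothetical worker: "b" fails on its first attempt only
// (HashSet.Add returns true only the first time a value is added).
let failedOnce = HashSet<string>()
let tryProcess (msg: string) = not (msg = "b" && failedOnce.Add msg)

let result = processAll tryProcess [ "a"; "b"; "c" ]
printfn "%A" result  // ["a"; "b"; "c"; "b"]
```

With a single queue there is only one place to take from, so the question of bias between two alternatives never arises. The retried "b" simply waits behind "c".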

polytypic commented on June 3, 2024

> Actually, I want Hopac to choose non-deterministically on source <|>? failedMessages (AFAIK, unlike CML, Hopac always chooses source). Is that possible? The idea is to mix failed messages with normal ones.

In this case, when synchronizing on source <|>? failedMessages, if both source and failedMessages are immediately available, Hopac will commit to source. In all other cases, Hopac behaves essentially like CML.
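The consequence of that rule can be seen in a tiny stdlib-only F# model, which is not Hopac itself. Each entry of ready says whether the i-th alternative of alt0 <|>? alt1 <|>? ... is immediately available. The biased choice commits to the left-most ready one. commitBiased is a hypothetical name. The model returns None when nothing is ready; it does not model the blocking wait, during which the real choice would commit to whichever alternative becomes available first.

```fsharp
// Biased binary/n-ary choice: commit to the left-most immediately available
// alternative; None means nothing is ready yet.
let commitBiased (ready: bool list) : int option =
    List.tryFindIndex id ready

// Index 0 models source and index 1 models failedMessages; in this scenario
// both are ready on every iteration.
let picks = List.init 1000 (fun _ -> commitBiased [ true; true ])
let failedTaken = picks |> List.filter (fun p -> p = Some 1) |> List.length
printfn "failedMessages taken %d times out of 1000" failedTaken  // 0
```

So a failed message is only picked up at a moment when source has nothing immediately available.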

You can avoid the bias of lhs <|>? rhs by randomly choosing the order in which the arguments are considered. Basically like this:

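// flipCoin : unit -> bool stands for any helper that returns true or false
// with equal probability.
let (<~>?) lhs rhs = Alt.guard << Job.delay <| fun () ->
  if flipCoin ()
  then lhs <|>? rhs
  else rhs <|>? lhs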

One could similarly write a combinator on top of choose to randomize the order of the alternatives. These can be implemented more efficiently inside the library, so I added <~>? and chooser to Hopac. They are, of course, still slower than <|>? and choose.
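One plausible way to randomize the order for a whole list is a uniform (Fisher-Yates) shuffle followed by committing to the first ready alternative in shuffled order. The sketch below is a stdlib-only F# model of that idea under this assumption. It is not Hopac's actual implementation of chooser, and shuffledIndices and commitRandomized are hypothetical names.

```fsharp
open System

// Uniform Fisher-Yates shuffle of the indices 0 .. n-1.
let shuffledIndices (rng: Random) (n: int) : int[] =
    let a = Array.init n id
    for i in n - 1 .. -1 .. 1 do
        let j = rng.Next(i + 1)  // 0 <= j <= i
        let tmp = a.[i]
        a.[i] <- a.[j]
        a.[j] <- tmp
    a

// Consider the alternatives in shuffled order and commit to the first ready one.
let commitRandomized (rng: Random) (ready: bool[]) : int option =
    shuffledIndices rng ready.Length
    |> Array.tryFind (fun i -> ready.[i])

let rng = Random(42)
let counts = Array.zeroCreate<int> 3
for _ in 1 .. 30000 do
    match commitRandomized rng [| true; true; true |] with
    | Some i -> counts.[i] <- counts.[i] + 1
    | None -> ()
printfn "%A" counts  // each entry should be close to 10000
```

The extra work per synchronization (allocating and shuffling) is one reason why such randomized choices cost more than the plain biased ones.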

Note that these combinators, <~>? and chooser, do not lead to the exact same properties that you would get with CML. In fact, I'm not aware of any paper that would detail, or prove correct, the exact algorithms used in any version of CML to ensure fairness and/or non-determinism.

As noted in the documentation of <~>?, simplistic combinations of randomized choices do not necessarily lead to uniform distributions. That is something programmers using these combinators need to understand.
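As a worked example, take the coin-flip sketch of <~>? given earlier in this comment and nest it as a <~>? (b <~>? c), assuming a, b and c are all immediately available. With probability 1/2 the outer coin puts a first, and a is committed to. Otherwise the inner choice goes first and splits its half evenly between b and c:

```latex
\Pr[\text{commit to } a] = \tfrac{1}{2},
\qquad
\Pr[\text{commit to } b] = \Pr[\text{commit to } c]
  = \tfrac{1}{2} \cdot \tfrac{1}{2} = \tfrac{1}{4}
```

A uniform three-way choice would give 1/3 each. Randomizing the order of the whole list at once (for example with a uniform shuffle) achieves that, whereas nesting binary randomized choices does not.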

BTW, in this particular case, for example, I think that neither the randomization approach supported by <~>? nor the approaches used in CML necessarily lead to ideal fairness properties. Let's say that you indeed have a misbehaving job or jobs. Do you really want to choose with equal (or any fixed) probability of taking either a new job or a failed job? This could lead to a situation where a fixed number of misbehaving jobs could effectively slow down the processing of other jobs so that the number of jobs waiting on the source channel would grow without bound.
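A back-of-the-envelope model makes the unbounded growth concrete. The assumptions are: n is the degree, S is the mean time a job occupies a worker, and λ is the arrival rate of new messages on source. Failed messages are assumed to be always available because they keep failing, and each freed slot takes a new or a failed message with probability 1/2. None of these numbers come from the thread:

```latex
\text{total throughput} \le \frac{n}{S},
\qquad
\text{new-message throughput} \approx \frac{n}{2S},
\qquad
\frac{\mathrm{d}Q}{\mathrm{d}t} \approx \lambda - \frac{n}{2S}
```

For example, with n = 10 and S = 0.1 s, the pool can complete 100 jobs/s but serves only about 50 new messages/s. If new messages arrive at λ = 80/s, the backlog Q on source grows by about 30 messages per second (about 1800 per minute), even though 80/s would have been sustainable without the failing messages.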

Furthermore, I'm also suspicious of trying to guard against misbehaving jobs at this level. Threads within a process should be working co-operatively rather than competitively. I think this is fundamentally different when compared to scheduling processes at the operating system level, where OS processes are competing for the CPU. At the level of a single process or application, the people programming the application are in charge and can adjust the algorithms and scheduling to make the program behave. At the OS level there is no choice but to force individual programs to behave.

vasily-kirichenko commented on June 3, 2024

Thanks for the new combinator!

> Do you really want to choose with equal (or any fixed) probability of taking either a new job or a failed job? This could lead to a situation where a fixed number of misbehaving jobs could effectively slow down the processing of other jobs so that the number of jobs waiting on the source channel would grow without bound.

If neither source nor failedMessages is empty, then the workers are divided equally between them, which means that normal messages are processed by only half of the jobs configured via degree. I agree this logic is too simplistic.

Actually, this pool's degree should be adjusted dynamically (from outside or inside) depending on the fraction of failed jobs and their types of failure. To be concrete, it's intended to call SQL stored procedures (SPs), and an SP can 1. execute normally, 2. throw some fatal exception (a constraint violation somewhere in the DB or the like), or 3. throw a recoverable exception (like a deadlock). In case of a fatal exception the job should not be re-run at all, but in case of a recoverable one it should be run again and again until it succeeds, with some delay between attempts (which, I hope, the existence of the failedMessages queue provides). The pool should look at the percentage of "deadlock" errors and adjust its degree accordingly with some smart algorithm, because deadlocks are typically caused by too many concurrently executing SPs.

So the new "nondeterministic" operator is a nice way to pick normal and failed messages with a close-to-equal distribution.
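The dynamic degree adjustment described above could be sketched as a pure function over a window of recent outcomes. This is a hypothetical stdlib-only F# sketch. The SpOutcome type, shouldRetry and adjustDegree are made-up names. The AIMD rule (additive increase, multiplicative decrease, borrowed from TCP congestion control) and the threshold are assumptions that stand in for the unspecified "smart algorithm", not part of the pool in this thread.

```fsharp
// Assumed classification of stored-procedure outcomes.
type SpOutcome =
    | Succeeded
    | Fatal of string        // e.g. a constraint violation: never retry
    | Recoverable of string  // e.g. a deadlock: retry later

let shouldRetry (outcome: SpOutcome) =
    match outcome with
    | Recoverable _ -> true
    | Succeeded
    | Fatal _ -> false

/// One AIMD step over a window of recent outcomes: halve the degree (never
/// below 1) when the fraction of recoverable failures exceeds threshold,
/// otherwise grow it by one (never above maxDegree).
let adjustDegree (maxDegree: int) (threshold: float) (degree: int) (window: SpOutcome list) : int =
    match window with
    | [] -> degree
    | _ ->
        let recoverable = window |> List.filter shouldRetry |> List.length
        let rate = float recoverable / float window.Length
        if rate > threshold then max 1 (degree / 2)
        else min maxDegree (degree + 1)

let window = [ Succeeded; Recoverable "deadlock"; Succeeded; Recoverable "deadlock" ]
printfn "%d" (adjustDegree 32 0.2 16 window)  // 8: half the calls deadlocked
printfn "%d" (adjustDegree 32 0.2 16 [ Succeeded; Succeeded; Fatal "FK violation" ])  // 17
```

Fatal outcomes neither trigger a retry nor count toward the back-off, since lowering the degree would not prevent them.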
