Hopac, how to implement a more nuanced MailboxProcessor?


I'm not sure whether I should ask this here or on the Hopac wiki, or whether this is more of a design question, but here goes. I was recently researching F# agents to go with my Rx-related coding, came across the code in "How do I create a job queue using a MailboxProcessor?", and thought: isn't this a perfect fit for Hopac?

Now I have a few questions about how to actually implement a "job queue" that can be started and paused in Hopac (as in the job queue post).

  • Question: How do I maintain a queue of Hopac.Jobs with the ability to pause processing jobs from it if needed, or perhaps also clear it? I could implement an explicit queue (as in the job queue post), but that aside, a possible answer to this question interests me on its own too.
  • Then a question for more general advice: I plan to make thousands of these agents. Each takes input, say, from an Rx stream (or perhaps there is one stream per agent), does some processing and then forwards the results to an asynchronous interface (most likely one that returns a Task or Task<T>). That call may take time to complete or may fail (say, there is a DB behind the interface), in which case I may want a timeout and a retry. Perhaps this could be better accomplished with Rx by queuing the Job results to an output Rx stream, but the main points are the Task-based interface interaction, the timeout, and the retry upon failure. Question: Would this be an OK scenario? I'm aiming for a somewhat more realistic use case in a mixed .NET code base (that is, C#, some VB.NET and F#), so opinions and implementation pointers would be appreciated.
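To make the timeout-and-retry requirement from the second bullet concrete, here is a minimal sketch. It uses plain F# Async from FSharp.Core rather than Hopac, and `callWithRetry`, `attemptsLeft` and `timeoutMs` are hypothetical names chosen for illustration. It assumes the asynchronous interface is a function returning a Task<'T> and that any failure, including a timeout, should simply trigger another attempt.

```fsharp
open System
open System.Threading.Tasks

// Hypothetical helper (not part of Hopac or Rx): calls a Task-returning
// function, gives each attempt at most timeoutMs milliseconds, and retries
// on any failure until attemptsLeft is used up.
let rec callWithRetry (attemptsLeft : int) (timeoutMs : int) (call : unit -> Task<'T>) : Async<'T> =
    async {
        try
            // Awaiting the child raises TimeoutException if it takes longer than timeoutMs.
            let! pending = Async.StartChild (Async.AwaitTask (call ()), timeoutMs)
            return! pending
        with _ when attemptsLeft > 1 ->
            // The last failure is not caught, so it propagates to the caller.
            return! callWithRetry (attemptsLeft - 1) timeoutMs call
    }

// Usage: a stand-in for the DB-backed interface that fails twice, then succeeds.
let mutable calls = 0
let flaky () =
    calls <- calls + 1
    if calls < 3 then Task.FromException<int> (InvalidOperationException "db down")
    else Task.FromResult 42

let result = callWithRetry 5 1000 flaky |> Async.RunSynchronously
// result is 42, reached on the third call
```

Note that a Task that times out keeps running in the background, because this sketch passes no cancellation token to it; whether that is acceptable depends on the interface being called.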

I took a cue from the Post Mailbox benchmark sample and created the following sample (the gist of the code is the JobProcessor):

//Install-Package Hopac 
open Hopac
open Hopac.Extensions
open Hopac.Infixes
open Hopac.Job.Infixes


[<CustomEquality; CustomComparison>]
type Message<'a> = 
    | Start
    | Stop
    | Pause
    | Job of ('a -> unit)

    override x.Equals(obj) = 
        match obj with
        | :? Message<'a> as fu -> 
            match x, fu with
            | Start, Start | Stop, Stop | Pause, Pause -> true
            | Job f1, Job f2 -> true
            | _, _ -> false
        | _ -> false

    override x.GetHashCode() = 
        match x with
        | Start -> 1
        | Stop -> 2
        | Pause -> 3
        | _ -> 4

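    interface System.IComparable with
        member x.CompareTo yobj = 
            match yobj with
            // Compare by case tag (the hash above). Calling `compare x y` here
            // would call back into CompareTo and recurse forever.
            | :? Message<'a> as y -> compare (x.GetHashCode()) (y.GetHashCode())
            | _ -> invalidArg "yobj" "cannot compare value of different types"

    interface System.IComparable<Message<'a>> with
        member x.CompareTo(y) = compare (x.GetHashCode()) (y.GetHashCode())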

type JobProcessor() = 
    let mMb = mb()
    do 
        run <| job { 
                   do! Job.foreverServer (mMb |>> fun msg -> 
                                              // Should an explicit job queue be introduced here?
                                              match msg with
                                              | Start -> printfn "Start"
                                              | Stop ->  printfn "Stop"
                                              | Pause -> printfn "Pause"
                                              | Job(f) ->
                                                printfn "Job"
                                                f())
               }

    // Takes a Message (Start, Stop, Pause or Job), not a bare function.
    member x.QueueJob(msg) =
        Mailbox.Global.send mMb msg
        //mMb <<-+ Message.Start

[<EntryPoint>]
let main argv = 
    let jp = new JobProcessor()
    let jobFunc i () = (printfn "%i" i)
    jp.QueueJob(Job(jobFunc 100))
    0
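The comment in the sample asks whether an explicit job queue should be introduced. One possible shape for it, sketched here without any Hopac calls, is a pure state-transition function that a server loop could fold over the incoming messages. All names (`Command`, `QueueState`, `step`, `drain`) are hypothetical, and the sketch assumes jobs are plain `unit -> unit` functions that run synchronously in FIFO order.

```fsharp
// Hypothetical types for illustration; none of this is part of Hopac.
type Command =
    | Start
    | Pause
    | Clear
    | Enqueue of (unit -> unit)

type QueueState =
    { Running : bool
      Pending : (unit -> unit) list }   // oldest job first

let initial = { Running = false; Pending = [] }

// Runs every pending job in FIFO order and returns the state with an empty queue.
let drain (state : QueueState) =
    state.Pending |> List.iter (fun work -> work ())
    { state with Pending = [] }

// One transition of the queue: while paused, jobs accumulate; while running, they execute.
let step (state : QueueState) (command : Command) =
    match command with
    | Start -> drain { state with Running = true }
    | Pause -> { state with Running = false }
    | Clear -> { state with Pending = [] }
    | Enqueue work ->
        let queued = { state with Pending = state.Pending @ [ work ] }
        if queued.Running then drain queued else queued

// Usage: job 3 is queued while paused and then cleared, so it never runs.
let log = System.Collections.Generic.List<int> ()
let final =
    [ Enqueue (fun () -> log.Add 1)
      Enqueue (fun () -> log.Add 2)
      Start
      Pause
      Enqueue (fun () -> log.Add 3)
      Clear
      Start
      Enqueue (fun () -> log.Add 4) ]
    |> List.fold step initial
// log now holds 1, 2, 4
```

Keeping the transition pure means the pause and clear behaviour can be checked without any concurrency; the agent would then only need to carry the current QueueState from one received message to the next.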
