Question

I want to implement the concept of a Port of the CCR Framework in F# (as CCR is not officially supported for .Net 4.0). I know that one can use the MailboxProcessor class in F# to do this. This works perfectly for simple Receive Arbiters but I need the concept of the Interleave Arbiter, i.e. I want to control which messages are processed exclusively and which are processed concurrently. So far I've got no idea to implement this in F# and I would be grateful for your help.

Was it helpful?

Solution

I'm not very familiar with CCR, but I'll try to answer - my understanding is that interleave arbiter behaves a bit like ReaderWriterLock. That is, you can specify some operations that can run in parallel (reads) and some operations that are exclusive (writes).

The following agent is one way to implement it (not tested, but type checks :-)). The agent exposes two operations that are intended for public use. The last one is internal:

type Message<'T> =
  | PerformReadOperation of ('T -> Async<unit>)
  | PerformWriteOperation of ('T -> Async<'T>)
  | ReadOperationCompleted
  • By sending the agent PerformReadOperation, you're giving it an operation that should be run (once) using the state and possibly in parallel with other read operations.

  • By sending the agent PerformWriteOperation, you're giving it an operation that calculates a new state and must be executed after all read operations complete. (If you were working with immutable state, that would make things simpler - you wouldn't have to wait until readers complete! But the implementation below implements the waiting).

The agent starts with some initial state:

let initial = // initial state

And the rest of the agent is implemented using two loops:

let interleaver = MailboxProcessor.Start(fun mbox ->

  // Asynchronously wait until all read operations complete
  let rec waitUntilReadsComplete reads = 
    if reads = 0 then async { return () }
    else mbox.Scan(fun msg ->
      match msg with
      | ReadOperationCompleted -> Some(waitUntilReadsComplete (reads - 1))
      | _ -> None)

  let rec readingLoop state reads = async {
    let! msg = mbox.Receive()
    match msg with
    | ReadOperationCompleted ->
        // Some read operation completed - decrement counter
        return! readingLoop state (reads - 1) 
    | PerformWriteOperation(op) ->
        do! waitUntilReadsComplete reads
        let! newState = op state
        return! readingLoop newState 0
    | PerformReadOperation(op) ->
        // Start the operation in background & increment counter
        async { do! op state
                mbox.Post(ReadOperationCompleted) }
        |> Async.Start
        return! readingLoop state (reads + 1) }
  readingLoop initial 0)

OTHER TIPS

Just to add on to Tomas suggested solution, in case you do not want to expose the "ReadOperationCompleted" message to the consumer of the mail box (as this message is internal and in current implementation can be sent by any consumer of the mail box) a separate mail box can be created inside the main mail box processor function which will accept two messages: ReadOperationCompleted and WaitForReadCompleted (this one will be used with PostAndAsyncReply by the main mail box) as the response to this message will only come when all the read operations are completed. Also the "read" count represented by "reads" will be moved to this new internal mail box as that state be encapsulated by this internal mail box.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top