Question

I have a question concerning data structures that contain async operations. It may sound weird.

TestActor contains a mailbox processor and has three functions: Receive prepares the mailbox processor to receive messages, while Post and PostAndAsyncReply are used to send messages to the actor.

type TestActor (init, timeout) =
    let mutable counter = init
    let rcvFun = fun (msg) -> async {
            match msg with
            | Add i -> 
                counter <- counter + i
            | GetCounter reply -> 
                reply.Reply counter}
    let mailbox = OnlyLatestMBP<TestMessage> ()

    do printfn "Initializing actors: "
    do mailbox.Receive (rcvFun, timeout) ////// RECEIVE IS CALLED AT CONSTRUCTION

    member x.Receive (timeout) =       
        mailbox.Receive (rcvFun, timeout) 

    member x.Post (msg: TestMessage, timeout) = 
        mailbox.Post(msg, timeout)

    member x.PostAndAsyncReply (replyChannel, timeout) = 
        mailbox.PostAndAsyncReply(replyChannel, timeout)

I'd like to use this example to understand an issue that affected my code. In the usual example for stacking agents in a data structure, Receive is executed at construction. In my example, the agent can be tested with the code below:

let actorsWorkforce = 
    seq { 1 .. 5}
    |> Seq.map (fun idx -> TestActor(idx, 60000))

let test = 
    actorsWorkforce 
    |> Seq.map ( fun idx -> idx.PostAndAsyncReply ( (fun reply -> GetCounter reply), 10000) )
    |> Async.Parallel
    |> Async.RunSynchronously 


let result = 
    test 
    |> Array.iteri (fun idx element ->
        match element with
        | Some x -> printfn "Actor %i: OK with result %A" idx x
        | None -> printfn "Actor %i: Failed" idx )

And this works as planned.

However, let's say I'd like to postpone the call to Receive to a later stage.

type TestActor (init) = 
    let mutable counter = init
    let rcvFun = fun (msg) -> async {
            match msg with
            | Add i -> 
                counter <- counter + i
            | GetCounter reply -> 
                reply.Reply counter}

    let mailbox = OnlyLatestMBP<TestMessage> ()

    member x.Receive (timeout) =       
        mailbox.Receive (rcvFun, timeout) 

    member x.Post (msg: TestMessage, timeout) = 
        mailbox.Post(msg, timeout)

    member x.PostAndAsyncReply (replyChannel, timeout) = 
        mailbox.PostAndAsyncReply(replyChannel, timeout)


let actorsWorkforce = 
    seq { 1 .. 5}
    |> Seq.map (fun idx -> TestActor(idx))

actorsWorkforce |> Seq.iter (fun idx -> idx.Receive (60000))

let test = 
    actorsWorkforce 
    |> Seq.map ( fun idx -> idx.PostAndAsyncReply ( (fun reply -> GetCounter reply), 10000) )
    |> Async.Parallel
    |> Async.RunSynchronously 


let result = 
    test 
    |> Array.iteri (fun idx element ->
        match element with
        | Some x -> printfn "Actor %i: OK with result %A" idx x
        | None -> printfn "Actor %i: Failed" idx )

This piece of code compiles but does not work. mailbox.Receive has the type signature member Receive : callback:('a -> Async<unit>) * ?timeout:int -> unit, so it would make sense to execute Receive with Seq.iter. I suspect that the code does not work because actorsWorkforce |> Seq.iter (fun idx -> idx.Receive (60000)) duplicates actorsWorkforce when executed.

Is this correct? How can I fix this? Thanks!

EDIT

The entire code:

open System
open System.Diagnostics
open Microsoft.FSharp.Control
open System.Threading
open System.Threading.Tasks
open System.Collections.Concurrent


////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// OnlyLatest
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

type Envelope<'a> = Option<DateTime * 'a> 

[<Sealed>]
type AsyncReplyChannel<'Reply>(replyf : 'Reply -> unit) =
    member x.Reply(reply) = replyf(reply)


[<Sealed>]
type OnlyLatestMBP<'a> () =

    let mutable currentEnvelope: Envelope<'a> = Envelope<'a>.None
    let mutable timestampLastPrcsd: DateTime = DateTime.Now
    let mutable react = Unchecked.defaultof<_>

    // Msg Box status
    let mutable isActive = false
    let mutable defaultTimeout = Timeout.Infinite

    // Event Messages
    let awaitMsg = new AutoResetEvent(false)
    let isActiveMsg = new AutoResetEvent(false) 

    let rec await timeout = async {
        let thr = Thread.CurrentThread.ManagedThreadId
        printfn "await on thread %i" thr
        match currentEnvelope with
        | Some (timestamp, x) -> 
            if timestamp > timestampLastPrcsd then
                do! react x
                timestampLastPrcsd <- timestamp               
                printfn "processed message"
            currentEnvelope <- Envelope.None
            awaitMsg.Reset() |> ignore
            return! await timeout
        | None -> 
            let! recd = Async.AwaitWaitHandle(awaitMsg, timeout)
            if recd
            then return! await timeout    
            else  
                isActive <- false
                isActiveMsg.Reset() |> ignore
                printfn ".. no message within timeout, shutting down"  }

    member x.DefaultTimeout
        with get() = defaultTimeout
        and set(value) = defaultTimeout <- value

    member x.Receive (callback, ?timeout) = 
        if not isActive then 
            isActive <- true
            isActiveMsg.Set() |> ignore
        let timeout = defaultArg timeout defaultTimeout  
        react <- callback 
        let todo = await timeout
        Async.Start todo  

    member x.Post (msg, ?timeout) = async {
        let thr = Thread.CurrentThread.ManagedThreadId
        printfn "posting on thread %i" thr
        let timeout = defaultArg timeout defaultTimeout
        if not isActive then 
            let! recd = Async.AwaitWaitHandle(isActiveMsg, timeout)
            if recd then 
                currentEnvelope <- Envelope.Some(DateTime.Now, msg)
                awaitMsg.Set() |> ignore       
                return true
            else return false
        else             
            currentEnvelope <- Envelope.Some(DateTime.Now, msg) 
            awaitMsg.Set() |> ignore  
            return true  }

    member x.PostAndAsyncReply (replyChannelMsg, ?timeout) = async {
        let timeout = defaultArg timeout defaultTimeout
        let tcs = new TaskCompletionSource<_>()  
        let msg = replyChannelMsg ( new AsyncReplyChannel<_> (fun reply -> tcs.SetResult(reply)) )
        let! posted = x.Post (msg,timeout)
        if posted then
            match timeout with
            | Timeout.Infinite -> 
                let! result = Async.FromContinuations ( fun (cont, _, _) ->
                    let apply = fun (task: Task<_>) -> cont (task.Result)
                    tcs.Task.ContinueWith(apply) |> ignore )
                return Some result 
            | _ ->
                let waithandle = tcs.Task.Wait(timeout)
                match waithandle with
                | false -> return None
                | true -> return Some tcs.Task.Result 
        else return None }



type TestMessage =
   | Add of int
   | GetCounter of AsyncReplyChannel<int>


type TestActor (init) =
    let mutable counter = init
    let rcvFun = fun (msg) -> async {
            match msg with
            | Add i -> 
                counter <- counter + i
            | GetCounter reply -> 
                reply.Reply counter}


    let mailbox = OnlyLatestMBP<TestMessage> ()
//    do printfn "Initializing actors: "
//    do mailbox.Receive (rcvFun, timeout)

    member x.Receive (timeout) =       
        mailbox.Receive (rcvFun, timeout) 

    member x.Post (msg: TestMessage, timeout) = 
        mailbox.Post(msg, timeout)

    member x.PostAndAsyncReply (replyChannel, timeout) = 
        mailbox.PostAndAsyncReply(replyChannel, timeout)




let actorsWorkforce = 
    seq { 1 .. 5}
    |> Seq.map (fun idx -> TestActor(idx))


actorsWorkforce |> Seq.iter (fun actor -> actor.Receive (60000)) 


let test = 
    actorsWorkforce 
    |> Seq.map ( fun idx -> idx.PostAndAsyncReply ( (fun reply -> GetCounter reply), 10000) )
    |> Async.Parallel
    |> Async.RunSynchronously 


let result = 
    test 
    |> Array.iteri (fun idx element ->
        match element with
        | Some x -> printfn "Actor %i: OK with result %A" idx x
        | None -> printfn "Actor %i: Failed" idx )

Solution

As initially suspected, the issue was indeed with: actorsWorkforce |> Seq.iter (fun idx -> idx.Receive (60000))

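The problem was due to the lazy nature of seq. A seq built with Seq.map is not evaluated when it is defined; it is re-evaluated every time it is enumerated. Seq.iter therefore called Receive on one set of freshly constructed TestActor instances, and the later Seq.map over actorsWorkforce constructed a second, different set on which Receive had never been called.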
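The re-evaluation can be seen in isolation with a small sketch. Worker and the created counter are hypothetical names introduced only for this illustration; they are not part of the original code.

```fsharp
// Hypothetical counter: how many Worker instances have been constructed so far.
let created = ref 0

// Hypothetical type standing in for TestActor; it only records its construction.
type Worker (id: int) =
    do created.Value <- created.Value + 1
    member this.Id = id

// Seq.map is lazy: defining the sequence constructs nothing.
let lazyWorkers =
    seq { 1 .. 3 }
    |> Seq.map (fun i -> Worker i)

printfn "after definition: %i" created.Value          // 0

// Each enumeration runs the mapping function again and builds new instances.
lazyWorkers |> Seq.iter ignore
printfn "after first enumeration: %i" created.Value   // 3

lazyWorkers |> Seq.iter ignore
printfn "after second enumeration: %i" created.Value  // 6
```

The second enumeration does not revisit the first three objects; it builds three more. Any state set on the first batch (such as a started mailbox) is invisible to the second.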

I have produced a narrowed-down minimal code example.

open System
open System.Diagnostics
open Microsoft.FSharp.Control
open System.Threading
open System.Threading.Tasks
open System.Collections.Concurrent

type TestActress (name, timerlength) = 
    let mutable isActive = false
    let rec myTask () = async {
        Thread.Sleep (timerlength * 1000)
        printfn "%s waited : %i" name timerlength
        return! myTask () }
    member this.Start () = 
        isActive <- true
        Async.Start (myTask ())
    member this.GetStatus () = async {
        Thread.Sleep (2000)
        return  isActive }

// One single element, this is easy
let cameronDiaz = TestActress ("Cameron", 10)
cameronDiaz.Start ()
let status = cameronDiaz.GetStatus () |> Async.RunSynchronously     

// Async.Parallel receives a seq<Async<'T>> as an input
// This is why I started off with a seq
// This piece of code does not work
let hollywood = 
    [ "Cameron"; "Pamela"; "Natalie"; "Diane" ] 
    |> List.toSeq
    |> Seq.mapi ( fun idx el -> TestActress (el, idx + 10) )
hollywood |> Seq.iter ( fun el -> el.Start () ) 
let areTheyWorking =
    hollywood
    |> Seq.map (fun el -> el.GetStatus ()) 
    |> Async.Parallel
    |> Async.RunSynchronously

// All right, with a list the functions are executed when I expect them to be
let hollywood2 = 
    [ "Cameron"; "Pamela"; "Natalie"; "Diane" ] 
    |> List.mapi ( fun idx el -> TestActress (el, idx + 10) )
hollywood2 |> List.iter ( fun el -> el.Start () ) 
let areTheyWorking2 =
    hollywood2
    |> List.map (fun el -> el.GetStatus ()) 
    |> Async.Parallel
    |> Async.RunSynchronously
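
If a seq is still wanted as the input to Async.Parallel, one option is to cache the sequence so that every enumeration sees the same instances. The following is a minimal sketch under that assumption; Actress is a simplified, hypothetical stand-in for TestActress (no timers), and cast is a name chosen for this example.

```fsharp
// Simplified, hypothetical stand-in for TestActress: only tracks whether it was started.
type Actress (name: string) =
    let mutable isActive = false
    member this.Name = name
    member this.Start () = isActive <- true
    member this.GetStatus () = async { return isActive }

let names = [ "Cameron"; "Pamela"; "Natalie"; "Diane" ]

// Seq.cache stores each element the first time it is produced,
// so later enumerations reuse the same Actress instances.
let cast : seq<Actress> =
    names
    |> Seq.map (fun n -> Actress n)
    |> Seq.cache

// Start every actress; this enumeration constructs and caches them.
cast |> Seq.iter (fun a -> a.Start ())

// This enumeration reads from the cache, so it queries the started instances.
let statuses =
    cast
    |> Seq.map (fun a -> a.GetStatus ())
    |> Async.Parallel
    |> Async.RunSynchronously

printfn "%A" statuses
```

Here all four statuses come back true. Converting once with Seq.toList or Seq.toArray has the same effect and is arguably clearer when the collection is small and finite, which is what the list version above does.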
Licensed under: CC-BY-SA with attribution