Pregunta

Estaba tratando de encontrar un ejemplo sobre cómo usar TryScan, pero no he encontrado ninguno, ¿podrías ayudarme?

Lo que me gustaría hacer (ejemplo bastante simplificado): tengo un MailboxProcessor Eso acepta dos tipos de mesagos.

  • El primero GetState Devuelve el estado actual.GetState Los mensajes se envían con bastante frecuencia

  • El otro UpdateState es muy costoso (lleva mucho tiempo), por ejemplo, descargar algo de Internet y luego actualiza el estado en consecuencia.UpdateState se llama solo raramente.

Mi problema es - mensajes GetState están bloqueados y esperan hasta que UpdateState son servidos. Por eso intenté usar TryScan para procesar todo GetState mensajes, pero sin suerte.

Mi código de ejemplo:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                mbox.TryScan(fun m ->
                    match m with 
                    | GetState(chnl) -> 
                        printfn "G processing TryScan"
                        chnl.Reply(state)
                        Some(async { return! loop state})
                    | _ -> None
                ) |> ignore

                let! msg = mbox.Receive()
                match msg with
                | UpdateState ->
                    printfn "U processing"
                    // something very time consuming here...
                    async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                    return! loop (state+1)
                | GetState(chnl) ->
                    printfn "G processing"
                    chnl.Reply(state)
                    return! loop state
             }
             loop 0
)

[async { for i in 1..10 do 
          printfn " U"
          mbox.Post(UpdateState)
          async { do! Async.Sleep(200) } |> Async.RunSynchronously
};
async { // wait some time so that several `UpdateState` messages are fired
        async { do! Async.Sleep(500) } |> Async.RunSynchronously
        for i in 1..20 do 
          printfn "G"
          printfn "%d" (mbox.PostAndReply(GetState))
}] |> Async.Parallel |> Async.RunSynchronously

Si intenta ejecutar el código, verá que GetState El mensaje no se procesa casi, porque espera el resultado. Por otra parte UpdateState es solo fuego y olvido, bloqueando así efectivamente el estado.

Editar

La solución actual que funciona para mí es esta:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                let! res = mbox.TryScan((function
                    | GetState(chnl) -> Some(async {
                            chnl.Reply(state)
                            return state
                        })
                    | _ -> None
                ), 5)

                match res with
                | None ->
                    let! msg = mbox.Receive()
                    match msg with
                        | UpdateState ->
                            async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                            return! loop (state+1)
                        | _ -> return! loop state
                | Some n -> return! loop n
             }
             loop 0
)

Reacciones a los comentarios: la idea con otros MailboxProcessor o ThreadPool que se ejecuta UpdateState En paralelo es genial, pero no lo necesito actualmente. Todo lo que quería hacer era procesar todo GetState mensajes y después de eso los otros. No me importa eso durante el procesamiento UpdateState El agente está bloqueado.

Te mostraré cuál fue el problema en la salida:

// GetState messages are delayed 500 ms - see do! Async.Sleep(500)
// each UpdateState is sent after 200ms
// each GetState is sent immediatelly! (not real example, but illustrates the problem)
 U            200ms   <-- issue UpdateState
U processing          <-- process UpdateState, it takes 1sec, so other 
 U            200ms       5 requests are sent; sent means, that it is
 U            200ms       fire-and-forget message - it doesn't wait for any result
                          and therefore it can send every 200ms one UpdateState message
G                     <-- first GetState sent, but waiting for reply - so all 
                          previous UpdateState messages have to be processed! = 3 seconds
                          and AFTER all the UpdateState messages are processed, result
                          is returned and new GetState can be sent. 
 U            200ms
 U            200ms       because each UpdateState takes 1 second
 U            200ms
U processing
 U
 U
 U
 U
U processing
G processing          <-- now first GetState is processed! so late? uh..
U processing          <-- takes 1sec
3
G
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
G processing          <-- after MANY seconds, second GetState is processed!
10
G
G processing
// from this line, only GetState are issued and processed, because 
// there is no UpdateState message in the queue, neither it is sent
¿Fue útil?

Solución

No creo que el TryScan El método lo ayudará en este escenario. Le permite especificar el tiempo de espera para usar mientras espera mensajes. Una vez que se recibe un mensaje, comenzará a procesar el mensaje (ignorando el tiempo de espera).

Por ejemplo, si desea esperar un mensaje específico, pero realice alguna otra verificación cada segundo (mientras espera) podría escribir:

let loop () = async {
  let! res = mbox.TryScan(function
    | ImportantMessage -> Some(async { 
          // process message 
          return 0
        })
    | _ -> None)
  match res with
  | None -> 
       // perform some check & continue waiting
       return! loop ()
  | Some n -> 
       // ImportantMessage was received and processed 
}

¿Qué puede hacer para evitar bloquear el procesador de buzón al procesar el UpdateState ¿mensaje? El procesador de buzón es (lógicamente) de un solo hilo; probablemente no desee cancelar el procesamiento de UpdateState Mensaje, por lo que la mejor opción es comenzar a procesarlo en segundo plano y esperar hasta que se complete el procesamiento. El código que procesa UpdateState Luego puede enviar algún mensaje al buzón (por ejemplo, UpdateStateCompleted).

Aquí hay un boceto como podría verse esto:

let rec loop (state) = async {
  let! msg = mbox.Receive()
  match msg with
  | GetState(repl) -> 
      repl.Reply(state)
      return! scanning state
  | UpdateState -> 
      async { 
        // complex calculation (runs in parallel)
        mbox.Post(UpdateStateCompleted newState) }
      |> Async.Start
  | UpdateStateCompleted newState ->
      // Received new state from background workflow
      return! loop newState }

Ahora que la tarea de fondo se está ejecutando en paralelo, debe tener cuidado con el estado mutable. Además, si envías UpdateState Mensajes más rápido de lo que puede procesarlos, estará en problemas. Esto se puede solucionar, por ejemplo, ignorando o en cola las solicitudes cuando ya está procesando una anterior.

Otros consejos

¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡

Desafortunadamente, el TryScan La función en la versión actual de F# se rompe de dos maneras. En primer lugar, el objetivo es especificar un tiempo de espera, pero la implementación en realidad no lo honra. Específicamente, los mensajes irrelevantes restablecen el temporizador. En segundo lugar, como con el otro Scan Función, la cola de mensajes se examina bajo un bloqueo que evita que cualquier otro subproceso publique durante la duración del escaneo, lo que puede ser un tiempo arbitrariamente largo. En consecuencia, el TryScan La función en sí misma tiende a bloquear los sistemas concurrentes e incluso puede introducir puntos muertos porque el código de la persona que llama se evalúa dentro del bloqueo (por ejemplo, la publicación del argumento de la función a Scan o TryScan puede encerrar el agente cuando el código debajo de los bloqueos de bloqueo que esperan para adquirir el bloqueo que ya está bajo).

solía TryScan En un prototipo temprano de mi código de producción y no causó un fin de los problemas. Sin embargo, logré arquitecto a su alrededor y la arquitectura resultante fue realmente mejor. En esencia, yo ansiosamente Receive Todos los mensajes y filtrados usando mi propia cola local.

Como Tomás mencionó que el procesador de buzón es un solo enhebrado. Necesitará otro procesador de buzón para ejecutar las actualizaciones en un hilo separado del estatal Getter.

#nowarn "40"

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | UpdateState

let runner_UpdateState = MailboxProcessor.Start(fun mbox ->
    let rec loop = async {
        let! state = mbox.Receive()
        printfn "U start processing %d" !state
        // something very time consuming here...
        do! Async.Sleep 100
        printfn "U done processing %d" !state
        state := !state + 1
        do! loop
    }
    loop
)

let mbox = MailboxProcessor.Start(fun mbox ->
    // we need a mutiple state if another thread can change it at any time
    let state = ref 0

    let rec loop = async {
        let! msg = mbox.Receive()

        match msg with
        | UpdateState -> runner_UpdateState.Post state
        | GetState chnl -> chnl.Reply !state

        return! loop 
    }
    loop)

[
    async { 
        for i in 1..10 do 
            mbox.Post UpdateState
            do! Async.Sleep 200
    };
    async { 
        // wait some time so that several `UpdateState` messages are fired
        do! Async.Sleep 1000

        for i in 1..20 do 
            printfn "G %d" (mbox.PostAndReply GetState)
            do! Async.Sleep 50
    }
] 
|> Async.Parallel 
|> Async.RunSynchronously
|> ignore

System.Console.ReadLine() |> ignore

producción:

U start processing 0
U done processing 0
U start processing 1
U done processing 1
U start processing 2
U done processing 2
U start processing 3
U done processing 3
U start processing 4
U done processing 4
G 5
U start processing 5
G 5
U done processing 5
G 5
G 6
U start processing 6
G 6
G 6
U done processing 6
G 7
U start processing 7
G 7
G 7
U done processing 7
G 8
G U start processing 8
8
G 8
U done processing 8
G 9
G 9
U start processing 9
G 9
U done processing 9
G 9
G 10
G 10
G 10
G 10

También puedes usar Threadpool.

open System.Threading

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | SetState of int
    | UpdateState

let mbox = MailboxProcessor.Start(fun mbox ->
    let rec loop state = async {
        let! msg = mbox.Receive()

        match msg with
        | UpdateState -> 
            ThreadPool.QueueUserWorkItem((fun obj -> 
                let state = obj :?> int

                printfn "U start processing %d" state
                Async.Sleep 100 |> Async.RunSynchronously
                printfn "U done processing %d" state
                mbox.Post(SetState(state + 1))

                ), state)
            |> ignore
        | GetState chnl -> 
            chnl.Reply state
        | SetState newState ->
            return! loop newState
        return! loop state
    }
    loop 0)

[
    async { 
        for i in 1..10 do 
            mbox.Post UpdateState
            do! Async.Sleep 200
    };
    async { 
        // wait some time so that several `UpdateState` messages are fired
        do! Async.Sleep 1000

        for i in 1..20 do 
            printfn "G %d" (mbox.PostAndReply GetState)
            do! Async.Sleep 50
    }
] 
|> Async.Parallel 
|> Async.RunSynchronously
|> ignore

System.console.readline () |> ignorar

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top