Cómo usar tryscan en f# correctamente
-
28-10-2019 - |
Pregunta
Estaba tratando de encontrar un ejemplo sobre cómo usar TryScan
, pero no he encontrado ninguno, ¿podrías ayudarme?
Lo que me gustaría hacer (ejemplo bastante simplificado): tengo un MailboxProcessor
Eso acepta dos tipos de mesagos.
El primero
GetState
Devuelve el estado actual.GetState
Los mensajes se envían con bastante frecuenciaEl otro
UpdateState
es muy costoso (lleva mucho tiempo), por ejemplo, descargar algo de Internet y luego actualiza el estado en consecuencia.UpdateState
se llama solo raramente.
Mi problema es - mensajes GetState
están bloqueados y esperan hasta que UpdateState
son servidos. Por eso intenté usar TryScan
para procesar todo GetState
mensajes, pero sin suerte.
Mi código de ejemplo:
type Msg = GetState of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
let rec loop state = async {
// this TryScan doesn't work as expected
// it should process GetState messages and then continue
mbox.TryScan(fun m ->
match m with
| GetState(chnl) ->
printfn "G processing TryScan"
chnl.Reply(state)
Some(async { return! loop state})
| _ -> None
) |> ignore
let! msg = mbox.Receive()
match msg with
| UpdateState ->
printfn "U processing"
// something very time consuming here...
async { do! Async.Sleep(1000) } |> Async.RunSynchronously
return! loop (state+1)
| GetState(chnl) ->
printfn "G processing"
chnl.Reply(state)
return! loop state
}
loop 0
)
[async { for i in 1..10 do
printfn " U"
mbox.Post(UpdateState)
async { do! Async.Sleep(200) } |> Async.RunSynchronously
};
async { // wait some time so that several `UpdateState` messages are fired
async { do! Async.Sleep(500) } |> Async.RunSynchronously
for i in 1..20 do
printfn "G"
printfn "%d" (mbox.PostAndReply(GetState))
}] |> Async.Parallel |> Async.RunSynchronously
Si intenta ejecutar el código, verá que GetState
El mensaje no se procesa casi, porque espera el resultado. Por otra parte UpdateState
es solo fuego y olvido, bloqueando así efectivamente el estado.
Editar
La solución actual que funciona para mí es esta:
type Msg = GetState of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
let rec loop state = async {
// this TryScan doesn't work as expected
// it should process GetState messages and then continue
let! res = mbox.TryScan((function
| GetState(chnl) -> Some(async {
chnl.Reply(state)
return state
})
| _ -> None
), 5)
match res with
| None ->
let! msg = mbox.Receive()
match msg with
| UpdateState ->
async { do! Async.Sleep(1000) } |> Async.RunSynchronously
return! loop (state+1)
| _ -> return! loop state
| Some n -> return! loop n
}
loop 0
)
Reacciones a los comentarios: la idea con otros MailboxProcessor
o ThreadPool
que se ejecuta UpdateState
En paralelo es genial, pero no lo necesito actualmente. Todo lo que quería hacer era procesar todo GetState
mensajes y después de eso los otros. No me importa eso durante el procesamiento UpdateState
El agente está bloqueado.
Te mostraré cuál fue el problema en la salida:
// GetState messages are delayed 500 ms - see do! Async.Sleep(500)
// each UpdateState is sent after 200ms
// each GetState is sent immediatelly! (not real example, but illustrates the problem)
U 200ms <-- issue UpdateState
U processing <-- process UpdateState, it takes 1sec, so other
U 200ms 5 requests are sent; sent means, that it is
U 200ms fire-and-forget message - it doesn't wait for any result
and therefore it can send every 200ms one UpdateState message
G <-- first GetState sent, but waiting for reply - so all
previous UpdateState messages have to be processed! = 3 seconds
and AFTER all the UpdateState messages are processed, result
is returned and new GetState can be sent.
U 200ms
U 200ms because each UpdateState takes 1 second
U 200ms
U processing
U
U
U
U
U processing
G processing <-- now first GetState is processed! so late? uh..
U processing <-- takes 1sec
3
G
U processing <-- takes 1sec
U processing <-- takes 1sec
U processing <-- takes 1sec
U processing <-- takes 1sec
U processing <-- takes 1sec
U processing <-- takes 1sec
G processing <-- after MANY seconds, second GetState is processed!
10
G
G processing
// from this line, only GetState are issued and processed, because
// there is no UpdateState message in the queue, neither it is sent
Solución
No creo que el TryScan
El método lo ayudará en este escenario. Le permite especificar el tiempo de espera para usar mientras espera mensajes. Una vez que se recibe un mensaje, comenzará a procesar el mensaje (ignorando el tiempo de espera).
Por ejemplo, si desea esperar un mensaje específico, pero realice alguna otra verificación cada segundo (mientras espera) podría escribir:
let loop () = async {
let! res = mbox.TryScan(function
| ImportantMessage -> Some(async {
// process message
return 0
})
| _ -> None)
match res with
| None ->
// perform some check & continue waiting
return! loop ()
| Some n ->
// ImportantMessage was received and processed
}
¿Qué puede hacer para evitar bloquear el procesador de buzón al procesar el UpdateState
¿mensaje? El procesador de buzón es (lógicamente) de un solo hilo; probablemente no desee cancelar el procesamiento de UpdateState
Mensaje, por lo que la mejor opción es comenzar a procesarlo en segundo plano y esperar hasta que se complete el procesamiento. El código que procesa UpdateState
Luego puede enviar algún mensaje al buzón (por ejemplo, UpdateStateCompleted
).
Aquí hay un boceto como podría verse esto:
let rec loop (state) = async {
let! msg = mbox.Receive()
match msg with
| GetState(repl) ->
repl.Reply(state)
return! scanning state
| UpdateState ->
async {
// complex calculation (runs in parallel)
mbox.Post(UpdateStateCompleted newState) }
|> Async.Start
| UpdateStateCompleted newState ->
// Received new state from background workflow
return! loop newState }
Ahora que la tarea de fondo se está ejecutando en paralelo, debe tener cuidado con el estado mutable. Además, si envías UpdateState
Mensajes más rápido de lo que puede procesarlos, estará en problemas. Esto se puede solucionar, por ejemplo, ignorando o en cola las solicitudes cuando ya está procesando una anterior.
Otros consejos
¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡
Desafortunadamente, el TryScan
La función en la versión actual de F# se rompe de dos maneras. En primer lugar, el objetivo es especificar un tiempo de espera, pero la implementación en realidad no lo honra. Específicamente, los mensajes irrelevantes restablecen el temporizador. En segundo lugar, como con el otro Scan
Función, la cola de mensajes se examina bajo un bloqueo que evita que cualquier otro subproceso publique durante la duración del escaneo, lo que puede ser un tiempo arbitrariamente largo. En consecuencia, el TryScan
La función en sí misma tiende a bloquear los sistemas concurrentes e incluso puede introducir puntos muertos porque el código de la persona que llama se evalúa dentro del bloqueo (por ejemplo, la publicación del argumento de la función a Scan
o TryScan
puede encerrar el agente cuando el código debajo de los bloqueos de bloqueo que esperan para adquirir el bloqueo que ya está bajo).
solía TryScan
En un prototipo temprano de mi código de producción y no causó un fin de los problemas. Sin embargo, logré arquitecto a su alrededor y la arquitectura resultante fue realmente mejor. En esencia, yo ansiosamente Receive
Todos los mensajes y filtrados usando mi propia cola local.
Como Tomás mencionó que el procesador de buzón es un solo enhebrado. Necesitará otro procesador de buzón para ejecutar las actualizaciones en un hilo separado del estatal Getter.
#nowarn "40"
type Msg =
| GetState of AsyncReplyChannel<int>
| UpdateState
let runner_UpdateState = MailboxProcessor.Start(fun mbox ->
let rec loop = async {
let! state = mbox.Receive()
printfn "U start processing %d" !state
// something very time consuming here...
do! Async.Sleep 100
printfn "U done processing %d" !state
state := !state + 1
do! loop
}
loop
)
let mbox = MailboxProcessor.Start(fun mbox ->
// we need a mutiple state if another thread can change it at any time
let state = ref 0
let rec loop = async {
let! msg = mbox.Receive()
match msg with
| UpdateState -> runner_UpdateState.Post state
| GetState chnl -> chnl.Reply !state
return! loop
}
loop)
[
async {
for i in 1..10 do
mbox.Post UpdateState
do! Async.Sleep 200
};
async {
// wait some time so that several `UpdateState` messages are fired
do! Async.Sleep 1000
for i in 1..20 do
printfn "G %d" (mbox.PostAndReply GetState)
do! Async.Sleep 50
}
]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
System.Console.ReadLine() |> ignore
producción:
U start processing 0
U done processing 0
U start processing 1
U done processing 1
U start processing 2
U done processing 2
U start processing 3
U done processing 3
U start processing 4
U done processing 4
G 5
U start processing 5
G 5
U done processing 5
G 5
G 6
U start processing 6
G 6
G 6
U done processing 6
G 7
U start processing 7
G 7
G 7
U done processing 7
G 8
G U start processing 8
8
G 8
U done processing 8
G 9
G 9
U start processing 9
G 9
U done processing 9
G 9
G 10
G 10
G 10
G 10
También puedes usar Threadpool.
open System.Threading
type Msg =
| GetState of AsyncReplyChannel<int>
| SetState of int
| UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
let rec loop state = async {
let! msg = mbox.Receive()
match msg with
| UpdateState ->
ThreadPool.QueueUserWorkItem((fun obj ->
let state = obj :?> int
printfn "U start processing %d" state
Async.Sleep 100 |> Async.RunSynchronously
printfn "U done processing %d" state
mbox.Post(SetState(state + 1))
), state)
|> ignore
| GetState chnl ->
chnl.Reply state
| SetState newState ->
return! loop newState
return! loop state
}
loop 0)
[
async {
for i in 1..10 do
mbox.Post UpdateState
do! Async.Sleep 200
};
async {
// wait some time so that several `UpdateState` messages are fired
do! Async.Sleep 1000
for i in 1..20 do
printfn "G %d" (mbox.PostAndReply GetState)
do! Async.Sleep 50
}
]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
System.console.readline () |> ignorar