Pregunta

Siempre escuchas que el código funcional es inherentemente más fácil de paralelizar que el código no funcional, así que decidí escribir una función que haga lo siguiente:

Dada una entrada de cadenas, totalice el número de caracteres únicos para cada cadena. Entonces, dada la entrada [" aaaaa " ;; " bbb " ;; " ccccccc " ;; " abbbc " ] , nuestro método devolverá a: 6; b: 6; c: 8 .

Esto es lo que he escrito:

(* seq<#seq<char>> -> Map<char,int> *)
let wordFrequency input =
    input
    |> Seq.fold (fun acc text ->
        (* This inner loop can be processed on its own thread *)
        text
        |> Seq.choose (fun char -> if Char.IsLetter char then Some(char) else None)
        |> Seq.fold (fun (acc : Map<_,_>) item ->
            match acc.TryFind(item) with
            | Some(count) -> acc.Add(item, count + 1)
            | None -> acc.Add(item, 1))
            acc
        ) Map.empty

Este código es idealmente paralelizable, porque cada cadena en input se puede procesar en su propio hilo. No es tan sencillo como parece, ya que el bucle interno agrega elementos a un mapa compartido entre todas las entradas.

Me gustaría que el bucle interno fuera factorizado en su propio hilo, y no quiero usar ningún estado mutable. ¿Cómo volvería a escribir esta función utilizando un flujo de trabajo asíncrono?

¿Fue útil?

Solución

Como ya se señaló, existe una contención de actualización si intenta que diferentes subprocesos procesen diferentes cadenas de entrada, ya que cada subproceso puede incrementar el recuento de cada letra. Puede hacer que cada hilo produzca su propio Mapa, y luego 'sumar todos los Mapas', pero ese paso final puede ser costoso (y no es tan adecuado para utilizar hilos debido a los datos compartidos). Creo que es probable que las entradas grandes se ejecuten más rápido usando un algoritmo como el que se muestra a continuación, donde cada hilo procesa una letra a la cuenta diferente (para todas las cadenas en la entrada). Como resultado, cada subproceso tiene su propio contador independiente, por lo que no hay contención de actualización ni un paso final para combinar los resultados. Sin embargo, necesitamos un preprocesamiento para descubrir el 'conjunto de letras únicas', y este paso tiene el mismo problema de contención. (En la práctica, es probable que conozca el universo de los personajes por adelantado, por ejemplo, alfabéticos, y luego solo puede crear 26 hilos para procesar az, y evitar este problema). En cualquier caso, presumiblemente la pregunta es sobre explorar cómo escribir F # código asíncrono para dividir el trabajo entre subprocesos ', por lo que el código siguiente lo demuestra.

#light

let input = [| "aaaaa"; "bbb"; "ccccccc"; "abbbc" |]

// first discover all unique letters used
let Letters str = 
    str |> Seq.fold (fun set c -> Set.add c set) Set.empty 
let allLetters = 
    input |> Array.map (fun str -> 
        async { return Letters str })
    |> Async.Parallel 
    |> Async.Run     
    |> Set.union_all // note, this step is single-threaded, 
        // if input has many strings, can improve this

// Now count each letter on a separate thread
let CountLetter letter =
    let mutable count = 0
    for str in input do
        for c in str do
            if letter = c then
                count <- count + 1
    letter, count
let result = 
    allLetters |> Seq.map (fun c ->
        async { return CountLetter c })
    |> Async.Parallel 
    |> Async.Run

// print results
for letter,count in result do
    printfn "%c : %d" letter count

De hecho, he "cambiado completamente el algoritmo", principalmente porque el algoritmo original que tenía no es particularmente adecuado para la paralelización directa de datos debido a la contención de la actualización. Dependiendo de lo que aprendas, esta respuesta puede o no ser particularmente satisfactoria para ti.

Otros consejos

Puedes escribir eso así:

let wordFrequency =
  Seq.concat >> Seq.filter System.Char.IsLetter >> Seq.countBy id >> Map.ofSeq

y paralelécela con solo dos caracteres adicionales para usar el módulo PSeq del FSharp.PowerPack.Parallel.Seq DLL en lugar del ordinario Seq módulo:

let wordFrequency =
  Seq.concat >> PSeq.filter System.Char.IsLetter >> PSeq.countBy id >> Map.ofSeq

Por ejemplo, el tiempo que se tarda en calcular las frecuencias de la Biblia King James de 5.5Mb se reduce de 4.75s a 0.66s. Eso es un 7.2 & # 215; aceleración en esta máquina de 8 núcleos.

Paralelo no es lo mismo que async, como explica Don Syme .

Entonces, en IMO, sería mejor utilizar PLINQ para paralelizar.

No hablo F # del todo bien, pero puedo abordar esto. Piensa en usar map / reduce:

sea n = tarjeta (S) el número de símbolos s en el alfabeto S.

Etapa del mapa:

Procesos

Spawn n , donde la asignación del proceso i -th es para calcular el número de apariciones del símbolo s i en todo el vector de entrada.

Reducir etapa :

Reúna el total de cada uno de los procesos n en orden. Ese vector son tus resultados.

Ahora, esta versión no produce ninguna mejora con respecto a una versión en serie; Sospecho que hay una dependencia oculta aquí que hace que esto sea intrínsecamente difícil de paralelizar, pero estoy demasiado cansado y con el cerebro en suspenso como para probarlo esta noche.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top