Pergunta

Você sempre ouve que o código funcional é inerentemente mais fácil de paralelizar do que o código não funcional, então decidi escrever uma função que faz o seguinte:

Dada uma entrada de strings, totalize o número de caracteres exclusivos para cada string.Então, dada a entrada [ "aaaaa"; "bbb"; "ccccccc"; "abbbc" ], nosso método retornará a: 6; b: 6; c: 8.

Aqui está o que escrevi:

(* seq<#seq<char>> -> Map<char,int> *)
let wordFrequency input =
    input
    |> Seq.fold (fun acc text ->
        (* This inner loop can be processed on its own thread *)
        text
        |> Seq.choose (fun char -> if Char.IsLetter char then Some(char) else None)
        |> Seq.fold (fun (acc : Map<_,_>) item ->
            match acc.TryFind(item) with
            | Some(count) -> acc.Add(item, count + 1)
            | None -> acc.Add(item, 1))
            acc
        ) Map.empty

Este código é idealmente paralelizável, porque cada string em input pode ser processado em seu próprio thread.Não é tão simples quanto parece, pois o innerloop adiciona itens a um mapa compartilhado entre todas as entradas.

Gostaria que o loop interno fosse fatorado em seu próprio thread e não quero usar nenhum estado mutável. Como eu reescreveria esta função usando um fluxo de trabalho assíncrono?

Foi útil?

Solução

Como já foi apontado, há contenção de atualização se você tentar fazer com que threads diferentes processem strings de entrada diferentes, já que cada thread pode incrementar a contagem de cada letra.Você pode fazer com que cada thread produza seu próprio mapa e depois 'adicione todos os mapas', mas essa etapa final pode ser cara (e não é tão adequada para utilizar threads devido aos dados compartilhados).Acho que entradas grandes provavelmente serão executadas mais rapidamente usando um algoritmo como o abaixo, onde cada thread processa uma letra para contagem diferente (para todas as strings na entrada).Como resultado, cada thread tem seu próprio contador independente, portanto, não há contenção de atualização e nenhuma etapa final para combinar os resultados.No entanto, precisamos de pré-processamento para descobrir o “conjunto de letras únicas”, e esta etapa apresenta o mesmo problema de contenção.(Na prática, você provavelmente conhece o universo dos personagens de antemão, por ex.alfabética e, em seguida, pode apenas criar 26 threads para processar a-z e contornar esse problema.) Em qualquer caso, presumivelmente, a questão é principalmente sobre explorar 'como escrever código assíncrono F # para dividir o trabalho entre threads', então o código abaixo demonstra isso .

#light

let input = [| "aaaaa"; "bbb"; "ccccccc"; "abbbc" |]

// first discover all unique letters used
let Letters str = 
    str |> Seq.fold (fun set c -> Set.add c set) Set.empty 
let allLetters = 
    input |> Array.map (fun str -> 
        async { return Letters str })
    |> Async.Parallel 
    |> Async.Run     
    |> Set.union_all // note, this step is single-threaded, 
        // if input has many strings, can improve this

// Now count each letter on a separate thread
let CountLetter letter =
    let mutable count = 0
    for str in input do
        for c in str do
            if letter = c then
                count <- count + 1
    letter, count
let result = 
    allLetters |> Seq.map (fun c ->
        async { return CountLetter c })
    |> Async.Parallel 
    |> Async.Run

// print results
for letter,count in result do
    printfn "%c : %d" letter count

Na verdade, 'alterei completamente o algoritmo', principalmente porque o algoritmo original que você tinha não é particularmente adequado para direcionar a paralelização de dados devido à contenção de atualização.Dependendo exatamente do que você deseja aprender, esta resposta pode ou não ser particularmente satisfatória para você.

Outras dicas

Você pode escrever assim:

let wordFrequency =
  Seq.concat >> Seq.filter System.Char.IsLetter >> Seq.countBy id >> Map.ofSeq

e paralelizá-lo com apenas dois caracteres extras para usar o PSeq módulo do FSharp.PowerPack.Parallel.Seq DLL em vez do comum Seq módulo:

let wordFrequency =
  Seq.concat >> PSeq.filter System.Char.IsLetter >> PSeq.countBy id >> Map.ofSeq

Por exemplo, o tempo necessário para calcular as frequências da Bíblia King James de 5,5 Mb cai de 4,75s para 0,66s.Isso é uma aceleração de 7,2× nesta máquina de 8 núcleos.

Paralelo não é o mesmo que assíncrono, pois Don Syme explica.

Então, na IMO, seria melhor usar o PLINQ para paralelizar.

Eu não falo F# muito bem, mas posso resolver isso.Pense em usar map/reduce:

deixar n = cartão (Σ) seja o número de símbolos σ no alfabeto Σ.

Estágio do mapa:

Gerar n processos, onde a atribuição do eu-ésimo processo é contabilizar o número de ocorrências do símbolo σeu em todo o vetor de entrada.

Reduzir estágio:

Reúna o total de cada um dos n processos em ordem.Esse vetor são seus resultados.

Agora, esta versão não resulta em nenhuma melhoria em relação à versão serial;Suspeito que haja uma dependência oculta aqui que torna isso inerentemente difícil de paralelizar, mas estou muito cansado e com morte cerebral para provar isso esta noite.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top