Question

Vous entendez toujours dire que le code fonctionnel est intrinsèquement plus facile à mettre en parallèle que le code non fonctionnel. J'ai donc décidé d'écrire une fonction qui effectue les opérations suivantes:

Étant donné l'entrée de chaînes, additionnez le nombre de caractères uniques pour chaque chaîne. Donc, étant donné l'entrée [& aaaaa " ;; "bbb"; "ccccccc"; "abbbc" ] , notre méthode retourne a: 6; b: 6; c: 8 .

Voici ce que j'ai écrit:

(* seq<#seq<char>> -> Map<char,int> *)
let wordFrequency input =
    input
    |> Seq.fold (fun acc text ->
        (* This inner loop can be processed on its own thread *)
        text
        |> Seq.choose (fun char -> if Char.IsLetter char then Some(char) else None)
        |> Seq.fold (fun (acc : Map<_,_>) item ->
            match acc.TryFind(item) with
            | Some(count) -> acc.Add(item, count + 1)
            | None -> acc.Add(item, 1))
            acc
        ) Map.empty

Ce code est idéalement parallélisable car chaque chaîne de entrée peut être traitée sur son propre thread. Ce n'est pas aussi simple qu'il y paraît, car la boucle intérieure ajoute des éléments à une carte partagée entre toutes les entrées.

Je voudrais que la boucle interne soit factorisée dans son propre thread, et je ne veux utiliser aucun état mutable. Comment pourrais-je réécrire cette fonction à l'aide d'un flux de travail asynchrone?

Était-ce utile?

La solution

Comme cela a déjà été souligné, il existe un conflit de mise à jour si vous essayez de faire en sorte que différents threads traitent différentes chaînes d'entrée, car chaque thread peut incrémenter le nombre de lettres. Vous pouvez demander à chaque fil de produire sa propre carte, puis «additionner toutes les cartes», mais cette dernière étape peut être coûteuse (et n'est pas aussi bien adaptée à l'utilisation de fils en raison des données partagées). Je pense que les entrées de grande taille sont susceptibles de fonctionner plus rapidement en utilisant un algorithme comme celui ci-dessous, dans lequel chaque thread traite un nombre de lettres différent (pour toutes les chaînes de l'entrée). En conséquence, chaque thread a son propre compteur indépendant, donc pas de conflit de mise à jour et pas d'étape finale pour combiner les résultats. Cependant, nous avons besoin d'un prétraitement pour découvrir le «jeu de lettres uniques», et cette étape pose le même problème de conflit. (En pratique, vous connaissez probablement l’univers des caractères, par exemple, l’alphabetisme, et vous ne pouvez alors créer que 26 threads pour traiter z et contourner ce problème.) Quoi qu’il en soit, la question est probablement de savoir comment écrire en F #. code async permettant de diviser le travail entre les threads ", ainsi le code ci-dessous le démontre.

#light

let input = [| "aaaaa"; "bbb"; "ccccccc"; "abbbc" |]

// first discover all unique letters used
let Letters str = 
    str |> Seq.fold (fun set c -> Set.add c set) Set.empty 
let allLetters = 
    input |> Array.map (fun str -> 
        async { return Letters str })
    |> Async.Parallel 
    |> Async.Run     
    |> Set.union_all // note, this step is single-threaded, 
        // if input has many strings, can improve this

// Now count each letter on a separate thread
let CountLetter letter =
    let mutable count = 0
    for str in input do
        for c in str do
            if letter = c then
                count <- count + 1
    letter, count
let result = 
    allLetters |> Seq.map (fun c ->
        async { return CountLetter c })
    |> Async.Parallel 
    |> Async.Run

// print results
for letter,count in result do
    printfn "%c : %d" letter count

En effet, j’ai 'complètement modifié l’algorithme', principalement parce que l’algorithme original que vous aviez n’est pas particulièrement adapté à la parallélisation directe des données en raison du conflit de mise à jour. En fonction de ce que vous voulez apprendre, cette réponse peut être satisfaisante ou non.

Autres conseils

Vous pouvez écrire cela comme ceci:

let wordFrequency =
  Seq.concat >> Seq.filter System.Char.IsLetter >> Seq.countBy id >> Map.ofSeq

et ne la parallélisez avec que deux caractères supplémentaires pour utiliser le module PSeq à partir de la FSharp.PowerPack.Parallel.Seq DLL au lieu de la Seq ordinaire module:

let wordFrequency =
  Seq.concat >> PSeq.filter System.Char.IsLetter >> PSeq.countBy id >> Map.ofSeq

Par exemple, le temps nécessaire pour calculer les fréquences de la Bible de 5,5 Mo de King James passe de 4,75 à 0,66. C’est une accélération de 7,2 × sur cette machine à 8 cœurs.

Parallèle n’est pas synonyme d’async, comme explique Don Syme .

Donc, IMO, vous feriez mieux d'utiliser PLINQ pour paralléliser.

Je ne parle pas du tout F # bien, mais je peux répondre à cela. Pensez à l’utilisation de map / réduire

Soit n = carte (S) le nombre de symboles s dans l’alphabet.

Étape de la carte:

Spawn n processus, où l'attribution du i -ième processus consiste à comptabiliser le nombre d'occurrences du symbole s i dans tout le vecteur d’entrée.

Réduire l'étape :

Rassemblez le total de chacun des n processus dans l'ordre. Ce vecteur correspond à vos résultats.

Maintenant, cette version n'apporte aucune amélioration par rapport à une version série. Je soupçonne qu’il existe une dépendance cachée ici qui rend ce processus intrinsèquement difficile à mettre en parallèle, mais je suis trop fatigué et mon cerveau est mort pour le prouver ce soir.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top