Вопрос

Вы всегда слышите, что функциональный код по своей природе проще распараллелить, чем нефункциональный код, поэтому я решил написать функцию, которая выполняет следующие действия:

С учетом ввода строк, суммируйте количество уникальных символов для каждой строки. Итак, с учетом ввода [" aaaaa " ;; & Quot; БББ & Quot ;; & Quot; CCCCCCC & Quot ;; & Quot; Абббв & Quot; ] , наш метод вернет a: 6; б: 6; с: 8 .

Вот что я написал:

(* seq<#seq<char>> -> Map<char,int> *)
let wordFrequency input =
    input
    |> Seq.fold (fun acc text ->
        (* This inner loop can be processed on its own thread *)
        text
        |> Seq.choose (fun char -> if Char.IsLetter char then Some(char) else None)
        |> Seq.fold (fun (acc : Map<_,_>) item ->
            match acc.TryFind(item) with
            | Some(count) -> acc.Add(item, count + 1)
            | None -> acc.Add(item, 1))
            acc
        ) Map.empty

Этот код идеально распараллеливаем, потому что каждая строка в input может обрабатываться в своем собственном потоке. Это не так просто, как кажется, поскольку innerloop добавляет элементы на карту, совместно используемые всеми входами.

Я бы хотел, чтобы внутренний цикл был выделен в собственный поток, и я не хочу использовать какое-либо изменяемое состояние. Как бы я переписал эту функцию, используя рабочий процесс Async?

Это было полезно?

Решение

Как уже указывалось, возникает конфликт обновления, если вы пытаетесь заставить разные потоки обрабатывать разные входные строки, поскольку каждый поток может увеличивать счетчик каждой буквы. Вы можете сделать так, чтобы каждый поток создавал свою собственную карту, а затем «складывал все карты», но этот последний шаг может быть дорогостоящим (и не очень подходящим для использования потоков из-за общих данных). Я думаю, что большие входные данные, скорее всего, будут работать быстрее, используя алгоритм, подобный приведенному ниже, где каждый поток обрабатывает различную букву для подсчета (для всех строк во входных данных). В результате каждый поток имеет свой собственный независимый счетчик, поэтому нет конфликтов обновления и нет окончательного шага для объединения результатов. Однако нам требуется предварительная обработка, чтобы обнаружить «набор уникальных букв», и на этом этапе возникает та же проблема конфликта. (На практике вы, вероятно, знаете заранее весь набор символов, например, алфавиты, а затем можете просто создать 26 потоков для обработки az и обойти эту проблему.) В любом случае, по-видимому, в основном вопрос состоит в том, чтобы изучить, «как писать F #». асинхронный код для разделения работы между потоками », поэтому приведенный ниже код демонстрирует это.

#light

let input = [| "aaaaa"; "bbb"; "ccccccc"; "abbbc" |]

// first discover all unique letters used
let Letters str = 
    str |> Seq.fold (fun set c -> Set.add c set) Set.empty 
let allLetters = 
    input |> Array.map (fun str -> 
        async { return Letters str })
    |> Async.Parallel 
    |> Async.Run     
    |> Set.union_all // note, this step is single-threaded, 
        // if input has many strings, can improve this

// Now count each letter on a separate thread
let CountLetter letter =
    let mutable count = 0
    for str in input do
        for c in str do
            if letter = c then
                count <- count + 1
    letter, count
let result = 
    allLetters |> Seq.map (fun c ->
        async { return CountLetter c })
    |> Async.Parallel 
    |> Async.Run

// print results
for letter,count in result do
    printfn "%c : %d" letter count

Я действительно «полностью изменил алгоритм», главным образом потому, что исходный алгоритм, который вы использовали, не особенно подходит для прямого распараллеливания данных из-за конфликта обновления. В зависимости от того, что именно вы собираетесь изучать, этот ответ может быть или не быть особенно удовлетворительным для вас.

Другие советы

Вы можете написать это так:

let wordFrequency =
  Seq.concat >> Seq.filter System.Char.IsLetter >> Seq.countBy id >> Map.ofSeq

и распараллелить его только с двумя дополнительными символами, чтобы использовать модуль PSeq из библиотеки FSharp.PowerPack.Parallel.Seq вместо обычной Seq модуль:

let wordFrequency =
  Seq.concat >> PSeq.filter System.Char.IsLetter >> PSeq.countBy id >> Map.ofSeq

Например, время, затрачиваемое на вычисление частот из Библии короля Джеймса 5,5 Мб, падает с 4,75 до 0,66 с. Это 7,2 & # 215; ускорение на этом 8-ядерном компьютере.

Параллель - это не то же самое, что асинхронный, как объясняет Дон Сайм .

Так что, IMO, вам лучше использовать PLINQ для распараллеливания.

Я плохо говорю на F #, но я могу решить это. Подумайте об использовании карты / уменьшить:

пусть n = карточка (& # 931;) будет количеством символов & # 963; в алфавите & # 931 ;.

Этап карты:

Процессы

Spawn n , где назначением процесса i является подсчет количества вхождений символа & # 963; i во всем входном векторе.

Сократить этап .

Соберите сумму по каждому из процессов n по порядку. Этот вектор - ваши результаты.

Теперь эта версия не приводит к каким-либо улучшениям по сравнению с последовательной версией; Я подозреваю, что здесь есть скрытая зависимость, которая затрудняет параллелизацию по своей сути, но я слишком устала и умираю, чтобы доказать это сегодня вечером.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top