Вопрос

(Придерживаясь общего примера с асинхронным выбором многих веб -страниц)

Как бы я сразился с несколькими (сотнями) запросами веб -страницы асинхронно, а затем подожду, пока все запросы будут выполнены, прежде чем перейти на следующий шаг? Async.Asparallel обрабатывает несколько запросов за раз, контролируемые количеством ядер на процессоре. Захват веб-страницы не является операцией, связанной с процессором. Не удовлетворен ускорением Async.Asparallel, я ищу альтернативы.

Я пытался подключить точки между Async.startassack и задачами []. Оператор. Инстинктивно я написал следующий код, но он не скомпилируется.

let processItemsConcurrently (items : int seq) = 
  let tasks = items |> Seq.map (fun item -> Async.StartAsTask(fetchAsync item))
  Tasks.Task.WaitAll(tasks) 

Как бы вы подошли к этому?

Это было полезно?

Решение

Async.Parallel почти определенно прямо здесь. Не уверен, что вы не довольны; Сила F # Asyncs лежит более в асинхронных вычислениях, чем в задаче, параллельно CPU-связанные материалы (что более адаптировано к TaskS и .NET 4.0 TPL). Вот полный пример:

open System.Diagnostics 
open System.IO
open System.Net
open Microsoft.FSharp.Control.WebExtensions 

let sites = [|
    "http://bing.com"
    "http://google.com"
    "http://cnn.com"
    "http://stackoverflow.com"
    "http://yahoo.com"
    "http://msdn.com"
    "http://microsoft.com"
    "http://apple.com"
    "http://nfl.com"
    "http://amazon.com"
    "http://ebay.com"
    "http://expedia.com"
    "http://twitter.com"
    "http://reddit.com"
    "http://hulu.com"
    "http://youtube.com"
    "http://wikipedia.org"
    "http://live.com"
    "http://msn.com"
    "http://wordpress.com"
    |]

let print s = 
    // careful, don't create a synchronization bottleneck by printing
    //printf "%s" s
    ()

let printSummary info fullTimeMs =
    Array.sortInPlaceBy (fun (i,_,_) -> i) info
//  for i, size, time in info do
//      printfn "%2d  %7d  %5d" i size time
    let longest = info |> Array.map (fun (_,_,time) -> time) |> Array.max
    printfn "longest request took %dms" longest
    let bytes = info |> Array.sumBy (fun (_,size,_) -> float size)
    let seconds = float fullTimeMs / 1000.
    printfn "sucked down %7.2f KB/s" (bytes / 1024.0 / seconds)

let FetchAllSync() =
    let allsw = Stopwatch.StartNew()
    let info = sites |> Array.mapi (fun i url ->
        let sw = Stopwatch.StartNew()
        print "S"
        let req = WebRequest.Create(url) 
        use resp = req.GetResponse()
        use stream = resp.GetResponseStream()
        use reader = new StreamReader(stream,
                            System.Text.Encoding.UTF8, true, 4096) 
        print "-"
        let contents = reader.ReadToEnd()
        print "r"
        i, contents.Length, sw.ElapsedMilliseconds)
    let time = allsw.ElapsedMilliseconds 
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

let FetchAllAsync() =
    let allsw = Stopwatch.StartNew()
    let info = sites |> Array.mapi (fun i url -> async {
        let sw = Stopwatch.StartNew()
        print "S"
        let req = WebRequest.Create(url) 
        use! resp = req.AsyncGetResponse()
        use stream = resp.GetResponseStream()
        use reader = new AsyncStreamReader(stream, // F# PowerPack
                           System.Text.Encoding.UTF8, true, 4096) 
        print "-"
        let! contents = reader.ReadToEnd()  // in F# PowerPack
        print "r"
        return i, contents.Length, sw.ElapsedMilliseconds })
                    |> Async.Parallel 
                    |> Async.RunSynchronously 
    let time = allsw.ElapsedMilliseconds 
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

// By default, I think .NET limits you to 2 open connections at once
ServicePointManager.DefaultConnectionLimit <- sites.Length 

for i in 1..3 do // to warmup and show variance
    let time1,r1 = FetchAllSync()
    printfn "Sync took %dms, result was %d" time1 r1
    let time2,r2 = FetchAllAsync()
    printfn "Async took %dms, result was %d  (speedup=%2.2f)" 
        time2 r2 (float time1/ float time2)
    printfn ""

На моей 4-я ящике это последовательно дает почти 4-кратное ускорение.

РЕДАКТИРОВАТЬ

В ответ на ваш комментарий я обновил код. Вы правы в том, что я добавил больше сайтов, и не вижу ожидаемого ускорения (все еще удерживая устойчивое примерно на 4х). Я начал добавлять небольшую отладочную выходу выше, продолжит расследование, чтобы увидеть, что что-то еще включает в себя соединения ...

РЕДАКТИРОВАТЬ

Снова изменил код. Ну, я нашел то, что может быть узким местом. Вот реализация AsyncreadToend в PowerPack:

type System.IO.StreamReader with
   member s.AsyncReadToEnd () = 
       FileExtensions.UnblockViaNewThread (fun () -> s.ReadToEnd())

Другими словами, он просто блокирует нить потока и считывает синхронно. Арг !!! Позвольте мне посмотреть, смогу ли я обойти это.

РЕДАКТИРОВАТЬ

Хорошо, асинкстрим читатель в PowerPack делает правильные вещи, и я использую это сейчас.

Однако ключевой вопрос, по -видимому, заключается в дисперсия.

Когда вы нажимаете, скажем, CNN.com, большую часть времени результат возвращается примерно на 500 мс. Но время от времени вы получаете один запрос, который занимает 4 с, и это, конечно, потенциально убивает кажущуюся асинхровую перфу, поскольку общее время - это время самой неудачной запроса.

Запустив программу выше, я вижу ускорение примерно от 2,5 до 9x на моей двухъядерной коробке дома. Это очень сильно переменное, хотя. По-прежнему возможно, в программе есть какое-то узкое место, которое я пропустил, но я думаю, что дисперсия Web может объяснить все, что я вижу на данный момент.

Другие советы

Использование реактивных расширений для .NET в сочетании с F #, вы можете написать очень элегантное решение - проверьте образец в http://blog.paulbetts.org/index.php/2010/11/16/making-async-io-work-for-you-reactive-style/ (Это использует C #, но использование f # тоже легко; ключ использует методы начала / конечного вместо метода синхронизации, который даже если вы можете компилировать, он будет заблокировать n Недооценки потока потока, вместо того, чтобы сбоевать, просто поднимая процедуры завершения по мере их появления)

Моя ставка состоит в том, что ускорение, которое вы испытываете недостаточно значимы для вашего вкуса, потому что вы либо используете подтип WebRequest или класса, полагаясь на него (например, WebClient).
Если это так, вам нужно установить Максиннекция На ConnectionManagementElement (и я предлагаю вам установить его только в случае необходимости, иначе это станет довольно трудоемкой операцией) на высокое значение, в зависимости от количества одновременных соединений, которые вы хотите инициировать из своего приложения.

Я не парень, но с чистой точки зрения .net то, что вы ищете, это TaskFactory :: From Async, где асинхронный призыв вы бы охватили в задаче, было бы что -то вроде httprequest :: beggeTresponse. Вы также можете завершить модель EAP, которую WebClient выявляет с использованием задачи. Подробнее об обоих этих Темы здесь на MSDN.

Надеюсь, что эти знания вы можете найти ближайший родной подход F # для достижения того, что вы пытаетесь сделать.

Вот какой -то код, который избегает неизвестных, таких как задержка доступа в Интернет. Я получаю менее 5% использования процессора и около 60-80% эффективности как для синхронизации, так и для асинхронных кодов.

open System.Diagnostics

let numWorkers = 200
let asyncDelay = 50

let main =
   let codeBlocks = [for i in 1..numWorkers -> 
                        async { do! Async.Sleep asyncDelay } ]

   while true do
      printfn "Concurrent started..."
      let sw = new Stopwatch()
      sw.Start()
      codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore
      sw.Stop()
      printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * 100) / sw.ElapsedMilliseconds)

      printfn "Synchronous started..."
      let sw = new Stopwatch()
      sw.Start()
      for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore
      sw.Stop()
      printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100) / sw.ElapsedMilliseconds)

main
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top