سؤال

(التمسك بمثال مشترك مع الجلب غير المتزامن للعديد من صفحات الويب)

كيف يمكنني تدوير عدة (مئات) من صفحة الويب التي تطلبها بشكل غير متزامن ، ثم انتظر إكمال جميع الطلبات قبل الذهاب إلى الخطوة التالية؟ تقوم Async.assalled بمعالجة بعض الطلبات في وقت واحد ، يتم التحكم فيها بواسطة عدد النوى على وحدة المعالجة المركزية. الاستيلاء على صفحة ويب ليس عملية مرتبطة بوحدة المعالجة المركزية. غير راضٍ عن تسريع Async.asparalle ، أنا أبحث عن بدائل.

حاولت توصيل النقاط بين Async.Startastask و Task []. WaitAll. غريزيًا ، كتبت الكود التالي ، لكنه لا يجمع.

let processItemsConcurrently (items : int seq) = 
  let tasks = items |> Seq.map (fun item -> Async.StartAsTask(fetchAsync item))
  Tasks.Task.WaitAll(tasks) 

كيف يمكنك التعامل مع هذا؟

هل كانت مفيدة؟

المحلول

Async.Parallel بالتأكيد تقريبا هنا. لست متأكدًا مما أنت لست سعيدًا به ؛ تكمن قوة F# Asyncs في الحوسبة غير المتزامنة أكثر من الأشياء المرتبطة بمهمة وحدة المعالجة المركزية (والتي هي أكثر خصيصًا TaskS و .NET 4.0 TPL). هذا مثال كامل:

open System.Diagnostics 
open System.IO
open System.Net
open Microsoft.FSharp.Control.WebExtensions 

let sites = [|
    "http://bing.com"
    "http://google.com"
    "http://cnn.com"
    "http://stackoverflow.com"
    "http://yahoo.com"
    "http://msdn.com"
    "http://microsoft.com"
    "http://apple.com"
    "http://nfl.com"
    "http://amazon.com"
    "http://ebay.com"
    "http://expedia.com"
    "http://twitter.com"
    "http://reddit.com"
    "http://hulu.com"
    "http://youtube.com"
    "http://wikipedia.org"
    "http://live.com"
    "http://msn.com"
    "http://wordpress.com"
    |]

let print s = 
    // careful, don't create a synchronization bottleneck by printing
    //printf "%s" s
    ()

let printSummary info fullTimeMs =
    Array.sortInPlaceBy (fun (i,_,_) -> i) info
//  for i, size, time in info do
//      printfn "%2d  %7d  %5d" i size time
    let longest = info |> Array.map (fun (_,_,time) -> time) |> Array.max
    printfn "longest request took %dms" longest
    let bytes = info |> Array.sumBy (fun (_,size,_) -> float size)
    let seconds = float fullTimeMs / 1000.
    printfn "sucked down %7.2f KB/s" (bytes / 1024.0 / seconds)

let FetchAllSync() =
    let allsw = Stopwatch.StartNew()
    let info = sites |> Array.mapi (fun i url ->
        let sw = Stopwatch.StartNew()
        print "S"
        let req = WebRequest.Create(url) 
        use resp = req.GetResponse()
        use stream = resp.GetResponseStream()
        use reader = new StreamReader(stream,
                            System.Text.Encoding.UTF8, true, 4096) 
        print "-"
        let contents = reader.ReadToEnd()
        print "r"
        i, contents.Length, sw.ElapsedMilliseconds)
    let time = allsw.ElapsedMilliseconds 
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

let FetchAllAsync() =
    let allsw = Stopwatch.StartNew()
    let info = sites |> Array.mapi (fun i url -> async {
        let sw = Stopwatch.StartNew()
        print "S"
        let req = WebRequest.Create(url) 
        use! resp = req.AsyncGetResponse()
        use stream = resp.GetResponseStream()
        use reader = new AsyncStreamReader(stream, // F# PowerPack
                           System.Text.Encoding.UTF8, true, 4096) 
        print "-"
        let! contents = reader.ReadToEnd()  // in F# PowerPack
        print "r"
        return i, contents.Length, sw.ElapsedMilliseconds })
                    |> Async.Parallel 
                    |> Async.RunSynchronously 
    let time = allsw.ElapsedMilliseconds 
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

// By default, I think .NET limits you to 2 open connections at once
ServicePointManager.DefaultConnectionLimit <- sites.Length 

for i in 1..3 do // to warmup and show variance
    let time1,r1 = FetchAllSync()
    printfn "Sync took %dms, result was %d" time1 r1
    let time2,r2 = FetchAllAsync()
    printfn "Async took %dms, result was %d  (speedup=%2.2f)" 
        time2 r2 (float time1/ float time2)
    printfn ""

على صندوق 4-core الخاص بي ، هذا يعطي باستمرار تسريع 4x تقريبًا.

تعديل

ردا على تعليقك ، لقد قمت بتحديث الرمز. أنت على حق في أنني أضفت المزيد من المواقع ولا أرى التسريع المتوقع (لا يزال محتجزًا حوالي 4x). لقد بدأت في إضافة القليل من الإخراج تصحيح الأخطاء أعلاه ، سأستمر في التحقيق لمعرفة ما إذا كان هناك شيء آخر يخدع الاتصالات ...

تعديل

مجندة الرمز مرة أخرى. حسنًا ، لقد وجدت ما قد يكون عنق الزجاجة. إليك تنفيذ Asyncreadtoend في PowerPack:

type System.IO.StreamReader with
   member s.AsyncReadToEnd () = 
       FileExtensions.UnblockViaNewThread (fun () -> s.ReadToEnd())

وبعبارة أخرى ، فإنه يمنع خيط ThreadPool ويقرأ بشكل متزامن. أرغ !!! اسمحوا لي أن أرى ما إذا كان بإمكاني العمل حول ذلك.

تعديل

حسنًا ، يقوم Asyncstreamreader في PowerPack بعمل الشيء الصحيح ، وأنا أستخدم ذلك الآن.

ومع ذلك ، يبدو أن القضية الرئيسية التباين.

عندما تضغط ، على سبيل المثال ، cnn.com ، ستعود النتيجة في كثير من الأحيان إلى 500 مللي ثانية. ولكن بين الحين والآخر تحصل على هذا الطلب الذي يستغرق 4s ، وهذا بالطبع يقتل Async Perf الظاهر ، لأن الوقت الكلي هو وقت الطلب السيئ.

عند تشغيل البرنامج أعلاه ، أرى سرعات من حوالي 2.5x إلى 9x على صندوقي المكون من 2 نواة في المنزل. إنه متغير للغاية ، رغم ذلك. لا يزال من الممكن أن يكون هناك بعض عنق الزجاجة في البرنامج الذي فاتني ، لكنني أعتقد أن تباين Web قد يفسر كل ما أراه في هذه المرحلة.

نصائح أخرى

باستخدام الامتدادات التفاعلية لـ .NET مع F#، يمكنك كتابة حل أنيق للغاية - تحقق من العينة في http://blog.paulbetts.org/index.php/2010/11/16/making-async-io-work-for-you-reactive-style/ (يستخدم هذا C# ، ولكن استخدام F# أمر سهل للغاية ؛ والمفتاح يستخدم طرق البداية/النهاية بدلاً من طريقة المزامنة ، حتى لو كان بإمكانك تجميعها ، فسيحظرها n Threadpool Threads بشكل غير ضروري ، بدلاً من Threadpool مجرد التقاط إجراءات الانتهاء عند وصولها)

رهاني هو أن التسريع الذي تواجهه ليس مهمًا بما يكفي لذوقك لأنك إما تستخدم نوعًا فرعيًا من WebRequest أو فئة تعتمد عليه (مثل WebClient).
إذا كان هذا هو الحال ، فأنت بحاجة إلى تعيين ملف MaxConnection على اتصال ConnectionMancePlement (وأقترح عليك تعيينه فقط إذا لزم الأمر وإلا فإنها ستصبح عملية تستغرق وقتًا طويلاً) لقيمة عالية ، اعتمادًا على عدد الاتصالات المتزامنة التي تريد أن تبدأها من تطبيقك.

أنا لست رجلاً ، ولكن من منظور .NET خالص ، فإن ما تبحث عنه هو TaskFactory :: FromAsync حيث تكون المكالمة غير المتزامنة التي ستحتفظ بها في مهمة مثل httprequest :: pegingetResponse. يمكنك أيضًا اختتام نموذج EAP الذي يعرضه WebClient باستخدام TaskCompletionsource. المزيد عن كل من هؤلاء موضوعات هنا على MSDN.

نأمل مع هذه المعرفة ، يمكنك العثور على أقرب نهج F# الأصلي لإنجاز ما تحاول القيام به.

إليك بعض التعليمات البرمجية التي تتجنب المجهول ، مثل زمن الوصول إلى الويب. أحصل على استخدام أقل من 5 ٪ من وحدة المعالجة المركزية ، وحوالي 60-80 ٪ من الكفاءة لكل من مسارات رمز المزامنة والمتزامنة.

open System.Diagnostics

let numWorkers = 200
let asyncDelay = 50

let main =
   let codeBlocks = [for i in 1..numWorkers -> 
                        async { do! Async.Sleep asyncDelay } ]

   while true do
      printfn "Concurrent started..."
      let sw = new Stopwatch()
      sw.Start()
      codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore
      sw.Stop()
      printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * 100) / sw.ElapsedMilliseconds)

      printfn "Synchronous started..."
      let sw = new Stopwatch()
      sw.Start()
      for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore
      sw.Stop()
      printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100) / sw.ElapsedMilliseconds)

main
مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top