كيفية تحقيق عدم التزامن بدلاً من التوازي في و#
-
29-09-2019 - |
سؤال
(التمسك بمثال مشترك مع الجلب غير المتزامن للعديد من صفحات الويب)
كيف يمكنني تدوير عدة (مئات) من صفحة الويب التي تطلبها بشكل غير متزامن ، ثم انتظر إكمال جميع الطلبات قبل الذهاب إلى الخطوة التالية؟ تقوم Async.assalled بمعالجة بعض الطلبات في وقت واحد ، يتم التحكم فيها بواسطة عدد النوى على وحدة المعالجة المركزية. الاستيلاء على صفحة ويب ليس عملية مرتبطة بوحدة المعالجة المركزية. غير راضٍ عن تسريع Async.asparalle ، أنا أبحث عن بدائل.
حاولت توصيل النقاط بين Async.Startastask و Task []. WaitAll. غريزيًا ، كتبت الكود التالي ، لكنه لا يجمع.
let processItemsConcurrently (items : int seq) =
let tasks = items |> Seq.map (fun item -> Async.StartAsTask(fetchAsync item))
Tasks.Task.WaitAll(tasks)
كيف يمكنك التعامل مع هذا؟
المحلول
Async.Parallel
بالتأكيد تقريبا هنا. لست متأكدًا مما أنت لست سعيدًا به ؛ تكمن قوة F# Asyncs في الحوسبة غير المتزامنة أكثر من الأشياء المرتبطة بمهمة وحدة المعالجة المركزية (والتي هي أكثر خصيصًا Task
S و .NET 4.0 TPL). هذا مثال كامل:
open System.Diagnostics
open System.IO
open System.Net
open Microsoft.FSharp.Control.WebExtensions
let sites = [|
"http://bing.com"
"http://google.com"
"http://cnn.com"
"http://stackoverflow.com"
"http://yahoo.com"
"http://msdn.com"
"http://microsoft.com"
"http://apple.com"
"http://nfl.com"
"http://amazon.com"
"http://ebay.com"
"http://expedia.com"
"http://twitter.com"
"http://reddit.com"
"http://hulu.com"
"http://youtube.com"
"http://wikipedia.org"
"http://live.com"
"http://msn.com"
"http://wordpress.com"
|]
let print s =
// careful, don't create a synchronization bottleneck by printing
//printf "%s" s
()
let printSummary info fullTimeMs =
Array.sortInPlaceBy (fun (i,_,_) -> i) info
// for i, size, time in info do
// printfn "%2d %7d %5d" i size time
let longest = info |> Array.map (fun (_,_,time) -> time) |> Array.max
printfn "longest request took %dms" longest
let bytes = info |> Array.sumBy (fun (_,size,_) -> float size)
let seconds = float fullTimeMs / 1000.
printfn "sucked down %7.2f KB/s" (bytes / 1024.0 / seconds)
let FetchAllSync() =
let allsw = Stopwatch.StartNew()
let info = sites |> Array.mapi (fun i url ->
let sw = Stopwatch.StartNew()
print "S"
let req = WebRequest.Create(url)
use resp = req.GetResponse()
use stream = resp.GetResponseStream()
use reader = new StreamReader(stream,
System.Text.Encoding.UTF8, true, 4096)
print "-"
let contents = reader.ReadToEnd()
print "r"
i, contents.Length, sw.ElapsedMilliseconds)
let time = allsw.ElapsedMilliseconds
printSummary info time
time, info |> Array.sumBy (fun (_,size,_) -> size)
let FetchAllAsync() =
let allsw = Stopwatch.StartNew()
let info = sites |> Array.mapi (fun i url -> async {
let sw = Stopwatch.StartNew()
print "S"
let req = WebRequest.Create(url)
use! resp = req.AsyncGetResponse()
use stream = resp.GetResponseStream()
use reader = new AsyncStreamReader(stream, // F# PowerPack
System.Text.Encoding.UTF8, true, 4096)
print "-"
let! contents = reader.ReadToEnd() // in F# PowerPack
print "r"
return i, contents.Length, sw.ElapsedMilliseconds })
|> Async.Parallel
|> Async.RunSynchronously
let time = allsw.ElapsedMilliseconds
printSummary info time
time, info |> Array.sumBy (fun (_,size,_) -> size)
// By default, I think .NET limits you to 2 open connections at once
ServicePointManager.DefaultConnectionLimit <- sites.Length
for i in 1..3 do // to warmup and show variance
let time1,r1 = FetchAllSync()
printfn "Sync took %dms, result was %d" time1 r1
let time2,r2 = FetchAllAsync()
printfn "Async took %dms, result was %d (speedup=%2.2f)"
time2 r2 (float time1/ float time2)
printfn ""
على صندوق 4-core الخاص بي ، هذا يعطي باستمرار تسريع 4x تقريبًا.
تعديل
ردا على تعليقك ، لقد قمت بتحديث الرمز. أنت على حق في أنني أضفت المزيد من المواقع ولا أرى التسريع المتوقع (لا يزال محتجزًا حوالي 4x). لقد بدأت في إضافة القليل من الإخراج تصحيح الأخطاء أعلاه ، سأستمر في التحقيق لمعرفة ما إذا كان هناك شيء آخر يخدع الاتصالات ...
تعديل
مجندة الرمز مرة أخرى. حسنًا ، لقد وجدت ما قد يكون عنق الزجاجة. إليك تنفيذ Asyncreadtoend في PowerPack:
type System.IO.StreamReader with
member s.AsyncReadToEnd () =
FileExtensions.UnblockViaNewThread (fun () -> s.ReadToEnd())
وبعبارة أخرى ، فإنه يمنع خيط ThreadPool ويقرأ بشكل متزامن. أرغ !!! اسمحوا لي أن أرى ما إذا كان بإمكاني العمل حول ذلك.
تعديل
حسنًا ، يقوم Asyncstreamreader في PowerPack بعمل الشيء الصحيح ، وأنا أستخدم ذلك الآن.
ومع ذلك ، يبدو أن القضية الرئيسية التباين.
عندما تضغط ، على سبيل المثال ، cnn.com ، ستعود النتيجة في كثير من الأحيان إلى 500 مللي ثانية. ولكن بين الحين والآخر تحصل على هذا الطلب الذي يستغرق 4s ، وهذا بالطبع يقتل Async Perf الظاهر ، لأن الوقت الكلي هو وقت الطلب السيئ.
عند تشغيل البرنامج أعلاه ، أرى سرعات من حوالي 2.5x إلى 9x على صندوقي المكون من 2 نواة في المنزل. إنه متغير للغاية ، رغم ذلك. لا يزال من الممكن أن يكون هناك بعض عنق الزجاجة في البرنامج الذي فاتني ، لكنني أعتقد أن تباين Web قد يفسر كل ما أراه في هذه المرحلة.
نصائح أخرى
باستخدام الامتدادات التفاعلية لـ .NET مع F#، يمكنك كتابة حل أنيق للغاية - تحقق من العينة في http://blog.paulbetts.org/index.php/2010/11/16/making-async-io-work-for-you-reactive-style/ (يستخدم هذا C# ، ولكن استخدام F# أمر سهل للغاية ؛ والمفتاح يستخدم طرق البداية/النهاية بدلاً من طريقة المزامنة ، حتى لو كان بإمكانك تجميعها ، فسيحظرها n
Threadpool Threads بشكل غير ضروري ، بدلاً من Threadpool مجرد التقاط إجراءات الانتهاء عند وصولها)
رهاني هو أن التسريع الذي تواجهه ليس مهمًا بما يكفي لذوقك لأنك إما تستخدم نوعًا فرعيًا من WebRequest أو فئة تعتمد عليه (مثل WebClient).
إذا كان هذا هو الحال ، فأنت بحاجة إلى تعيين ملف MaxConnection على اتصال ConnectionMancePlement (وأقترح عليك تعيينه فقط إذا لزم الأمر وإلا فإنها ستصبح عملية تستغرق وقتًا طويلاً) لقيمة عالية ، اعتمادًا على عدد الاتصالات المتزامنة التي تريد أن تبدأها من تطبيقك.
أنا لست رجلاً ، ولكن من منظور .NET خالص ، فإن ما تبحث عنه هو TaskFactory :: FromAsync حيث تكون المكالمة غير المتزامنة التي ستحتفظ بها في مهمة مثل httprequest :: pegingetResponse. يمكنك أيضًا اختتام نموذج EAP الذي يعرضه WebClient باستخدام TaskCompletionsource. المزيد عن كل من هؤلاء موضوعات هنا على MSDN.
نأمل مع هذه المعرفة ، يمكنك العثور على أقرب نهج F# الأصلي لإنجاز ما تحاول القيام به.
إليك بعض التعليمات البرمجية التي تتجنب المجهول ، مثل زمن الوصول إلى الويب. أحصل على استخدام أقل من 5 ٪ من وحدة المعالجة المركزية ، وحوالي 60-80 ٪ من الكفاءة لكل من مسارات رمز المزامنة والمتزامنة.
open System.Diagnostics
let numWorkers = 200
let asyncDelay = 50
let main =
let codeBlocks = [for i in 1..numWorkers ->
async { do! Async.Sleep asyncDelay } ]
while true do
printfn "Concurrent started..."
let sw = new Stopwatch()
sw.Start()
codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore
sw.Stop()
printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds
printfn "efficiency: %d%%" (int64 (asyncDelay * 100) / sw.ElapsedMilliseconds)
printfn "Synchronous started..."
let sw = new Stopwatch()
sw.Start()
for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore
sw.Stop()
printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds
printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100) / sw.ElapsedMilliseconds)
main