パラレルタスクライブラリワイケーションデザイン
-
25-09-2019 - |
質問
PTLを探索し始めたばかりで、デザインの質問があります。
私のシナリオ:それぞれが画像を参照するURLのリストがあります。各画像を並行してダウンロードしたいです。出来るだけ早く 少なくとも一つの 画像がダウンロードされます。ダウンロードされた画像で何かを実行するメソッドを実行したいと思います。その方法は並列化されるべきではありません - それはシリアルでなければなりません。
以下はうまくいくと思いますが、これが正しい方法であるかどうかはわかりません。画像を収集し、収集された画像で「何か」を実行するための個別のクラスがあるため、画像の取得方法の内側の仕組みを公開するため、間違っていると思われるタスクの配列を渡すことになります。しかし、私はそれを回避する方法を知りません。実際には、これらの両方の方法にはさらに多くのものがありますが、これは重要ではありません。それらは、イメージで何かを取得して行うことの両方を1つの大きな方法にまとめてはならないことを知ってください。
//From the Director class
Task<Image>[] downloadTasks = collector.RetrieveImages(listOfURLs);
for (int i = 0; i < listOfURLs.Count; i++)
{
//Wait for any of the remaining downloads to complete
int completedIndex = Task<Image>.WaitAny(downloadTasks);
Image completedImage = downloadTasks[completedIndex].Result;
//Now do something with the image (this "something" must happen serially)
//Uses the "Formatter" class to accomplish this let's say
}
///////////////////////////////////////////////////
//From the Collector class
public Task<Image>[] RetrieveImages(List<string> urls)
{
Task<Image>[] tasks = new Task<Image>[urls.Count];
int index = 0;
foreach (string url in urls)
{
string lambdaVar = url; //Required... Bleh
tasks[index] = Task<Image>.Factory.StartNew(() =>
{
using (WebClient client = new WebClient())
{
//TODO: Replace with live image locations
string fileName = String.Format("{0}.png", i);
client.DownloadFile(lambdaVar, Path.Combine(Application.StartupPath, fileName));
}
return Image.FromFile(Path.Combine(Application.StartupPath, fileName));
},
TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent);
index++;
}
return tasks;
}
解決
通常、Waitanyを使用して、他の結果の結果を気にしない場合に1つのタスクを待ちます。たとえば、たまたま返された最初の画像を気にかけた場合。
代わりにこれはどうですか。
これにより、2つのタスクが作成されます。1つは画像をロードし、ブロッキングコレクションに追加します。 2番目のタスクはコレクションを待機し、キューに追加された画像を処理します。すべての画像がロードされると、最初のタスクがキューを閉じて、2番目のタスクがシャットダウンできるようにします。
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Drawing;
using System.IO;
using System.Net;
using System.Threading.Tasks;
namespace ClassLibrary1
{
public class Class1
{
readonly string _path = Directory.GetCurrentDirectory();
public void Demo()
{
IList<string> listOfUrls = new List<string>();
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");
BlockingCollection<Image> images = new BlockingCollection<Image>();
Parallel.Invoke(
() => // Task 1: load the images
{
Parallel.For(0, listOfUrls.Count, (i) =>
{
Image img = RetrieveImages(listOfUrls[i], i);
img.Tag = i;
images.Add(img); // Add each image to the queue
});
images.CompleteAdding(); // Done with images.
},
() => // Task 2: Process images serially
{
foreach (var img in images.GetConsumingEnumerable())
{
string newPath = Path.Combine(_path, String.Format("{0}_rot.png", img.Tag));
Console.WriteLine("Rotating image {0}", img.Tag);
img.RotateFlip(RotateFlipType.RotateNoneFlipXY);
img.Save(newPath);
}
});
}
public Image RetrieveImages(string url, int i)
{
using (WebClient client = new WebClient())
{
string fileName = Path.Combine(_path, String.Format("{0}.png", i));
Console.WriteLine("Downloading {0}...", url);
client.DownloadFile(url, Path.Combine(_path, fileName));
Console.WriteLine("Saving {0} as {1}.", url, fileName);
return Image.FromFile(Path.Combine(_path, fileName));
}
}
}
}
警告:コードにはエラーチェックやキャンセルがありません。遅れて、正しいことをするために何かが必要ですか? :)
これはパイプラインパターンの例です。画像の取得はかなり遅く、ブロッキングコレクション内のロックのコストは、画像のダウンロードに費やした時間と比較して比較的頻繁に発生するため、問題を引き起こすことはないと想定しています。
私たちの本...あなたはこのパターンの詳細を読むことができます並列プログラミングのために http://parallelpatterns.codeplex.com/第7章では、パイプラインについて説明し、それに伴う例はエラー処理とキャンセルを伴うパイプラインを示しています。
他のヒント
TPLは、別のタスクが終了したときに1つのタスクを実行するために継続的な関数を既に提供しています。タスクチェーンは、非同期操作にTPLで使用される主なパターンの1つです。
次の方法では、画像のセットをダウンロードし、各ファイルの名前を変更して続行します
static void DownloadInParallel(string[] urls)
{
var tempFolder = Path.GetTempPath();
var downloads = from url in urls
select Task.Factory.StartNew<string>(() =>{
using (var client = new WebClient())
{
var uri = new Uri(url);
string file = Path.Combine(tempFolder,uri.Segments.Last());
client.DownloadFile(uri, file);
return file;
}
},TaskCreationOptions.LongRunning|TaskCreationOptions.AttachedToParent)
.ContinueWith(t=>{
var filePath = t.Result;
File.Move(filePath, filePath + ".test");
},TaskContinuationOptions.ExecuteSynchronously);
var results = downloads.ToArray();
Task.WaitAll(results);
}
また、確認する必要があります WebClient Asyncタスク ParallelExtensionSextrasサンプルから。ダウンロードxxxxtask拡張メソッドは、タスクの作成とファイルの非同期ダウンロードの両方を処理します。
次の方法では、DownloadDatatask拡張機能を使用して画像のデータを取得し、ディスクに保存する前に回転させます
static void DownloadInParallel2(string[] urls)
{
var tempFolder = Path.GetTempPath();
var downloads = from url in urls
let uri=new Uri(url)
let filePath=Path.Combine(tempFolder,uri.Segments.Last())
select new WebClient().DownloadDataTask(uri)
.ContinueWith(t=>{
var img = Image.FromStream(new MemoryStream(t.Result));
img.RotateFlip(RotateFlipType.RotateNoneFlipY);
img.Save(filePath);
},TaskContinuationOptions.ExecuteSynchronously);
var results = downloads.ToArray();
Task.WaitAll(results);
}
The best way to do this would probably be by implementing the Observer pattern: have your RetreiveImages
function implement IObservable, put your "completed image action" into an IObserver object's OnNext
method, and subscribe it to RetreiveImages
.
I haven't tried this myself yet (still have to play more with the task library) but I think this is the "right" way to do it.
//すべての画像をダウンロードします
private async void GetAllImages ()
{
var downloadTasks = listOfURLs.Where(url => !string.IsNullOrEmpty(url)).Select(async url =>
{
var ret = await RetrieveImage(url);
return ret;
}).ToArray();
var counts = await Task.WhenAll(downloadTasks);
}
//From the Collector class
public async Task<Image> RetrieveImage(string url)
{
var lambdaVar = url; //Required... Bleh
using (WebClient client = new WebClient())
{
//TODO: Replace with live image locations
var fileName = String.Format("{0}.png", i);
await client.DownloadFile(lambdaVar, Path.Combine(Application.StartupPath, fileName));
}
return Image.FromFile(Path.Combine(Application.StartupPath, fileName));
}