CCRを使用したイベントを注文するための効率的な方法は何ですか?
質問
私は、各フィードからのデータを順番に処理する必要がある大量のデータフィードの並列処理を必要とするタスクの解決策としてCCR Iteratorを実験していました。フィードはいずれも互いに依存していないため、供給ごとに並列化することができます。
以下は、1つの整数フィードを備えた迅速で汚れたモックアップです。これは、整数を約1.5k/秒の速度でポートに押し込むだけで、CCRイテレーターを使用してそれらを引き出して、注文内処理保証を保持します。
class Program
{
static Dispatcher dispatcher = new Dispatcher();
static DispatcherQueue dispatcherQueue =
new DispatcherQueue("DefaultDispatcherQueue", dispatcher);
static Port<int> intPort = new Port<int>();
static void Main(string[] args)
{
Arbiter.Activate(
dispatcherQueue,
Arbiter.FromIteratorHandler(new IteratorHandler(ProcessInts)));
int counter = 0;
Timer t = new Timer( (x) =>
{ for(int i = 0; i < 1500; ++i) intPort.Post(counter++);}
, null, 0, 1000);
Console.ReadKey();
}
public static IEnumerator<ITask> ProcessInts()
{
while (true)
{
yield return intPort.Receive();
int currentValue;
if( (currentValue = intPort) % 1000 == 0)
{
Console.WriteLine("{0}, Current Items In Queue:{1}",
currentValue, intPort.ItemCount);
}
}
}
}
これについて大いに驚いたのは、CCRがCorei7ボックスに追いつくことができず、キューのサイズが境界なしで成長することでした。ポスト()から荷重()の荷重()へのレイテンシを測定する別のテストまたは〜100ポスト/秒で、各バッチの最初のpost()とreceive()の間の遅延は約1msでした。
私のモックアップに何か問題がありますか?もしそうなら、CCRを使用してこれを行うより良い方法は何ですか?
解決
はい、私は同意します、これは確かに奇妙に見えます。コードは最初はスムーズに実行されているように見えますが、数千のアイテムの後、プロセッサの使用はパフォーマンスが本当に不十分な時点まで上昇します。これは私を邪魔し、フレームワークの問題を示唆しています。あなたのコードで遊んだ後、私はこれがなぜそうなのかを本当に特定することができません。この問題を引き受けることをお勧めします Microsoft Roboticsフォーラム そして、ジョージ・クリサンタコプロス(または他のCCR脳の1つ)を取得して、問題が何であるかを伝えることができるかどうかを確認します。しかし、私はあなたのコードが現状の状態であると推測することができます。
ポートから「ポップ」アイテムを扱っている方法は非常に非効率的です。本質的には、ポートにメッセージがあるたびにイテレーターが目が覚め、1つのメッセージだけを扱っています(ポートにさらに数百があるかもしれないという事実にもかかわらず)。 yield
制御がフレームワークに渡されます。降伏したレシーバーがイテレーターの別の「目覚め」を引き起こす時点で、多くのメッセージがポートを埋めています。ディスパッチャーからスレッドを引っ張って1つのアイテムのみを扱う(その間に多くの人が積み上げられた場合)、ほぼ間違いなく良いスループットを得るための最良の方法ではありません。
私はあなたのコードを改造しましたので、降伏後、ポートをチェックして、さらにメッセージがキューに登録されているかどうかを確認し、フレームワークに戻す前にポートを完全に空にします。また、使用するコードをややリファクトしました CcrServiceBase
これは、あなたがしているいくつかのタスクの構文を簡素化します。
internal class Test:CcrServiceBase
{
private readonly Port<int> intPort = new Port<int>();
private Timer timer;
public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
new Dispatcher(0,
"dispatcher")))
{
}
public void StartTest() {
SpawnIterator(ProcessInts);
var counter = 0;
timer = new Timer(x =>
{
for (var i = 0; i < 1500; ++i)
intPort.Post(counter++);
}
,
null,
0,
1000);
}
public IEnumerator<ITask> ProcessInts()
{
while (true)
{
yield return intPort.Receive();
int currentValue = intPort;
ReportCurrent(currentValue);
while(intPort.Test(out currentValue))
{
ReportCurrent(currentValue);
}
}
}
private void ReportCurrent(int currentValue)
{
if (currentValue % 1000 == 0)
{
Console.WriteLine("{0}, Current Items In Queue:{1}",
currentValue,
intPort.ItemCount);
}
}
}
あるいは、例ではあまりよく使用されていないので、Iteratorを完全に廃止することもできます(これが処理の順序にどのような影響を与えるかは完全にはわかりませんが):
internal class Test : CcrServiceBase
{
private readonly Port<int> intPort = new Port<int>();
private Timer timer;
public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
new Dispatcher(0,
"dispatcher")))
{
}
public void StartTest()
{
Activate(
Arbiter.Receive(true,
intPort,
i =>
{
ReportCurrent(i);
int currentValue;
while (intPort.Test(out currentValue))
{
ReportCurrent(currentValue);
}
}));
var counter = 0;
timer = new Timer(x =>
{
for (var i = 0; i < 500000; ++i)
{
intPort.Post(counter++);
}
}
,
null,
0,
1000);
}
private void ReportCurrent(int currentValue)
{
if (currentValue % 1000000 == 0)
{
Console.WriteLine("{0}, Current Items In Queue:{1}",
currentValue,
intPort.ItemCount);
}
}
}
これらの例はどちらも、スループットが数桁大幅に増加します。お役に立てれば。