java BlockingQueue にはブロッキング ピークがありませんか?
-
20-09-2019 - |
質問
オブジェクトのブロックキューがあります。
キューにオブジェクトが存在するまでブロックするスレッドを作成したいと考えています。BlockingQueue.take() によって提供される機能と同様です。
ただし、オブジェクトを正常に処理できるかどうかわからないため、オブジェクトを削除せずに、peek()だけを実行したいと思います。オブジェクトを正常に処理できる場合にのみオブジェクトを削除したいと考えています。
したがって、ブロック Peak() 関数が必要です。現在、peek() は、Javadoc に従ってキューが空の場合にのみ戻ります。
何かが足りないのでしょうか?この機能を実現する別の方法はありますか?
編集:
スレッドセーフキューを使用して、代わりにピークしてスリープした場合について何か考えはありますか?
public void run() {
while (!exit) {
while (queue.size() != 0) {
Object o = queue.peek();
if (o != null) {
if (consume(o) == true) {
queue.remove();
} else {
Thread.sleep(10000); //need to backoff (60s) and try again
}
}
}
Thread.sleep(1000); //wait 1s for object on queue
}
}
コンシューマー スレッドと (別個の) プロデューサー スレッドが 1 つだけあることに注意してください。これは BlockingQueue を使用するほど効率的ではないと思います...コメントをお待ちしております。
解決
あなたが使用することができ、A LinkedBlockingDeque と物理的に(takeLast()
を使用して)キューから項目を削除しますが、処理がputLast(E e)
を使用して失敗した場合は、キュー強いのの最後>で再びそれを置き換えます。一方、あなたの「プロデューサーは」のフロントputFirst(E e)
を使用して、キューののに要素を追加することになります。
あなたは常にあなた自身のQueue
実装内でこの動作をカプセル化して行い、基礎となるblockingPeek()
の舞台裏takeLast()
続いputLast()
ことLinkedBlockingDeque
方法を提供することができます。したがって、呼び出し元クライアントの観点からの要素があなたのキューから削除されることはありません。
他のヒント
私は成功したオブジェクトを処理することができるようになります場合、私は知らないので、しかし、私はちょうどPEEK()とではないオブジェクトを削除したいです。私は正常に処理できていた場合にのみ、オブジェクトを削除します。
一般的には、スレッドセーフではありません。何を、どうかをpeek()
と
take()
前に、別のスレッドがそのオブジェクトを取ることを決定した後、 私がこれを行うことを知っている唯一のことは、 ブロッキングバッファ で Apache Commons コレクション:
GETまたは削除のいずれかが空のバッファーで呼び出された場合、通話スレッドは、追加または追加操作が完了したという通知を待ちます。
get()
と同等です peek()
, 、そして Buffer
のように動作させることができます BlockingQueue
飾ることで 無制限の FifoBuffer とともに BlockingBuffer
あなたはまた、単に何かが(ブロッキング)キューに追加されたときに、あなたのリスナーにオフイベントを送信し、あなたのブロッキングキューにイベントリスナーキューを追加してもらえますか?メソッドが呼び出されたのactionPerformedれるまであなたのスレッドのブロックを持っている可能性があります。
簡単な答えは、ブロッキング ピークを実際に使用する方法はなく、ブロッキング ピーク() を使用してブロッキング キューを自分で実装すること以外にはありません。
何かが足りないのでしょうか?
Peak() は同時実行で問題が発生する可能性があります -
- Peak() されたメッセージを処理できない場合、複数のコンシューマがない限り、メッセージはキューに残されます。
- あなたがオブジェクトを処理できない場合、誰がそのオブジェクトをキューから取り出すのでしょうか?
- 複数のコンシューマがある場合、peek() を実行しているスレッドと項目を処理している別のスレッドとの間で競合状態が発生し、結果として処理が重複したり、さらに悪いことが発生したりします。
実際にアイテムを削除して、責任連鎖パターン
編集:再:あなたの最後の例:コンシューマが 1 つしかない場合、その間に更新されない限り、キュー上のオブジェクトを削除することはできません。その場合、スレッドの安全性について非常に注意する必要があり、アイテムを配置すべきではなかったでしょう。とにかく行列に並んでいます。
のBlockingQueueそのもののように見えますが、あなたはしているが、指定した機能を持っていません。
あなたがいない「が正しくプロセス」できるオブジェクトで何をしますか:私は少しものの、問題の再フレームに試してみてください?あなただけのキューにそれらを残している場合は、いくつかの点でそれらを引き出し、それらに対処する必要があります。私はそれらを処理する方法を考え出すのいずれかreccommendと思いますか異なるデータ構造以外を選択する(queue.get()が無効または不正な値の任意の並べ替えを与える場合には、一般的に、あなただけの床の上にドロップしてOKでしょうね) FIFOます。
未解答自体、しかし: JDK-6653412 の主張、これを有効なユースケースではありません。
「最も単純な」解決策
加工しないでください 次 までの要素 前の 要素は正常に処理されました。
public void run() {
Object lastSuccessfullyProcessedElement = null;
while (!exit) {
Object obj = lastSuccessfullyProcessedElement == null ? queue.take() : lastSuccessfullyProcessedElement; // blocking
boolean successful = process(obj);
if(!successful) {
lastSuccessfullyProcessedElement = obj;
} else {
lastSuccessfullyProcessedElement = null;
}
}
}
- 電話をかける
peek()
また、値が null かどうかを確認するのは CPU 効率が良くありません。
次のプログラムのキューが空の場合、システムの CPU 使用率が 10% になることが確認されました。
while (true) {
Object o = queue.peek();
if(o == null) continue;
// omitted for the sake of brevity
}
追加
sleep()
遅さが加わります。を使用してキューに追加し直します
putLast
秩序を乱すことになる。さらに、これはロックを必要とするブロック操作です。