質問

私は使用しています java.util.concurrent.blockingqueue 非常にシンプルなプロデューサーと消費者のシナリオで。たとえば、この擬似コードは、消費者の部分を描写しています。

class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while(true)
        {
            try {
                ComplexObject complexObject = myBlockingQueue.take();
                //do something with the complex object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

ここまでは順調ですね。ブロッキングキューのJavadocで私が読んだ:

BlockingQueueは、いかなる種類の「閉鎖」または「シャットダウン」操作をサポートしていないため、これ以上のアイテムが追加されないことを示します。このような機能のニーズと使用は、実装に依存する傾向があります。たとえば、一般的な戦術は、生産者が消費者が撮影したときにそれに応じて解釈される特別なストリームや毒物を挿入することです。

残念ながら、使用中のジェネリックとComplexObjectの性質のために、「毒物」をキューに押し込むのは些細なことではありません。したがって、この「共通の戦術」は、私のシナリオでは本当に便利ではありません。

私の質問は、キューを「閉じる」ために他にどのような良い戦術/パターンを使用できるかということです。

ありがとうございました!

役に立ちましたか?

解決

消費者スレッドのハンドルがある場合は、それを中断することができます。あなたが与えたコードで、それは消費者を殺します。プロデューサーがこれを持っているとは思わないでしょう。おそらく、プログラムコントローラーにコールバックして、それが完了していることを知らせる必要があります。その後、コントローラーは消費者スレッドを中断します。

割り込みに従う前に、いつでも仕事を終えることができます。例えば:

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().isInterrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                this.process(complexObject);

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }

        // Thread is getting ready to die, but first,
        // drain remaining elements on the queue and process them.
        final LinkedList<ComplexObject> remainingObjects;
        myBlockingQueue.drainTo(remainingObjects);
        for(ComplexObject complexObject : remainingObjects) {
            this.process(complexObject);
        }
    }

    private void process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

とにかく、どうにかしてキューを中毒にすることを実際に好むでしょう。スレッドを殺したい場合は、スレッドに自分自身を殺すように頼みます。

(誰かが扱っているのを見るのはいいことです InterruptedException ちゃんと。)


ここで中断の取り扱いについて何らかの主張があるようです。まず、みんなにこの記事を読んでもらいたい: http://www.ibm.com/developerworks/java/library/j-jtp05236.html

さて、誰も実際にそれを読んでいないという理解で、ここに取引があります。スレッドはAnのみを受信します InterruptedException 割り込み時に現在ブロックしていた場合。この場合、 Thread.interrupted() 戻ります false. 。ブロックしていなかった場合、この例外は受け取らず、代わりに Thread.interrupted() 戻ります true. 。したがって、ループガードは絶対に、何があっても、チェックする必要があります Thread.interrupted(), 、または、スレッドの中断を欠く危険があります。

だから、あなたがチェックしているので Thread.interrupted() 何があっても、あなたは捕まえることを余儀なくされています InterruptedException (そして、あなたが強制されていなくてもそれを扱うはずです)、あなたは同じイベントを処理する2つのコード領域、スレッドの中断があります。これを処理する1つの方法は、それらを1つの条件に正規化することです。つまり、ブール状態チェックが例外をスローできるか、例外がブール状態を設定できることを意味します。後で選びます。


編集: 静的スレッド#中断されたメソッドに注意してください クリア 現在のスレッドの中断されたステータス。

他のヒント

これをシンプルにするための別のアイデア:

class ComplexObject implements QueueableComplexObject
{
    /* the meat of your complex object is here as before, just need to
     * add the following line and the "implements" clause above
     */
    @Override public ComplexObject asComplexObject() { return this; }
}

enum NullComplexObject implements QueueableComplexObject
{
    INSTANCE;

    @Override public ComplexObject asComplexObject() { return null; }
}

interface QueueableComplexObject
{
    public ComplexObject asComplexObject();
}

次に、使用します BlockingQueue<QueueableComplexObject> キューとして。キューの処理を終了したいときは、 queue.offer(NullComplexObject.INSTANCE). 。消費者の面で、そうします

boolean ok = true;
while (ok)
{
    ComplexObject obj = queue.take().asComplexObject();
    if (obj == null)
        ok = false;
    else
        process(obj);
}

/* interrupt handling elided: implement this as you see fit,
 * depending on whether you watch to swallow interrupts or propagate them
 * as in your original post
 */

いいえ instanceof 必須、そしてあなたは偽物を構築する必要はありません ComplexObject 実装に応じて、これは高価/困難な場合があります。

別の方法は、あなたがしている処理をで包むことです ExecutorService, 、そして、 ExecutorService それ自体は、ジョブがキューに追加されるかどうかを制御します。

基本的に、あなたは利用します ExecutorService.shutdown(), 、呼び出されたとき、それ以上のタスクが執行者によって処理されることからです。

現在どのようにタスクを提出しているのかわかりません QueueConsumer あなたの例で。私はあなたがある種のものを持っていると仮定しました submit() 方法、および例で同様のメソッドを使用しました。

import java.util.concurrent.*;

class QueueConsumer {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public void shutdown() {
        executor.shutdown(); // gracefully shuts down the executor
    }

    // 'Result' is a class you'll have to write yourself, if you want.
    // If you don't need to provide a result, you can just Runnable
    // instead of Callable.
    public Future<Result> submit(final ComplexObject complexObject) {
        if(executor.isShutdown()) {
            // handle submitted tasks after the executor has been told to shutdown
        }

        return executor.submit(new Callable<Result>() {
            @Override
            public Result call() {
                return process(complexObject);
            }
        });
    }

    private Result process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

この例は、何が何であるかのカフのオフカフのイラストです java.util.concurrent パッケージオファー。おそらくそれに加えられるいくつかの最適化があります(例: QueueConsumer それ自身のクラスはおそらく必要ではないので。あなたはただ提供することができます ExecutorService プロデューサーがタスクを提出しているものでは)。

を掘ります java.util.concurrent パッケージ(上記のリンクの一部から始まります)。あなたはあなたがやろうとしていることのための多くの素晴らしいオプションを提供することに気付くかもしれません、そして、あなたは作業キューの規制を心配する必要さえありません。

毒物を作る別の可能性:それをクラスの特定のインスタンスにする。このようにして、サブタイプの周りを悩ませたり、ジェネリックを台無しにしたりする必要はありません。

欠点:生産者と消費者の間に何らかのシリアル化障壁がある場合、これは機能しません。

public class ComplexObject
{
    public static final POISON_INSTANCE = new ComplexObject();

    public ComplexObject(whatever arguments) {
    }

    // Empty constructor for creating poison instance.
    private ComplexObject() {
    }
}

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().interrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                if (complexObject == ComplexObject.POISON_INSTANCE)
                    return;

                // Process complex object.

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }
    }
}

ComplexObjectを拡張して、非自明の作成機能を模倣することは可能ですか?基本的にあなたはシェルオブジェクトになってしまいますが、あなたはそれを行うことができます instance of キューオブジェクトの終了かどうかを確認します。

一般的なオブジェクトをdataObjectに包むことができます。このDataObjectでは、Poisonオブジェクトのステータスなどの追加データを追加できます。 DataObjectは、2つのフィールドを持つクラスです。 T complexObject;boolean poison;.

消費者はキューからデータオブジェクトを取得します。毒物が返された場合、消費者を閉じます。そうしないと、ジェネリックを解き、「プロセス(ComplexObject)」と呼びます。

私はaを使用しています java.util.concurrent.LinkedBlockingDeque<E> キューの最後にオブジェクトを追加して、正面からそれらを取ることができるようにします。そうすれば、オブジェクトは順番に処理されますが、より重要なのは、ポイズンオブジェクトに遭遇した後、キューを閉じることが安全です。

複数の消費者をサポートするために、私はそれに出くわしたときにキューにポイズンオブジェクトを戻します。

public final class Data<T> {
    private boolean poison = false;
    private T complexObject;

    public Data() {
        this.poison = true;
    }

    public Data(T complexObject) {
        this.complexObject = complexObject;
    }

    public boolean isPoison() {
        return poison;
    }

    public T getComplexObject() {
        return complexObject;
    }
}
public class Consumer <T> implements Runnable {

    @Override
    public final void run() {
        Data<T> data;
        try {
            while (!(data = queue.takeFirst()).isPoison()) {
                process(data.getComplexObject());
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // add the poison object back so other consumers can stop too.
        queue.addLast(line);
    }
}

密接なものを実装することは私にとって合理的だと思われます BlockingQueue:

import java.util.concurrent.BlockingQueue;

public interface CloseableBlockingQueue<E> extends BlockingQueue<E> {
    /** Returns <tt>true</tt> if this queue is closed, <tt>false</tt> otherwise. */
    public boolean isClosed();

    /** Closes this queue; elements cannot be added to a closed queue. **/
    public void close();
}

次の動作でこれを実装するのは非常に簡単です(cf。 方法概要表):

編集してこれを行いました ソース, 、それを見つけてください github.com.

この状況では、一般にジェネリックを捨てて、キューホールドタイプオブジェクトを作成する必要があります。次に、実際のタイプにキャストする前に、「毒」オブジェクトをチェックするだけです。

私はこのシステムを使用しました:

ConsumerClass
private boolean queueIsNotEmpty = true;//with setter
...
do {
    ...
    sharedQueue.drainTo(docs);
    ...
} while (queueIsNotEmpty || sharedQueue.isEmpty());

プロデューサーが終了するとき、私はConsumerObject、QueueisNotemptyフィールドにfalseに設定しました

今日、私はラッパーオブジェクトを使用してこの問題を解決しました。 ComplexObjectは複雑すぎてサブクラスには複雑すぎるため、ComplexObjectをComplexObjectWrapperオブジェクトにラップしました。次に、complexObjectWrapperを一般的なタイプとして使用しました。

public class ComplexObjectWrapper {
ComplexObject obj;
}

public class EndOfQueue extends ComplexObjectWrapper{}

今の代わりに BlockingQueue<ComplexObject> やった BlockingQueue<ComplexObjectWrapper>

私は消費者と生産者の両方を管理していたので、このソリューションは私のために働きました。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top