このJavaアプリはスレッドを拡張したり、Runnableを実装せずに複数のスレッドを実行していますか?

StackOverflow https://stackoverflow.com//questions/9688799

質問

私はJavaを学んでいて、runnableを使って既存のアプリで少しマルチスレッドをすることができました。私は今、乱流を見ている(スレッド間の変数を共有するために)、著者が実際にスレッドを産む方法を理解することはできませんでした。

私は彼が私のプログラムで実行可能クラスを送信するために使用するexecutorを使用しているのを見ますが、この例では送信(またはrunnable)はありません。私はOracleのチュートリアルから知っていることだけを学び、スレッドを拡張するか、Runnableを実装することがわずか2つの方法に言及しています(ここでは表示されませんが、彼がスレッドをスレッティングするのはどんなに使い果たしますか?)。私は何かが足りない、またはこの人は別の方法でそれをしていますか?私の最終目標は、このコード(完全に機能する)を理解することです。既存の(Runnableを使用)コードに適用できます。

これは問題のコードです:

app.java

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.*;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class App {

    private final static int RING_SIZE = 1024 * 8;

    private static long handleCount = 0;

    private final static long ITERATIONS = 1000L * 1000L * 300L;
    private final static int NUM_EVENT_PROCESSORS = 8;

    private final static EventHandler<ValueEvent> handler =
        new EventHandler<ValueEvent>() {
        public void onEvent(final ValueEvent event,
                                final long sequence,
                                final boolean endOfBatch) throws Exception {
        handleCount++;
    }
    };

    public static void main(String[] args) {
    System.out.println("Starting disruptor app.");

    ExecutorService executor = Executors.newFixedThreadPool(NUM_EVENT_PROCESSORS);

    Disruptor<ValueEvent> disruptor =
        new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, executor,
            new SingleThreadedClaimStrategy(RING_SIZE),
            new SleepingWaitStrategy());
    disruptor.handleEventsWith(handler);
    RingBuffer<ValueEvent> ringBuffer = disruptor.start();

    long start = System.currentTimeMillis();

        long sequence;
        ValueEvent event;
    for (long x=0; x<ITERATIONS; x++) {
        sequence = ringBuffer.next();
        event = ringBuffer.get(sequence);
        event.setValue(x);
        ringBuffer.publish(sequence);
        }
    final long expectedSequence = ringBuffer.getCursor();

    while (handleCount < expectedSequence) { }

    long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
    System.out.printf("op/s: %d, handled: %d", opsPerSecond, handleCount);
    }
}
.

UPDATE:乱流がスレッドの産卵を処理している場合は、既存のRunnableクラスをそれに送信できますか?それとも、コードを再度やり直す必要がありますか?すみません、私は中断が既存のコードを扱うことになっている場合、またはそれのために私のものを完全に変更する必要があるならば、私は少し混乱しています。

役に立ちましたか?

解決

あなたが疑われるように、(作業項目の提出を介して)スレッドを扱う実際には、Disruptorの中で行われます。だからあなたはそのソースコード(あなたの運にはオープンソースです)、これを見つけるために:

public RingBuffer<T> start()
{
    EventProcessor[] gatingProcessors = eventProcessorRepository.getLastEventProcessorsInChain();
    ringBuffer.setGatingSequences(Util.getSequencesFor(gatingProcessors));

    checkOnlyStartedOnce();
    for (EventProcessorInfo<T> eventProcessorInfo : eventProcessorRepository)
    {
        executor.execute(eventProcessorInfo.getEventProcessor());
    }

    return ringBuffer;
}
.

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top