这个java应用程序如何在不扩展线程或实现runnable的情况下运行多个线程?
-
13-12-2019 - |
题
我正在学习Java,并且能够使用runnable对我现有的应用程序进行一些多线程处理。我现在正在查看disruptor(在线程之间共享变量),但我无法弄清楚作者实际上是如何产生线程的。
我看到他正在使用Executor,我用它在我的程序中提交runnable类,但在这个例子中没有提交(或runnable)。我只从Oracle教程中学到了我所知道的,他们提到了唯一的两种方法是扩展线程或实现runnable(我在这里也没有看到,但他确实将executor提交给disruptor,这可能是他的线程).我错过了什么,还是这个人以不同的方式做?我的最终目标是理解这个代码(它完美地工作),所以我可以将它应用到我现有的(使用runnable)代码中。
这是有问题的代码:
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class App {
private final static int RING_SIZE = 1024 * 8;
private static long handleCount = 0;
private final static long ITERATIONS = 1000L * 1000L * 300L;
private final static int NUM_EVENT_PROCESSORS = 8;
private final static EventHandler<ValueEvent> handler =
new EventHandler<ValueEvent>() {
public void onEvent(final ValueEvent event,
final long sequence,
final boolean endOfBatch) throws Exception {
handleCount++;
}
};
public static void main(String[] args) {
System.out.println("Starting disruptor app.");
ExecutorService executor = Executors.newFixedThreadPool(NUM_EVENT_PROCESSORS);
Disruptor<ValueEvent> disruptor =
new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, executor,
new SingleThreadedClaimStrategy(RING_SIZE),
new SleepingWaitStrategy());
disruptor.handleEventsWith(handler);
RingBuffer<ValueEvent> ringBuffer = disruptor.start();
long start = System.currentTimeMillis();
long sequence;
ValueEvent event;
for (long x=0; x<ITERATIONS; x++) {
sequence = ringBuffer.next();
event = ringBuffer.get(sequence);
event.setValue(x);
ringBuffer.publish(sequence);
}
final long expectedSequence = ringBuffer.getCursor();
while (handleCount < expectedSequence) { }
long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
System.out.printf("op/s: %d, handled: %d", opsPerSecond, handleCount);
}
}
更新资料:如果Disruptor正在处理线程的产生,那么如何将现有的runnable类提交给它?还是我需要重新编写代码?对不起,我有点困惑,如果disruptor将与现有的代码一起工作,或者如果我需要完全改变我的东西。
解决方案
正如您所怀疑的那样,实际处理线程(通过提交工作项)是在内部完成的 Disruptor
.所以你需要看看 其源代码 (幸运的是,它是开源的),找到这个:
public RingBuffer<T> start()
{
EventProcessor[] gatingProcessors = eventProcessorRepository.getLastEventProcessorsInChain();
ringBuffer.setGatingSequences(Util.getSequencesFor(gatingProcessors));
checkOnlyStartedOnce();
for (EventProcessorInfo<T> eventProcessorInfo : eventProcessorRepository)
{
executor.execute(eventProcessorInfo.getEventProcessor());
}
return ringBuffer;
}
不隶属于 StackOverflow