Como este aplicativo Java está executando vários threads sem estender threads ou implementar executáveis?
-
13-12-2019 - |
Pergunta
Estou aprendendo Java e consegui fazer um pouco de multithreading com meus aplicativos existentes usando executável.Agora eu estava olhando para o disruptor (para compartilhar variáveis entre threads), mas não consigo descobrir como o autor está realmente gerando threads.
Vejo que ele está usando o Executor, que eu uso para enviar classes executáveis em meu programa, mas neste exemplo não há envio (ou executável).Eu só aprendi o que sei nos tutoriais do Oracle e eles mencionam que as únicas duas maneiras são estender threads ou implementar executáveis (não vejo nenhum dos dois aqui, mas ele envia o executor ao disruptor, que talvez seja como ele está encadeando?).Estou faltando alguma coisa ou essa pessoa está fazendo isso de uma maneira diferente?Meu objetivo final é entender esse código (que funciona perfeitamente) para que eu possa aplicá-lo ao meu código existente (usando executável).
Aqui está o código em questão:
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class App {
private final static int RING_SIZE = 1024 * 8;
private static long handleCount = 0;
private final static long ITERATIONS = 1000L * 1000L * 300L;
private final static int NUM_EVENT_PROCESSORS = 8;
private final static EventHandler<ValueEvent> handler =
new EventHandler<ValueEvent>() {
public void onEvent(final ValueEvent event,
final long sequence,
final boolean endOfBatch) throws Exception {
handleCount++;
}
};
public static void main(String[] args) {
System.out.println("Starting disruptor app.");
ExecutorService executor = Executors.newFixedThreadPool(NUM_EVENT_PROCESSORS);
Disruptor<ValueEvent> disruptor =
new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, executor,
new SingleThreadedClaimStrategy(RING_SIZE),
new SleepingWaitStrategy());
disruptor.handleEventsWith(handler);
RingBuffer<ValueEvent> ringBuffer = disruptor.start();
long start = System.currentTimeMillis();
long sequence;
ValueEvent event;
for (long x=0; x<ITERATIONS; x++) {
sequence = ringBuffer.next();
event = ringBuffer.get(sequence);
event.setValue(x);
ringBuffer.publish(sequence);
}
final long expectedSequence = ringBuffer.getCursor();
while (handleCount < expectedSequence) { }
long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
System.out.printf("op/s: %d, handled: %d", opsPerSecond, handleCount);
}
}
Atualizar:se o Disruptor estiver lidando com a geração de threads, como posso enviar minha classe executável existente para ele?ou preciso retrabalhar o código novamente?Desculpe, estou um pouco confuso sobre se o disruptor vai funcionar com o código existente ou se preciso mudar completamente minhas coisas para ele.
Solução
Como você suspeita, o tratamento real dos threads (por meio do envio de itens de trabalho) é feito dentro Disruptor
.Então você precisa olhar seu código fonte (para sua sorte, é de código aberto), para encontrar isto:
public RingBuffer<T> start()
{
EventProcessor[] gatingProcessors = eventProcessorRepository.getLastEventProcessorsInChain();
ringBuffer.setGatingSequences(Util.getSequencesFor(gatingProcessors));
checkOnlyStartedOnce();
for (EventProcessorInfo<T> eventProcessorInfo : eventProcessorRepository)
{
executor.execute(eventProcessorInfo.getEventProcessor());
}
return ringBuffer;
}