Come è questa app Java eseguendo più fili senza estendere i fili o l'implementazione correggiabile?

StackOverflow https://stackoverflow.com//questions/9688799

Domanda

Sto imparando Java ed è stato in grado di fare un po 'multi-threading con le mie app esistenti usando Runnable. Ora stavo guardando il disturbo (per condividere le variabili tra i fili), ma non riesco a capire come l'autore in realtà è in realtà i fili.

Vedo che sta usando Executor, che uso per inviare classi declavoli nel mio programma ma in questo esempio non c'è alcuna invia (o runnable). Ho imparato solo quello che conosco dai tutorial Oracle e menzionano gli unici due modi per estendere i fili o implementare i fili o implementare runnable (non vedo qui, ma invia un esecutore di interrompere, che forse è il threading?). Mi manca qualcosa o questa persona lo sta facendo in modo diverso? Il mio obiettivo finale è capire questo codice (che funziona perfettamente) così posso applicarlo al mio codice esistente (usando Runnable).

Ecco il codice in questione:

app.Java

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.*;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class App {

    private final static int RING_SIZE = 1024 * 8;

    private static long handleCount = 0;

    private final static long ITERATIONS = 1000L * 1000L * 300L;
    private final static int NUM_EVENT_PROCESSORS = 8;

    private final static EventHandler<ValueEvent> handler =
        new EventHandler<ValueEvent>() {
        public void onEvent(final ValueEvent event,
                                final long sequence,
                                final boolean endOfBatch) throws Exception {
        handleCount++;
    }
    };

    public static void main(String[] args) {
    System.out.println("Starting disruptor app.");

    ExecutorService executor = Executors.newFixedThreadPool(NUM_EVENT_PROCESSORS);

    Disruptor<ValueEvent> disruptor =
        new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, executor,
            new SingleThreadedClaimStrategy(RING_SIZE),
            new SleepingWaitStrategy());
    disruptor.handleEventsWith(handler);
    RingBuffer<ValueEvent> ringBuffer = disruptor.start();

    long start = System.currentTimeMillis();

        long sequence;
        ValueEvent event;
    for (long x=0; x<ITERATIONS; x++) {
        sequence = ringBuffer.next();
        event = ringBuffer.get(sequence);
        event.setValue(x);
        ringBuffer.publish(sequence);
        }
    final long expectedSequence = ringBuffer.getCursor();

    while (handleCount < expectedSequence) { }

    long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
    System.out.printf("op/s: %d, handled: %d", opsPerSecond, handleCount);
    }
}
.

Aggiornamento: se l'interruttore sta gestendo la deposizione delle filettature, come posso inviare la mia classe runnable esistente ad esso? O ho bisogno di rielaborare di nuovo il codice? Scusa, sono un po 'confuso se l'interruttore funzionerà con il codice esistente o se ho bisogno di cambiare completamente le mie cose per questo.

È stato utile?

Soluzione

Come si sospetta, l'effettivo trattare con i thread (tramite l'invio di elementi di lavoro) viene eseguito all'interno del Disruptor.Quindi è necessario guardare a Il suo codice sorgente (alla tua fortuna, è open source), per trovare questo:

public RingBuffer<T> start()
{
    EventProcessor[] gatingProcessors = eventProcessorRepository.getLastEventProcessorsInChain();
    ringBuffer.setGatingSequences(Util.getSequencesFor(gatingProcessors));

    checkOnlyStartedOnce();
    for (EventProcessorInfo<T> eventProcessorInfo : eventProcessorRepository)
    {
        executor.execute(eventProcessorInfo.getEventProcessor());
    }

    return ringBuffer;
}
.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top