Pregunta

Estoy jugando con el Disruptor Marco, y estoy descubriendo que mis manejadores de eventos no están siendo invocados.

Aquí está mi código de configuración:

private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService  EXECUTOR = Executors.newSingleThreadExecutor();

private void initializeDisruptor() {
    if (disruptor != null)
        return;

    disruptor = 
            new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
                    new SingleThreadedClaimStrategy(BUFFER_SIZE),
                    new SleepingWaitStrategy());
    disruptor.handleEventsWith(searchTermMatchingHandler)
        .then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);

    this.ringBuffer = disruptor.start();
}

En otro lugar, publico eventos. He probado cada uno de los siguientes dos enfoques:

Enfoque de publicación de eventos A:

private void handleStatus(final Status status)
{

    long sequence = ringBuffer.next();
    TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
    event.setStatus(status);
    event.setSearchInstruments(searchInstruments);
    ringBuffer.publish(sequence);
}

En este escenario, encuentro el primero EventHandler se invoca, pero nunca nada más allá de eso.

Enfoque de publicación de eventos B:

private void handleStatus(final Status status)
{
    disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {

        @Override
        public TwitterStatusReceivedEvent translateTo(
                TwitterStatusReceivedEvent event, long sequence) {
            event.setStatus(status);
            event.setSearchInstruments(searchInstruments);
            return event;
        }
    });
}

En este escenario, encuentro que ninguno de los manejadores de eventos se invoca en absoluto.

¿Qué estoy haciendo mal?

Actualizar

Aquí está mi EventHandler en su totalidad. ¿Cómo debo señalar que el procesamiento está completo?

public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {

    @Override
    public void onEvent(TwitterStatusReceivedEvent event, long sequence,
            boolean endOfBatch) throws Exception {
        String statusText = event.getStatus().getText();
        for (Instrument instrument : event.getSearchInstruments())
        {
            if (statusText.contains(instrument.getSearchTerm()))
            {
                event.setMatchedInstrument(instrument);
                break;
            }
        }
    }

}
¿Fue útil?

Solución

Cada controlador de eventos debe ejecutarse en su propio hilo que no salga hasta que apague el disruptor. Dado que está utilizando un solo ejecutor roscado, solo el primer controlador de eventos que se ejecuta se ejecutará. (La clase de disruptor almacena cada manejador en un hashmap, por lo que el manejador se extenderá variará)

Si cambia a un CachedThreadPool, debería encontrar que todo comienza a funcionar. No necesitará hacer ninguna gestión de los números de secuencia porque todo está manejado por el PROCESOR EVENTO que la clase Disruptor establece y administra para usted. Solo procesar cada evento que obtenga es exactamente correcto.

Otros consejos

Debe asegurarse de que su SearchMatchingHandler esté actualizando su número de secuencia después de procesar el evento. EventHandlers más aguas abajo (AppendStatushandler, UpdatePriceHandler, PersistUpdatesHandler) inspeccionará el número de secuencia SearchTermMatchingHandler para ver qué eventos pueden recoger el búfer del anillo.

Tuve el mismo problema, pero fue porque estaba instanciando el disruptor usando Spring (Configa Java) y estaba instanciando al albacea en el mismo método de @Bean que el disruptor.

Arreglé el problema instanciando al ejecutor en un método separado @Bean.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top