Frage

Ich spiele mit dem herum Disruptor Framework und stelle fest, dass meine Event -Handler nicht aufgerufen werden.

Hier ist mein Setup -Code:

private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService  EXECUTOR = Executors.newSingleThreadExecutor();

private void initializeDisruptor() {
    if (disruptor != null)
        return;

    disruptor = 
            new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
                    new SingleThreadedClaimStrategy(BUFFER_SIZE),
                    new SleepingWaitStrategy());
    disruptor.handleEventsWith(searchTermMatchingHandler)
        .then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);

    this.ringBuffer = disruptor.start();
}

An anderer Stelle veröffentliche ich Veranstaltungen. Ich habe jeden der folgenden zwei Ansätze ausprobiert:

Event Publishing Ansatz a:

private void handleStatus(final Status status)
{

    long sequence = ringBuffer.next();
    TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
    event.setStatus(status);
    event.setSearchInstruments(searchInstruments);
    ringBuffer.publish(sequence);
}

In diesem Szenario finde ich das erste EventHandler wird angerufen, aber nie etwas darüber hinaus.

Event Publishing -Ansatz B:

private void handleStatus(final Status status)
{
    disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {

        @Override
        public TwitterStatusReceivedEvent translateTo(
                TwitterStatusReceivedEvent event, long sequence) {
            event.setStatus(status);
            event.setSearchInstruments(searchInstruments);
            return event;
        }
    });
}

In diesem Szenario finde ich, dass keiner der Event -Handler überhaupt aufgerufen wird.

Was mache ich falsch?

Aktualisieren

Hier ist mein EventHandler in seiner Gesamtheit. Wie soll ich signalisieren, dass die Verarbeitung vollständig ist?

public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {

    @Override
    public void onEvent(TwitterStatusReceivedEvent event, long sequence,
            boolean endOfBatch) throws Exception {
        String statusText = event.getStatus().getText();
        for (Instrument instrument : event.getSearchInstruments())
        {
            if (statusText.contains(instrument.getSearchTerm()))
            {
                event.setMatchedInstrument(instrument);
                break;
            }
        }
    }

}
War es hilfreich?

Lösung

Jeder Ereignishandler muss in seinem eigenen Thread ausgeführt werden, der nicht beendet wird, bis Sie den Disruptor heruntergefahren haben. Da Sie einen einzelnen Thread -Executor verwenden, wird nur der erste Event -Handler, der ausgeführt wird, jemals ausgeführt. (Die Disruptor -Klasse speichert jeden Handler in einem HashMap, so dass der Handler läuft.)

Wenn Sie zu einem CachedThreadpool wechseln, sollten Sie feststellen, dass alles läuft. Sie müssen keine Verwaltung der Sequenznummern durchführen, da dies alles von dem EventProcessor behandelt wird, den die Disruptor -Klasse für Sie einsetzt und für Sie verwaltet. Die Bearbeitung jedes Ereignisses, das Sie erhalten, ist genau richtig.

Andere Tipps

Sie müssen sicherstellen, dass Ihr SuchtermMatchingHandler seine Sequenznummer nach dem Verarbeitung des Ereignisses aktualisiert. Die EventHandlers weiter stromabwärts (AppendStatShandler, UpdatePriceHandler, PersistupdatesHandler) werden die SuchtermMatchingHandler -Sequenznummer inspizieren, um zu sehen, welche Ereignisse sie vom Ringpuffer abholen können.

Ich hatte das gleiche Problem, aber es lag daran, dass ich den Disruptor mit Spring (Java -Konfiguration) instanziierte und den Testamentsvollstrecker in derselben @Bean -Methode wie der Disruptor instanziierte.

Ich habe das Problem behoben, indem ich den Testamentsvollstrecker in einer separaten @Bean -Methode instanziierte.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top