Вопрос

Я играю с Разрушитель Framework, и я обнаруживаю, что мои обработчики событий не вызывают.

Вот мой код настройки:

private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService  EXECUTOR = Executors.newSingleThreadExecutor();

private void initializeDisruptor() {
    if (disruptor != null)
        return;

    disruptor = 
            new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
                    new SingleThreadedClaimStrategy(BUFFER_SIZE),
                    new SleepingWaitStrategy());
    disruptor.handleEventsWith(searchTermMatchingHandler)
        .then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);

    this.ringBuffer = disruptor.start();
}

В другом месте я публикую мероприятия. Я пробовал каждый из следующих двух подходов:

Подход публикации событий a:

private void handleStatus(final Status status)
{

    long sequence = ringBuffer.next();
    TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
    event.setStatus(status);
    event.setSearchInstruments(searchInstruments);
    ringBuffer.publish(sequence);
}

В этом сценарии я нахожу первый EventHandler вызывается, но никогда не за этим.

Подход к публикации событий B:

private void handleStatus(final Status status)
{
    disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {

        @Override
        public TwitterStatusReceivedEvent translateTo(
                TwitterStatusReceivedEvent event, long sequence) {
            event.setStatus(status);
            event.setSearchInstruments(searchInstruments);
            return event;
        }
    });
}

В этом сценарии я обнаружил, что ни одно из обработчиков событий вообще не вызывается.

Что я делаю не так?

Обновлять

Вот мой eventhandler в целом. Как мне сигнализировать о том, что обработка завершена?

public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {

    @Override
    public void onEvent(TwitterStatusReceivedEvent event, long sequence,
            boolean endOfBatch) throws Exception {
        String statusText = event.getStatus().getText();
        for (Instrument instrument : event.getSearchInstruments())
        {
            if (statusText.contains(instrument.getSearchTerm()))
            {
                event.setMatchedInstrument(instrument);
                break;
            }
        }
    }

}
Это было полезно?

Решение

Каждый обработчик событий должен запустить в своем собственном потоке, который не выходит, пока вы не отключите разрушитель. Поскольку вы используете одного исполнителя с одним потоком, только первый обработчик событий, который когда -либо выполняется, будет запущен. (Класс Disruptor хранит каждый обработчик в хэшмапе, поэтому, по которому работает работа, будет варьироваться)

Если вы переключитесь на CachedThreadpool, вы должны найти все, что все начнется. Вам не нужно будет делать какое -либо управление номерами последовательностей, потому что это все обрабатывается процессором событий, которые класс Disruptor создает и управляет вам. Просто обрабатывать каждое событие, которое вы получаете, точно правильно.

Другие советы

Вы должны убедиться, что ваш SearchtermatchingHandler обновляет свой номер последовательности после того, как он обрабатывает событие. Eventhandlers дальнейшие вниз по течению (Appendstatushandler, UpdatePricePriceHandler, PersistupdatesHandler) будут осматривать номер последовательности SearchTmatchingHandler, чтобы увидеть, какие события они могут снять с буфера кольца.

У меня была такая же проблема, но это было потому, что я создавал экологически чистый разрыв, используя Spring (конфигурация Java) и создавал исполнителя в том же методе @Bean, что и разрушитель.

Я исправил проблему, создавая исполнителя отдельным методом @Bean.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top