Question

I'm playing around with the Disruptor framework, and am finding that my event handlers are not being invoked.

Here's my setup code:

private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();

private void initializeDisruptor() {
    if (disruptor != null)
        return;

    disruptor = 
            new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
                    new SingleThreadedClaimStrategy(BUFFER_SIZE),
                    new SleepingWaitStrategy());
    disruptor.handleEventsWith(searchTermMatchingHandler)
        .then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);

    this.ringBuffer = disruptor.start();
}

Elsewhere, I publish events. I've tried each of the following two approaches:

Event Publishing Approach A:

private void handleStatus(final Status status)
{

    long sequence = ringBuffer.next();
    TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
    event.setStatus(status);
    event.setSearchInstruments(searchInstruments);
    ringBuffer.publish(sequence);
}

In this scenario, I find that the first EventHandler gets invoked, but nothing beyond that ever runs.

Event Publishing Approach B:

private void handleStatus(final Status status)
{
    disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {

        @Override
        public TwitterStatusReceivedEvent translateTo(
                TwitterStatusReceivedEvent event, long sequence) {
            event.setStatus(status);
            event.setSearchInstruments(searchInstruments);
            return event;
        }
    });
}

In this scenario, I find that none of the event handlers get invoked at all.

What am I doing wrong?

Update

Here's my EventHandler in its entirety. How should I be signalling that processing is complete?

public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {

    @Override
    public void onEvent(TwitterStatusReceivedEvent event, long sequence,
            boolean endOfBatch) throws Exception {
        String statusText = event.getStatus().getText();
        for (Instrument instrument : event.getSearchInstruments())
        {
            if (statusText.contains(instrument.getSearchTerm()))
            {
                event.setMatchedInstrument(instrument);
                break;
            }
        }
    }

}

Solution

Each event handler needs to run in its own thread, which won't exit until you shut down the disruptor. Since you're using a single-threaded executor, only the first event handler that happens to execute will ever run. (The Disruptor class stores each handler in a hash map, so which handler ends up running will vary.)

If you switch to a cached thread pool (Executors.newCachedThreadPool()), you should find that it all starts running. You won't need to manage the sequence numbers yourself, because that's all handled by the EventProcessor that the Disruptor class sets up and manages for you. Simply processing each event you receive is exactly right.
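
The thread starvation described above can be reproduced with the JDK alone. The sketch below does not use the Disruptor API at all: the class ExecutorStarvationDemo and the method countStarted are hypothetical names, and each submitted task is only a stand-in for an event processor that holds its thread until shutdown. It counts how many such tasks manage to start on a given executor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the tasks below only imitate long-running event processors.
class ExecutorStarvationDemo {

    // Submits `workers` tasks that block until released, then reports how many
    // of them actually started running within the timeout.
    static int countStarted(ExecutorService executor, int workers) {
        CountDownLatch started = new CountDownLatch(workers);
        CountDownLatch stop = new CountDownLatch(1);

        for (int i = 0; i < workers; i++) {
            executor.execute(() -> {
                started.countDown();          // this worker got a thread
                try {
                    stop.await();             // hold the thread, like a processor loop
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            // Give every worker a chance to start; times out if some never get a thread.
            started.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int startedCount = workers - (int) started.getCount();

        stop.countDown();                     // release the workers
        executor.shutdown();                  // let the pool threads exit
        return startedCount;
    }

    public static void main(String[] args) {
        System.out.println("single thread: "
                + countStarted(Executors.newSingleThreadExecutor(), 4));
        System.out.println("cached pool:   "
                + countStarted(Executors.newCachedThreadPool(), 4));
    }
}
```

With four workers, the single-thread executor should report 1 started and the cached pool 4. In the question's setup, the corresponding change would be to create EXECUTOR with Executors.newCachedThreadPool() instead of Executors.newSingleThreadExecutor().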

OTHER TIPS

You need to make sure your searchTermMatchingHandler is updating its sequence number after it processes the event. The EventHandlers further downstream (appendStatusHandler, updatePriceHandler, persistUpdatesHandler) will be inspecting the searchTermMatchingHandler sequence number to see which events they can pick up off the ring buffer.

I had the same problem, but it was because I was instantiating the Disruptor using Spring (Java config) and was instantiating the Executor in the same @Bean method as the Disruptor.

I fixed the problem by instantiating the Executor in a separate @Bean method.

Licensed under: CC-BY-SA with attribution