我正在玩 破坏者 框架,发现我的活动处理程序没有被调用。

这是我的设置代码:

private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService  EXECUTOR = Executors.newSingleThreadExecutor();

private void initializeDisruptor() {
    if (disruptor != null)
        return;

    disruptor = 
            new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
                    new SingleThreadedClaimStrategy(BUFFER_SIZE),
                    new SleepingWaitStrategy());
    disruptor.handleEventsWith(searchTermMatchingHandler)
        .then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);

    this.ringBuffer = disruptor.start();
}

在其他地方,我发布了事件。我尝试了以下两种方法:

事件发布方法

private void handleStatus(final Status status)
{

    long sequence = ringBuffer.next();
    TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
    event.setStatus(status);
    event.setSearchInstruments(searchInstruments);
    ringBuffer.publish(sequence);
}

在这种情况下,我找到了第一个 EventHandler 被调用,但从来没有什么。

事件发布方法B:

private void handleStatus(final Status status)
{
    disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {

        @Override
        public TwitterStatusReceivedEvent translateTo(
                TwitterStatusReceivedEvent event, long sequence) {
            event.setStatus(status);
            event.setSearchInstruments(searchInstruments);
            return event;
        }
    });
}

在这种情况下,我发现任何活动处理程序都根本没有被调用。

我究竟做错了什么?

更新

这是我的eventhandler。我应该如何表明处理已完成?

public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {

    @Override
    public void onEvent(TwitterStatusReceivedEvent event, long sequence,
            boolean endOfBatch) throws Exception {
        String statusText = event.getStatus().getText();
        for (Instrument instrument : event.getSearchInstruments())
        {
            if (statusText.contains(instrument.getSearchTerm()))
            {
                event.setMatchedInstrument(instrument);
                break;
            }
        }
    }

}
有帮助吗?

解决方案

每个事件处理程序都需要在自己的线程中运行,该线程不会退出,直到关闭破坏者为止。由于您使用的是单个螺纹执行程序,因此只有执行的第一个事件处理程序才能运行。 (Disruptor类将每个处理程序存储在一个标签中

如果您切换到cachedthreadpool,则应该发现它全部开始运行。您不需要对序列编号进行任何管理,因为这一切都是由Disruptor类设置并为您管理的事件处理器所处理的。仅处理您获得的每个事件都是完全正确的。

其他提示

您需要确保您的搜索termmatchinghandler在处理事件后正在更新其序列号。 EventHandlers进一步下游(AppendStatushandler,UpdatePriceHandler,PersistupDatesHandler)将检查searchTermMatchingHandler序列编号,以查看他们可以从环形缓冲区中拾取哪些事件。

我遇到了同样的问题,但这是因为我使用spring(Java Config)实例化了破坏者,并以与Disruptor相同的@Bean方法实例化执行器。

我通过单独的@Bean方法实例化执行程序来解决问题。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top