Disruptor -EventHandlersは呼び出されませんでした
-
25-10-2019 - |
質問
私は一緒に遊んでいます 破壊者 フレームワーク、そして私のイベントハンドラーが呼び出されていないことを発見しています。
これが私のセットアップコードです:
private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
private void initializeDisruptor() {
if (disruptor != null)
return;
disruptor =
new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
new SingleThreadedClaimStrategy(BUFFER_SIZE),
new SleepingWaitStrategy());
disruptor.handleEventsWith(searchTermMatchingHandler)
.then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);
this.ringBuffer = disruptor.start();
}
他の場所では、イベントを公開しています。次の2つのアプローチのそれぞれを試しました。
イベントパブリッシングアプローチa:
private void handleStatus(final Status status)
{
long sequence = ringBuffer.next();
TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
event.setStatus(status);
event.setSearchInstruments(searchInstruments);
ringBuffer.publish(sequence);
}
このシナリオでは、最初のシナリオを見つけます EventHandler
呼び出されますが、それ以上のものはありません。
イベントパブリッシングアプローチB:
private void handleStatus(final Status status)
{
disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {
@Override
public TwitterStatusReceivedEvent translateTo(
TwitterStatusReceivedEvent event, long sequence) {
event.setStatus(status);
event.setSearchInstruments(searchInstruments);
return event;
}
});
}
このシナリオでは、イベントハンドラーのどれもまったく呼び出されないことがわかりました。
私は何が間違っているのですか?
アップデート
これが私のイベントハンドラー全体です。処理が完了していることをどのように知らせてください。
public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {
@Override
public void onEvent(TwitterStatusReceivedEvent event, long sequence,
boolean endOfBatch) throws Exception {
String statusText = event.getStatus().getText();
for (Instrument instrument : event.getSearchInstruments())
{
if (statusText.contains(instrument.getSearchTerm()))
{
event.setMatchedInstrument(instrument);
break;
}
}
}
}
解決
各イベントハンドラーは、破壊者をシャットダウンするまで終了しない独自のスレッドで実行する必要があります。単一のスレッドエグゼキューターを使用しているため、実行された最初のイベントハンドラーのみが実行されます。 (破壊者クラスは各ハンドラーをハッシュマップに保存するため、どのハンドラーが走り続けるかは異なります)
cachedthreadpoolに切り替えた場合、すべてが実行を開始することがわかります。 Secance番号の管理を行う必要はありません。これはすべて、Disruptorクラスがセットアップして管理するEventProcessorによってすべて処理されているためです。取得する各イベントを処理するだけで正確です。
他のヒント
SearchTermMatchingHandlerがイベントを処理した後、シーケンス番号を更新していることを確認する必要があります。さらに下流のEventHandlers(Appendstatushandler、updatePriceHandler、stasteUpdateshandler)は、SearchMatchinghandlerシーケンス番号を検査して、リングバッファーを拾うことができるイベントを確認します。
私は同じ問題を抱えていましたが、それは私がSpring(Java Config)を使用してDisruptorをインスタンス化していて、Discruptorと同じ@Beanメソッドのエグゼクターをインスタンス化していたからです。
別の@Beanメソッドでエグゼキューターをインスタンス化することにより、問題を修正しました。