I have a ring buffer in my project, in which a lot of publishers will publish events(for example 500 publishers), and I have 3 EventProcessors which should process the events sequentially. All events should pass this way:
{A lot of Publishers} -> {UpStreamProcessor} -> {DownStreamProcessor} -> {logProcessor}
The problem is that I am losing a lot of time in passing events between publish and starting of UpStreamProcessor, and end of UpStreamProcessor to start of DownStreamProcessor.
For example, When I have 500 publishers, It lasts 1ms in average for processing in UpStreamProcessor and DownStreamProcessor, while it lasts 400ms between UpStreamProcessor finish time to DownStreamProcessor start time.
This is the piece of code for constructing ring buffer and processors:
SequenceBarrier sequenceBarrier;
receiveBuffer = new RingBuffer<>(
MessageContext.FACTORY,
new MultiThreadedLowContentionClaimStrategy(inputBufferSize),
new YieldingWaitStrategy()
);
upStreamAgentProcessor = new BatchEventProcessor<>(
receiveBuffer,
receiveBuffer.newBarrier(),
new UpStreamAgent()
);
sequenceBarrier = receiveBuffer.newBarrier(
upStreamAgentProcessor.getSequence()
);
downStreamAgentProcessor = new BatchEventProcessor<MessageContext>(
receiveBuffer,
sequenceBarrier,
new DownStreamAgent()
);
sequenceBarrier = receiveBuffer.newBarrier(
downStreamAgentProcessor.getSequence()
);
logMapAgentProcessor = new BatchEventProcessor<MessageContext>(
receiveBuffer,
sequenceBarrier,
LogMap.getInstance()
);
receiveBuffer.setGatingSequences(logMapAgentProcessor.getSequence());
operationalExecutor.submit(upStreamAgentProcessor);
operationalExecutor.submit(downStreamAgentProcessor);
operationalExecutor.submit(logMapAgentProcessor);