Question

I want my app to react to UDP events sent from thousands of different clients. Each client sends 1-10 UDP packets every 5-10 s. Each packet should be processed very quickly (mostly in-memory work and small calculations, backed by Redis, with only occasional DB calls). No data is returned to the callers.

I set up Reactor in Spring as described on its wiki, then added a UDP inbound channel adapter as described in the Spring Integration docs. Here is the config:

<int-ip:udp-inbound-channel-adapter id="receiverChannel"
                                    channel="stringConvert"
                                    port="9000"
                                    multicast="false"
                                    check-length="false"
                                    pool-size="10"
                                    lookup-host="false"
        />

<int:transformer id="convertChannel"
                 input-channel="stringConvert"
                 output-channel="toProcess"
                 ref="transformer"
                 method="transform"
        />

<int:service-activator input-channel="toProcess"
                       ref="accumulator"
                       method="accumulate"/>

<bean id="accumulator" class="hello.UDPAccumulator" />
<bean id="transformer" class="hello.UDPTransformer" />

Then, in UDPAccumulator, I publish that message to Reactor:

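import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class UDPAccumulator {

    // ReactorProducer (my own class, not shown) publishes the event to Reactor.
    @Autowired
    ReactorProducer producer;

    // Invoked by the service activator for every transformed UDP payload.
    public void accumulate(String quote) {
        producer.fireEvent(quote);
    }
}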

Is this the "right" way to do this, given that I want high throughput? How does the int-ip:udp-inbound-channel-adapter work internally, and can it become the bottleneck before it passes messages to Reactor? I see that Reactor has some TCP-related classes and support, but no UDP. Any suggestion on the best way to do this is appreciated!

Bonus question: what if messages arrive faster than they are dispatched to Reactor? Will the Redis message store (bottom of the article) help? And what if the method that crunches these packets in Reactor is slow?


Solution

Since we don't have direct UDP support in Reactor yet, your abstractions to publish events into Reactor are very sensible. But you do note in your "bonus question" that there are issues with publisher/consumer throughput that have to be managed in some domain-specific way; there's no silver bullet there.

In your use-case, I'd actually be tempted to say the Processor [1] might be a better fit. It provides much higher overall throughput for data processing because it circumvents the dynamic Selector-based dispatching that happens in a plain Reactor. Unless you're dispatching the incoming events to different handlers based on some topic criteria, I'd suggest you look at that instead. With higher throughput, you'll have to worry a little less about Consumers keeping up (unless your Consumer is doing something really slow, which nothing can automagically speed up).

But if you really, really need to manage the backlog, I'd suggest decoupling your producers and consumers via a queue. Reactor has a PersistentQueue [2] abstraction that you can publish objects into; it persists them to disk using JavaChronicle [3]. The queue can then be drained into a Consumer using a Poller (javadoc for Poller is coming sometime this week as we get ready for 1.0; it was previously called Pipe [4]).
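To make the decoupling idea concrete, here is a minimal sketch that puts a bounded in-memory queue between the receiving side and a slower consumer. It deliberately uses a plain java.util.concurrent queue rather than Reactor's PersistentQueue or Poller, so nothing here is persisted to disk. The class and method names (BoundedHandoffSketch, publish, drain) are made up for illustration, and dropping packets when the buffer is full is an assumed policy, not something Reactor does for you.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: a bounded buffer that decouples a fast producer from a slow consumer. */
class BoundedHandoffSketch {
    private final BlockingQueue<String> queue;
    private final AtomicLong dropped = new AtomicLong();

    BoundedHandoffSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Producer side: never blocks. Returns false (and counts a drop) when the buffer is full. */
    boolean publish(String payload) {
        boolean accepted = queue.offer(payload);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    /** Consumer side: removes up to max buffered payloads in arrival order, without blocking. */
    List<String> drain(int max) {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch, max);
        return batch;
    }

    long droppedCount() {
        return dropped.get();
    }

    public static void main(String[] args) {
        BoundedHandoffSketch buffer = new BoundedHandoffSketch(2);
        buffer.publish("a");
        buffer.publish("b");
        buffer.publish("c"); // buffer is full at this point, so this one is dropped
        System.out.println("drained=" + buffer.drain(10) + " dropped=" + buffer.droppedCount());
    }
}
```

In this sketch the UDP side would call publish, while a separate thread or scheduled task would call drain periodically and feed each batch to the consumer. Whether to drop, block, or spill to disk when the buffer fills up is exactly the domain-specific decision mentioned above.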

Other tips

I can't speak to Reactor, but the UDP adapter has a dedicated thread that reads raw packets and hands them over to a TaskExecutor. It does this as soon as possible so it can get back to reading the next packet.

The default TaskExecutor is a fixed thread pool.

Reactor has a DispatcherTaskExecutor that can be injected into the adapter.

The same task executor is used for the main reader thread and the handoffs.
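The read-then-hand-off pattern described above can be sketched in plain Java as follows. This is not the Spring Integration adapter's actual code, only an illustration of the idea under some assumptions: the class name UdpHandoffSketch and its methods are hypothetical, the buffer size of 2048 bytes is arbitrary, and the reader runs on its own thread here, whereas the adapter uses the same task executor for the reader and the handoffs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

/** Hypothetical sketch: one reader thread receives packets and hands them to a fixed pool. */
class UdpHandoffSketch implements AutoCloseable {
    private final DatagramSocket socket;
    private final ExecutorService workers;

    private UdpHandoffSketch(DatagramSocket socket, int poolSize, Consumer<String> handler) {
        this.socket = socket;
        this.workers = Executors.newFixedThreadPool(poolSize);
        Thread reader = new Thread(() -> readLoop(handler), "udp-reader");
        reader.setDaemon(true);
        reader.start();
    }

    /** Binds to the given port (0 picks a free one) and starts reading. */
    static UdpHandoffSketch open(int port, int poolSize, Consumer<String> handler) {
        try {
            return new UdpHandoffSketch(new DatagramSocket(port), poolSize, handler);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int port() {
        return socket.getLocalPort();
    }

    private void readLoop(Consumer<String> handler) {
        byte[] buffer = new byte[2048];
        while (!socket.isClosed()) {
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            try {
                socket.receive(packet); // blocks until a packet arrives
                // Copy the payload out of the shared buffer before handing it off.
                String payload = new String(packet.getData(), packet.getOffset(),
                        packet.getLength(), StandardCharsets.UTF_8);
                // Hand off immediately so this thread can go back to reading.
                workers.execute(() -> handler.accept(payload));
            } catch (IOException | RejectedExecutionException e) {
                return; // socket closed or pool shut down: stop reading
            }
        }
    }

    /** Test helper: sends one UTF-8 datagram to the given port on the loopback interface. */
    static void send(int port, String message) {
        byte[] data = message.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket sender = new DatagramSocket()) {
            sender.send(new DatagramPacket(data, data.length, InetAddress.getLoopbackAddress(), port));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        socket.close();
        workers.shutdown();
    }

    public static void main(String[] args) throws Exception {
        try (UdpHandoffSketch server = open(0, 10, p -> System.out.println("got " + p))) {
            send(server.port(), "hello");
            Thread.sleep(200); // give the worker a moment before shutting down
        }
    }
}
```

The point of the handoff is that slow processing ties up pool threads rather than the reader. If every pool thread is busy, work piles up in the executor's queue, so a slow consumer still needs a backlog strategy of its own.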

License: CC-BY-SA with attribution
Not affiliated with StackOverflow