Question

My web server (custom built on top of Netty) uses a web client (also custom built with Netty) to make proxied requests to S3.

Client -> Webserver|Webclient -> S3

The purpose of the system is to pipe file uploads directly to S3 with a little bit of logic:

  • Webserver accepts client request (POST);
  • Sets Client channel readability to false and verifies a bunch of stuff;
  • When everything is successfully verified, it uses the Webclient to connect to S3;
  • When the Webclient connects to S3 (see the sketch after this list):
    1. it sends a 100-Continue back to the client
    2. it sets Client channel readability to true
  • From there on, all chunks received by the Webserver are handed over to the Webclient to forward.
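
A minimal sketch of that connect-then-resume step; clientBootstrap, s3Address, inboundChannel and outboundChannel are illustrative names, and the server pipeline is assumed to contain an HTTP encoder:

    // Webclient connects to S3; the accepted client channel is currently
    // not readable (readability was set to false during verification).
    clientBootstrap.connect(s3Address).addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                outboundChannel = future.getChannel();
                // 1. send a 100-Continue back to the client
                inboundChannel.write(new DefaultHttpResponse(
                        HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
                // 2. resume reading from the client
                inboundChannel.setReadable(true);
            } else {
                inboundChannel.close();
            }
        }
    });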

In the (highly unlikely) event that the connection between Client and Webserver is faster than the connection between Webclient and S3, I need to throttle the connection between Client and Webserver.

The approach I took was to simply keep a counter of bytes received by the Webserver, incremented every time the Client sends data and decremented every time a write by the Webclient completes. Whenever the amount of buffered data goes over a given threshold, the Client channel's readability is set to false (sketched below).
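
A minimal sketch of that counter, assuming a SimpleChannelUpstreamHandler on the Webserver and an already-connected outboundChannel to S3; the threshold and names are illustrative:

    import java.util.concurrent.atomic.AtomicLong;

    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelFuture;
    import org.jboss.netty.channel.ChannelFutureListener;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.MessageEvent;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

    public class ThrottlingProxyHandler extends SimpleChannelUpstreamHandler {

        private static final long HIGH_WATER_MARK = 64 * 1024; // illustrative

        private final AtomicLong pendingBytes = new AtomicLong();
        private volatile Channel outboundChannel; // Webclient channel to S3

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            final ChannelBuffer chunk = (ChannelBuffer) e.getMessage();
            final int size = chunk.readableBytes();
            final Channel inbound = e.getChannel();

            // Suspend reads from the client once too many bytes are in flight.
            if (pendingBytes.addAndGet(size) >= HIGH_WATER_MARK) {
                inbound.setReadable(false);
            }

            outboundChannel.write(chunk).addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) {
                    // Resume reads once the write to S3 has completed and the
                    // in-flight byte count drops back below the threshold.
                    if (pendingBytes.addAndGet(-size) < HIGH_WATER_MARK) {
                        inbound.setReadable(true);
                    }
                }
            });
        }
    }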

This works great until I add an OrderedMemoryAwareThreadPoolExecutor to the server's pipeline.

A simple solution is to use an OioClientSocketChannelFactory on the Webclient. This causes the calls to Channel.write to be blocking, so when messageReceived() is called on the Webserver's handler — and, consequently Channel.write is called on the Webclient — throttling happens "naturally".

However, if I use a NioClientSocketChannelFactory on the Webclient, then calls to Channel.write become asynchronous and throttling stops working.

Basically, what I'm noticing here is that Channel.setReadable(false) seems to have no effect when an OrderedMemoryAwareThreadPoolExecutor is inserted into the pipeline.

How can I perform throttling using OMATPE in the pipeline?

Solution

1) OrderedMemoryAwareThreadPoolExecutor also monitors the channel memory (in your case, the received data size) and suspends/resumes reading when it goes above/below the configured maximum size (set through the OrderedMemoryAwareThreadPoolExecutor constructor).

2) When it is used with an ExecutionHandler, the handler may discard channel state events if an attachment is found in the context (but that context attachment is usually set by OrderedMemoryAwareThreadPoolExecutor itself, to prevent upstream handlers from changing the channel state and causing an OutOfMemoryError):

    // From Netty 3's ExecutionHandler (downstream handling of interest-ops
    // changes): a setReadable(true) request is silently dropped while the
    // memory-aware executor has flagged the channel as read-suspended.
    boolean readSuspended = ctx.getAttachment() != null;
    if (readSuspended) {
        // Drop the request silently if MemoryAwareThreadPool has
        // set the flag.
        e.getFuture().setSuccess();
        return;
    }

I think you have to configure the maxChannelMemorySize/maxTotalMemorySize of OMATPE, or you may have a context attachment that leads to this situation (see the sketch below).
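
For reference, a hedged sketch of setting those limits; the pool size and byte limits are illustrative, and a limit of 0 disables the corresponding check:

    import org.jboss.netty.handler.execution.ExecutionHandler;
    import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;

    // corePoolSize, maxChannelMemorySize (per channel), maxTotalMemorySize
    OrderedMemoryAwareThreadPoolExecutor executor =
            new OrderedMemoryAwareThreadPoolExecutor(
                    16,              // worker threads
                    1048576,         // suspend a channel above 1 MiB queued
                    16 * 1048576);   // ceiling across all channels

    // The executor reaches the pipeline wrapped in an ExecutionHandler.
    pipeline.addLast("executor", new ExecutionHandler(executor));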

OTHER TIPS

As Jestan said, please refer to org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler in the master branch. You will also have to configure your receiveBufferSizePredictorFactory so that it does not return too large a value; otherwise, Netty will just allocate a large buffer and fill it very quickly. Use AdaptiveReceiveBufferSizePredictorFactory with a smaller maximum in combination with ChannelTrafficShapingHandler (see the sketch below).
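
A hedged sketch of wiring both pieces into the server bootstrap. ChannelTrafficShapingHandler is stateful per channel, so a fresh instance goes into each pipeline; the bootstrap name, rate limit, and predictor sizes are illustrative:

    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler;
    import org.jboss.netty.util.HashedWheelTimer;
    import org.jboss.netty.util.Timer;

    final Timer timer = new HashedWheelTimer();

    serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            ChannelPipeline p = Channels.pipeline();
            // Cap reads from each client at ~1 MiB/s (writeLimit 0 = unlimited).
            p.addLast("traffic", new ChannelTrafficShapingHandler(timer, 0, 1048576));
            // ... codecs and the proxy handler go here ...
            return p;
        }
    });

    // Keep individual reads small: min 64 B, initial 1 KiB, max 8 KiB per read.
    serverBootstrap.setOption("child.receiveBufferSizePredictorFactory",
            new AdaptiveReceiveBufferSizePredictorFactory(64, 1024, 8192));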
