質問
私のWebサーバー(Nettyの上にカスタムビルド)は、Webクライアント(Nettyでカスタムビルドされている)を使用して、S3にプロキシリクエストを行います。
Client -> Webserver|Webclient -> S3
システムの目的は、少しのロジックを使用して、ファイルアップロードをS3に直接パイプすることです。
Webserver
クライアントリクエストを受け入れる(投稿);- セット
Client
虚偽のチャネルの読みやすさと多くのものを検証します。 - すべてが正常に検証された場合、それはを使用します
Webclient
に接続しますS3
; - いつ
Webclient
に接続しますS3
:- クライアントに100コントンを送り返します
- 設定します
Client
trueへのチャネルの読みやすさ
- そこから、すべてのチャンクが受け取った
Webserver
に引き渡されますWebclient
前に。
の間の接続が Client
と Webserver
間の接続よりも高速です Webclient
と S3
, 、間の接続をスロットルする必要があります Client
と Webserver
.
私が取ったアプローチは、単に受け取ったバイトのカウンターを保持することでした Webserver
(毎回増分します Client
データを送信します)とそれは、の書き込みが毎回減少します Webclient
完了します。このバッファー上のデータの量が特定のしきい値を越えているときはいつでも、 Client
チャネルの読みやすさはに設定されています false
.
これは、私が追加するまでうまく機能します OrderedMemoryAwareThreadPoolExecutor
サーバーのパイプラインに。
簡単な解決策は、ANを使用することです OioClientSocketChannelFactory
に Webclient
. 。これにより、呼び出しが発生します Channel.write
ブロックするために、いつ messageReceived()
に呼ばれます Webserver
ハンドラー - そしてその結果 Channel.write
に呼ばれます Webclient
- スロットリングは「自然に」起こります。
ただし、aを使用する場合 NioClientSocketChannelFactory
に Webclient
, 、次に電話します Channel.write
非同期になり、スロットルが動作します。
基本的に私がここで気づいているのはそれです Channel.setReadability(false)
anが効果をもたらさないようです OrderedMemoryAwareThreadPoolExecutor
パイプラインに挿入されます。
パイプラインでOMATPEを使用してスロットリングを実行するにはどうすればよいですか?
解決
1)OrderedMemoryAwarEthReadPoolexecutorは、構成された最大サイズ(OrderedMemoryAwarethReadPoolexecutor Constructorを介して上記/以下の場合、チャネルメモリ(場合はデータサイズ)を監視し、読み取り/有効化を停止/有効にします。
2)実行ハンドラーで使用された場合、ハンドラーは、コンテキストで見つかったいくつかの添付ファイル(ただし、そのコンテキストの添付ファイルが通常、上流のハンドラーがチャネル状態を変更し、OutMemoryExceptionを変更できるようにすることを許可しないように、そのコンテキストの添付ファイルが通常設定されている場合に設定されます) 。
boolean readSuspended = ctx.getAttachment() != null;
if (readSuspended) {
// Drop the request silently if MemoryAwareThreadPool has
// set the flag.
e.getFuture().setSuccess();
return;
}
min、maxチャネルメモリサイズのmin、またはこの状況につながるコンテキストの添付ファイルがあるかもしれないと構成する必要があると思いますか?
他のヒント
ジェスタンが言ったように、参照してください org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler
マスターで。また、構成する必要があります receiveBufferSizePredictorFactory
それがあまりにも大きな値を返さないように。それ以外の場合、Nettyは大きなバッファーを割り当ててすぐに埋めるだけです。使用する AdaptiveReceiveBufferSizePredictorFactory
と組み合わせた最大値が小さくなります ChannelTrafficShapingHandler
.