Регулирование пропускной способности с помощью OMATPE
Вопрос
Мой веб-сервер (специально созданный поверх Netty) использует веб-клиент (также специально созданный с помощью Netty) для отправки прокси-запросов к S3.
Client -> Webserver|Webclient -> S3
Цель системы - передавать загружаемые файлы непосредственно в S3 с небольшой логикой:
Webserver
принимает запрос клиента (POST);- Наборы
Client
переводит читаемость канала в значение false и проверяет кучу всего; - Когда все успешно проверено, он использует
Webclient
для подключения кS3
; - Когда
Webclient
подключается кS3
:- он отправляет клиенту сообщение о 100-продолжении обратно
- он устанавливает
Client
читаемость канала равна true
- С этого момента все фрагменты, полученные
Webserver
передаются вWebclient
для переадресации.
В том (крайне маловероятном) случае, если связь между Client
и Webserver
это быстрее, чем соединение между Webclient
и S3
, мне нужно ограничить связь между Client
и Webserver
.
Подход, который я использовал, состоял в том, чтобы просто сохранить счетчик байтов, полученных Webserver
(который увеличивается каждый раз Client
отправляет данные), и это уменьшается каждый раз, когда выполняется запись Webclient
завершает.Всякий раз, когда объем данных в этом буфере превышает заданное пороговое значение, Client
читаемость канала установлена на false
.
Это отлично работает до тех пор, пока я не добавлю OrderedMemoryAwareThreadPoolExecutor
к конвейеру сервера.
Простое решение заключается в использовании OioClientSocketChannelFactory
на Webclient
.Это приводит к вызовам Channel.write
быть блокирующим, поэтому, когда messageReceived()
вызывается на Webserver
обработчик — и, следовательно, Channel.write
вызывается на Webclient
— регулирование происходит "естественным путем".
Однако, если я использую NioClientSocketChannelFactory
на Webclient
, затем обращается к Channel.write
становится асинхронным, и регулирование перестает работать.
В основном, что я здесь замечаю, так это то, что Channel.setReadability(false)
кажется, не оказывает никакого эффекта, когда OrderedMemoryAwareThreadPoolExecutor
вставляется в трубопровод.
Как я могу выполнить регулирование с помощью OMATPE в конвейере?
Решение
1) OrderedMemoryAwareThreadPoolExecutor также отслеживает память канала (в вашем случае размер полученных данных) и приостанавливает / разрешает чтение, когда оно превышает / ниже настроенного максимального размера (через конструктор OrderedMemoryAwareThreadPoolExecutor).
2) Когда он используется с ExecutionHandler, обработчик может отбрасывать события состояния канала, если какое-либо вложение найдено в контексте (но это вложение контекста обычно устанавливается OrderedMemoryAwareThreadPoolExecutor, чтобы не позволять вышестоящим обработчикам изменять состояние канала и вызывать исключение OutOfMemoryException ).
boolean readSuspended = ctx.getAttachment() != null;
if (readSuspended) {
// Drop the request silently if MemoryAwareThreadPool has
// set the flag.
e.getFuture().setSuccess();
return;
}
Я думаю, вам нужно настроить минимальный и максимальный размер памяти канала OMATPE, или у вас может быть контекстное вложение, приводящее к такой ситуации?
Другие советы
Как сказал Джестан, пожалуйста, обратитесь к org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler
в мастере.Кроме того, вам нужно будет настроить свой receiveBufferSizePredictorFactory
чтобы он не возвращал слишком большое значение.В противном случае Netty просто выделит большой буфер и заполнит его очень быстро.Использование AdaptiveReceiveBufferSizePredictorFactory
с меньшим максимумом в сочетании с ChannelTrafficShapingHandler
.