активизация с помощью Stomp и ActiveMq.prefetchSize = 1
-
24-10-2019 - |
Вопрос
У меня есть ситуация, когда у меня есть один брокер ActiveMQ с 2 очередями, Q1 и Q2. У меня есть два потребителя на основе Ruby, использующих ActiveMessaging. Назовем их C1 и C2. Оба потребителя подписываются на каждую очередь. Я устанавливаю ActiveMq.prefetchSize = 1 при подписке на каждую очередь. Я также устанавливаю ACK = клиент.
Рассмотрим следующую последовательность событий:
1) Сообщение, которое запускает долгосрочную работу, публикуется в очереди Q1. Назовите это M1.
2) M1 отправляется на потребитель C1, что запускает долгую работу.
3) Два сообщения, которые запускают короткие задания, опубликованы в очередь Q2. Назовите эти M2 и M3.
4) M2 отправляется на C2, который быстро запускает короткую работу.
5) M3 отправляется на C1, хотя C1 все еще работает M1. Он способен отправлять на C1, потому что PrefetchSize = 1 установлен на подписке на очередь, а не на соединении. Таким образом, тот факт, что сообщение Q1 уже было отправлено, не останавливает одно сообщение Q2.
Поскольку потребители ActiveMessaging являются однопоточными, чистый результат заключается в том, что M3 сидит и ждет на C1 в течение долгого времени, пока C1 не завершит обработку M1. Таким образом, M3 не обрабатывается в течение длительного времени, несмотря на то, что потребитель C2 сидит на холостом ходу (поскольку он быстро заканчивается с сообщением M2).
По сути, всякий раз, когда выполняется длинная задача Q1, а затем создается целая куча коротких заданий Q2, именно одна из коротких заданий Q2 застряла на потребителю, ожидающем долгой задачи Q1.
Есть ли способ установить prefetchSize на уровне соединения, а не на уровне подписки? Я действительно не хочу, чтобы какие -либо сообщения были отправлены на C1, когда он обрабатывает M1. Другая альтернатива заключается в том, что я мог бы создать потребителя, посвященного обработке Q1, а затем попросить других потребителей, посвященных обработке Q2. Но я бы предпочел не делать этого, так как сообщения Q1 нечасты-выделенные потребители Q1 будут сидеть на холостом ходу большую часть дня, связывая память.
Решение
ActiveMq.prefetchSize доступен только на сообщении подписки, а не в подключении, согласно документам ActiveMQ для их расширенных заголовков Stomp (http://activemq.apache.org/stomp.html) Вот соответствующая информация:
Глагол: подписаться
Заголовок: ActiveMq.prefetchSize
Тип: инт
Описание: указывает максимальное количество ожидающих сообщений, которые будут отправлены клиенту. Как только этот максимум не достигнут больше сообщений, до тех пор, пока клиент не признает сообщение. Установите 1 для очень справедливого распределения сообщений по потребителям, где обработка сообщений может быть медленной.
Мое чтение и опыт работы в этом заключается в том, что, поскольку M1 не был Ack'd (B/C у вас включен клиент, который включен), что этот M1 должен быть 1 сообщением, разрешенным PrefetchSize = 1, установленное на подписке. Я удивлен, узнав, что это не сработало, но, возможно, мне нужно провести более подробный тест. Ваши настройки должны быть правильными для поведения, которое вы хотите.
Я слышал о том, что у других, о диспетчере ActiveMq, я слышал о том, что это ошибка с использованием вами версией.
Одно из предложений, которое у меня было бы, состоит в том, чтобы либо понюхать сетевой трафик, чтобы увидеть, по какой -то причине получает M1, либо бросить высказывания в драгоценный камень Ruby Stomp, чтобы наблюдать за общением (это то, что я обычно делаю, когда отладка проблем с топком).
Если у меня будет возможность попробовать это, я обновлю свой комментарий своими собственными результатами.
Одно из предложений: вполне возможно, что могут быть отправлены несколько длинных сообщений о обработке, и если количество длинных сообщений о обработке превышает ваше количество процессов, вы находитесь в этом исправлении, где ожидаются быстрого обработки сообщений.
У меня, как правило, есть хотя бы один выделенный процесс, который просто выполняет быстрые задания, или, по -другому, посвятил бы множество процессов, которые просто выполняют более длительные задания. Наличие всех потребительских процессов на Poller прислушивается как к длинным, так и коротким, может в конечном итоге получить неоптимальные результаты, независимо от того, что делает диспетчер. Группы процессов - это способ настроить потребителя для прослушивания подмножества направлений: http://code.google.com/p/activemessaging/wiki/configuration
Имя процессора_GROUP, *list_of_processors
A processor group is a way to run the poller to only execute a subset of
Процессоры, передавая имя группы в аргументах командной строки Poller.
You specify the name of the processor as its underscored lowercase
версия. Так что, если у вас есть FoobarProcessor и Barfooprocessor в группе процессоров, это будет выглядеть так:
ActiveMessaging::Gateway.define do |s| ... s.processor_group :my_group, :foo_bar_processor, :bar_foo_processor end The processor group is passed into the poller like the following: ./script/poller start -- process-group=my_group
Другие советы
Я не уверен, что ActiveMessage поддерживает это, но вы могли бы отказаться от других своих потребителей, когда появится длинное обработанное сообщение, а затем повторно подписать их после того, как оно будет обработано.
Это должно дать вам желаемый эффект.