Всегда следите за тем, чтобы в теме ActiveMQ были только последние 10 сообщений

StackOverflow https://stackoverflow.com/questions/2220811

  •  19-09-2019
  •  | 
  •  

Вопрос

У нас есть проблема в ActiveMQ, из-за которой у нас огромное количество сообщений, не переходящих по темам.Темы настроены на непостоянные, недолговечные.Наш Activemq.xml файл является

<beans>

  <broker xmlns="http://activemq.apache.org/schema/core" useJmx="false" persistent="false">

<!--
    <persistenceAdapter>
      <journaledJDBC journalLogFiles="5" dataDirectory="../data"/>
    </persistenceAdapter>
-->

        <transportConnectors>
            <transportConnector uri="vm://localhost"/>
        </transportConnectors>

  </broker>

</beans>

и наше определение темы в messaging-config.xml является

<destination id="traceChannel">

    <properties>

        <network>
        <session-timeout>10</session-timeout>
    </network>

        <server>
            <message-time-to-live>10000</message-time-to-live>
            <durable>false</durable>
            <durable-store-manager>flex.messaging.durability.FileStoreManager</durable-store-manager>
        </server>

        <jms>
            <destination-type>Topic</destination-type>
            <message-type>javax.jms.ObjectMessage</message-type>
            <connection-factory>ConnectionFactory</connection-factory>
            <destination-jndi-name>dynamicTopics/traceTopic</destination-jndi-name>
            <delivery-mode>NON_PERSISTENT</delivery-mode>
            <message-priority>DEFAULT_PRIORITY</message-priority>
            <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
            <transacted-sessions>false</transacted-sessions>
            <initial-context-environment>
                <property>
                    <name>Context.INITIAL_CONTEXT_FACTORY</name>
                    <value>org.apache.activemq.jndi.ActiveMQInitialContextFactory</value>
                </property>
                <property>
                    <name>Context.PROVIDER_URL</name>
                    <value>tcp://localhost:61616</value>
                </property>
            </initial-context-environment>
        </jms>
    </properties>

    <channels>
        <channel ref="rtmps" />
    </channels>

    <adapter ref="trace" />

</destination>

Чего я пытаюсь добиться, так это того, чтобы только последние 10 сообщений были посвящены темам в любой момент времени, поскольку если оставить его запущенным на ночь, то получится более 150 тысяч сообщений по теме, хотя оно должно содержать только очень небольшое количество.

Это было полезно?

Решение

Насколько я знаю, сообщения, отправленные в недолговечную тему без подписчиков, должны быть удалены.Копию сообщения получат только зарегистрированные в данный момент пользователи.

Как вы проверяете, что тема содержит эти 150 тысяч сообщений?Через JMX?

Независимо от того факта, что ваша недолговечная тема не должна кэшировать эти 150 тысяч сообщений, вы можете ограничить количество сообщений, хранимых для каждого пользователя, используя политику брокера:

<broker>
...    
  <pendingMessageLimitStrategy>
    <constantPendingMessageLimitStrategy limit="10"/>
  </pendingMessageLimitStrategy>
...
</broker>

Другие советы

Я не уверен, что то, что вы хотите, можно заархивировать.Вы можете сделать несколько вещей:

Прежде всего, вы можете указать время для своих сообщений:

public ITopicPublisher CreateTopicPublisher(string selector)
        {
            try
            {
                IMessageProducer producer = m_session.CreateProducer(m_topic);
                ActiveMQPublisher publisher = new ActiveMQPublisher(producer);

                // here we put a time to live to 1min for eg
                TimeSpan messageTTL = TimeSpan.FromMilliseconds(60000);
                publisher.TimeToLive = messageTTL;

                if (!String.IsNullOrEmpty(selector))
                {
                    publisher.IsSelector = true;
                    publisher.Selector = selector;
                }
                return publisher;
            }
            catch (Exception ex)
            {
                Logger.Exception(ex);
                throw;
            }
        }

Вы по-прежнему будете получать 150 тысяч сообщений утром, но как только один пользователь подключится к вашей теме, просроченные сообщения исчезнут.

Возможно, вы также захотите ознакомиться со стратегией выселения (Стратегия выселения)

Редактировать:Я только что понял одну вещь.Вы говорите: "если оставить его запущенным на ночь, это приведет к появлению более 150 тысяч сообщений по теме, хотя в нем должно содержаться только очень небольшое количество". В теме у вас нет понятия о сообщениях, которые нужно перехватывать.Если никто не подписан на тему, отправленное туда сообщение удаляется.Тем не менее, "Сообщение поставлено в очередь" увеличится на единицу.Если вы хотите отправить только "последние 10 сообщений", может быть, вам следует использовать очередь вместо темы?

Не знаю, работает ли это, у меня еще не было возможности протестировать это, но, пожалуйста, взгляните на constantPendingMessageLimitStrategy здесь http://activemq.apache.org/slow-consumer-handling.html

Удачи Клаудио

Пара вещей, не очень понятных из этого поста

  • Основываясь на моем разборе и попытках добавить это..для этого нужно немного больше контекста, поэтому я добавил его ниже для тех, кто, возможно, не захочет проходить через ActiveMQ Google struggle..
  • Это то, что есть в моем activemq.xml и я вообще не вижу этой работы, поэтому я надеялся, что смогу получить некоторые идеи от других, которые могли бы указать мне правильное направление.

    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic=">"
                       topicPrefetch="10"/>
          <policyEntry topic=">"
                       producerFlowControl="false"/>
          <policyEntry topic=".>"
                       >
            <messageEvictionStrategy>
              <oldestMessageEvictionStrategy/>
            </messageEvictionStrategy>
    
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="10"/>
            </pendingMessageLimitStrategy>
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>
    
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top