Прослушиватель подключения потребителя / производителя ActiveMQ

StackOverflow https://stackoverflow.com/questions/1702755

  •  19-09-2019
  •  | 
  •  

Вопрос

Кажется, я не могу найти способ прослушивания новых подключений производителя и потребителя (или прерываний соединения) в ActiveMQ (версия Java).Я хочу иметь возможность сообщить потребителям (или они сами могут узнать), что связь с производителем оборвалась.Также требуется обратный способ (производитель узнает, что определенный потребитель отключился).

Я был бы признателен за некоторую помощь.

Это было полезно?

Решение

Я думаю, вы хотите послушать новых производителей и потребителей по определенному направлению (определенной очереди или теме).Это верно?

Вы можете создать экземпляр ConsumerEventSource и ProducerEventSource и зарегистрировать своих собственных слушателей, вызвав для них setConsumerListener и setProducerListener соответственно.

Итак:

Connection conn = yourconnection; // the connection your listener will use
Destination dest = yourdestination; // the destination you're paying attention to
ConsumerEventSource source = new ConsumerEventSource(conn, dest);
source.setConsumerListener(new ConsumerListener() {

   public void onConsumerEvent(ConsumerEvent event) {
      if (event.isStarted()) {
         System.out.println("a new consumer has started - " + event.getConsumerId());
      } else {
         System.out.println("a consumer has dropped - " + event.getConsumerId());
      }
   }

});

Если вы посмотрите на код для ConsumerEventSource или ProducerEventSource, вы увидите, что это простые объекты, которые используют методы advisorySupport для прослушивания специальной консультативной темы, в обязанности которой входит трансляция новостей о производителях и потребителях.Вы могли бы узнать больше, прочитав исходный код для этих классов.

Ваше использование термина "подключение" потенциально является проблемой;в ActiveMQ land (который является подмножеством JMS land) "Соединение" - это объект более низкого уровня, который не связан с конкретным назначением.Конкретный клиент создает "Сеанс" из соединения, по-прежнему не относящегося к конкретному получателю, а затем создает QueueSender для конкретного назначения, QueueReceiver, TopicPublisher или TopicSubscriber.Когда они будут созданы или когда сеансы, которые их создали, умрут, это те события, о которых вы хотите услышать, и о которых вы услышите, если будете использовать приведенный выше код.

Другие советы

Вся необходимая мне информация опубликована в консультативных разделах ActiveMQ, таких как "ActiveMQ.Advisory.Подключение" или просто "ActiveMQ.Advisory..>" для всех из них.Даже события, происходящие при подключении к Stomp, публикуются в консультативных разделах ActiveMQ.Следующий код приводит пример этого (протестирован с гибким клиентом, подключенным через Stomp):

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("user", "password", ActiveMQConnection.DEFAULT_BROKER_URL);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(transacted, ackMode);
connection.start();
Destination destinationAdvisory = session.createTopic("ActiveMQ.Advisory..>");
MessageConsumer consumerAdvisory = session.createConsumer(destinationAdvisory);
consumerAdvisory.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {

            if (message instanceof ActiveMQMessage) {
                ActiveMQMessage activeMessage = (ActiveMQMessage) message;
                Object command = activeMessage.getDataStructure();
                if (command instanceof ConsumerInfo) {
                    System.out.println("A consumer subscribed to a topic or queue: " + command);
                } else if (command instanceof RemoveInfo) {
                    RemoveInfo removeInfo = (RemoveInfo) command;
                    if (removeInfo.isConsumerRemove()) {
                        System.out.println("A consumer unsubscribed from a topic or queue");
                    }
                    else {
                        System.out.println("RemoveInfo, a connection was closed: " + command);
                    }
                } else if (command instanceof ConnectionInfo) {
                    System.out.println("ConnectionInfo, a new connection was made: " + command);
                } else {
                    System.out.println("Unknown command: " + command);
                }
            }
    }
});
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top