Прослушиватель подключения потребителя / производителя ActiveMQ
Вопрос
Кажется, я не могу найти способ прослушивания новых подключений производителя и потребителя (или прерываний соединения) в ActiveMQ (версия Java).Я хочу иметь возможность сообщить потребителям (или они сами могут узнать), что связь с производителем оборвалась.Также требуется обратный способ (производитель узнает, что определенный потребитель отключился).
Я был бы признателен за некоторую помощь.
Решение
Я думаю, вы хотите послушать новых производителей и потребителей по определенному направлению (определенной очереди или теме).Это верно?
Вы можете создать экземпляр ConsumerEventSource и ProducerEventSource и зарегистрировать своих собственных слушателей, вызвав для них setConsumerListener и setProducerListener соответственно.
Итак:
Connection conn = yourconnection; // the connection your listener will use
Destination dest = yourdestination; // the destination you're paying attention to
ConsumerEventSource source = new ConsumerEventSource(conn, dest);
source.setConsumerListener(new ConsumerListener() {
public void onConsumerEvent(ConsumerEvent event) {
if (event.isStarted()) {
System.out.println("a new consumer has started - " + event.getConsumerId());
} else {
System.out.println("a consumer has dropped - " + event.getConsumerId());
}
}
});
Если вы посмотрите на код для ConsumerEventSource или ProducerEventSource, вы увидите, что это простые объекты, которые используют методы advisorySupport для прослушивания специальной консультативной темы, в обязанности которой входит трансляция новостей о производителях и потребителях.Вы могли бы узнать больше, прочитав исходный код для этих классов.
Ваше использование термина "подключение" потенциально является проблемой;в ActiveMQ land (который является подмножеством JMS land) "Соединение" - это объект более низкого уровня, который не связан с конкретным назначением.Конкретный клиент создает "Сеанс" из соединения, по-прежнему не относящегося к конкретному получателю, а затем создает QueueSender для конкретного назначения, QueueReceiver, TopicPublisher или TopicSubscriber.Когда они будут созданы или когда сеансы, которые их создали, умрут, это те события, о которых вы хотите услышать, и о которых вы услышите, если будете использовать приведенный выше код.
Другие советы
Вся необходимая мне информация опубликована в консультативных разделах ActiveMQ, таких как "ActiveMQ.Advisory.Подключение" или просто "ActiveMQ.Advisory..>" для всех из них.Даже события, происходящие при подключении к Stomp, публикуются в консультативных разделах ActiveMQ.Следующий код приводит пример этого (протестирован с гибким клиентом, подключенным через Stomp):
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("user", "password", ActiveMQConnection.DEFAULT_BROKER_URL);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(transacted, ackMode);
connection.start();
Destination destinationAdvisory = session.createTopic("ActiveMQ.Advisory..>");
MessageConsumer consumerAdvisory = session.createConsumer(destinationAdvisory);
consumerAdvisory.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof ActiveMQMessage) {
ActiveMQMessage activeMessage = (ActiveMQMessage) message;
Object command = activeMessage.getDataStructure();
if (command instanceof ConsumerInfo) {
System.out.println("A consumer subscribed to a topic or queue: " + command);
} else if (command instanceof RemoveInfo) {
RemoveInfo removeInfo = (RemoveInfo) command;
if (removeInfo.isConsumerRemove()) {
System.out.println("A consumer unsubscribed from a topic or queue");
}
else {
System.out.println("RemoveInfo, a connection was closed: " + command);
}
} else if (command instanceof ConnectionInfo) {
System.out.println("ConnectionInfo, a new connection was made: " + command);
} else {
System.out.println("Unknown command: " + command);
}
}
}
});