Динамическое создание асинхронных очередей сообщений в Java

StackOverflow https://stackoverflow.com/questions/1642122

Вопрос

Мне нужно динамически создавать асинхронные очереди сообщений в Java. Мой пример использования - отправка электронной почты через несколько SMTP-серверов: мне нужно обеспечить, чтобы электронные письма на один и тот же SMTP-сервер обрабатывались последовательно, но электронные письма на разные SMTP-серверы могут обрабатываться одновременно. Я использовал JMS в прошлом, но, насколько я вижу, он позволяет создавать только очереди во время компиляции, тогда как мне нужно создавать очереди во время выполнения (одна очередь для каждого SMTP-сервера).

Я что-то упускаю из-за JMS или есть какой-то другой инструмент / предложение, на которое мне стоит взглянуть?

Это было полезно?

Решение

Я согласен с Адамом, вариант использования звучит как JMS. Достаточно встроенной функциональности Java:

package de.mhaller;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

import org.junit.Assert;
import org.junit.Test;

public class Mailer {

    @Test
    public void testMailer() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        ArrayList<Mail> log = new ArrayList<Mail>();
        LinkedBlockingDeque<Mail> incoming = new LinkedBlockingDeque<Mail>();

        // TODO: Put mails to be sent into the incoming queue
        incoming.offer(new Mail("foo1@localhost", "localhost"));
        incoming.offer(new Mail("foo2@otherhost", "otherhost"));
        incoming.offer(new Mail("foo3@otherhost", "otherhost"));
        incoming.offer(new Mail("foo4@localhost", "localhost"));

        Map<Mailserver, Queue<Mail>> queues = new HashMap<Mailserver, Queue<Mail>>();
        while (!incoming.isEmpty()) {
            Mail mail = incoming.pollFirst();
            Mailserver mailserver = findMailserver(mail);
            if (!queues.containsKey(mailserver)) {
                ArrayDeque<Mail> serverQueue = new ArrayDeque<Mail>();
                queues.put(mailserver, serverQueue);
                executor.execute(new SendMail(mailserver, serverQueue));
            }
            Queue<Mail> slot = queues.get(mailserver);
            slot.offer(mail);
        }

        assertMailSentWithCorrectServer(log);
    }

    private void assertMailSentWithCorrectServer(ArrayList<Mail> log) {
        for (Mail mail : log) {
            if (!mail.server.equals(mail.sentBy.mailserver)) {
                Assert.fail("Mail sent by wrong server: " + mail);
            }
        }
    }

    private Mailserver findMailserver(Mail mail) {
        // TODO: Your lookup logic which server to use
        return new Mailserver(mail.server);
    }

    private static class Mail {
        String recipient;
        String server;
        SendMail sentBy;

        public Mail(String recipient, String server) {
            this.recipient = recipient;
            this.server = server;
        }

        @Override
        public String toString() {
            return "mail for " + recipient;
        }
    }

    public static class SendMail implements Runnable {

        private final Deque<Mail> queue;
        private final Mailserver mailserver;

        public SendMail(Mailserver mailserver, Deque<Mail> queue) {
            this.mailserver = mailserver;
            this.queue = queue;
        }

        @Override
        public void run() {
            while (!queue.isEmpty()) {
                Mail mail = queue.pollFirst();
                // TODO: Use SMTP to send the mail via mailserver
                System.out.println(this + " sent " + mail + " via " + mailserver);
                mail.sentBy = this;
            }
        }

    }

    public static class Mailserver {
        String hostname;

        public Mailserver(String hostname) {
            this.hostname = hostname;
        }

        @Override
        public String toString() {
            return hostname;
        }

        @Override
        public int hashCode() {
            return hostname.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            return hostname.equals(((Mailserver) obj).hostname);
        }

    }

}

Другие советы

Сама JMS как спецификация довольно молчит по этому вопросу. Большинство реализаций позволяют вам делать это, но не через сам JMS, а используя собственный API. Но вы не сможете подключить что-то формальное, например, MDB, к динамической очереди. Скорее вам нужно управлять своими собственными подключениями и слушателями.

В последний раз, когда мы смотрели на это в среде WebSphere, было удивительно трудно / невозможно динамически создавать очереди (я думаю, что временные очереди слишком временны для вас). Хотя API для создания очередей существовали, им потребовался перезапуск сервера, чтобы стать активным. Тогда есть проблема с MDB.

Как насчет грязного обходного пути, основанного на пословице, что все проблемы могут быть решены с помощью дополнительного уровня косвенности, который предполагает, что набор доступных принтеров сравнительно невелик.

Создать очереди с Printer01 по Printer99 (или с меньшим числом). Иметь «базу данных» который сопоставляет очереди с реальными принтерами. По мере поступления запросов на принтеры вы можете добавить их в таблицу сопоставлений. Возможно, у вас есть некоторые издержки, связанные с тем, что MDB смотрят на очереди, которые никогда не будут использоваться, но, если у вас не будет большого количества принтеров, может быть, вы можете себе это позволить?

Создайте очередь для каждого вашего SMTP-сервера и ограничьте потребителя очереди (MDB или прослушиватель сообщений) 1

Я сделал это с помощью activemq - на самом деле я опубликовал вопрос по этому вопросу в то время, поскольку у меня были похожие проблемы (в документации JMS на тот момент было указано, что это не поддерживается) и я был уверен, что это поддерживается.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top