Pergunta

Eu estou tentando mover mensagens jms entre 2 diferentes remoto, activeMQ corretores e depois de um monte de leitura

Eu estou usando Atomikos, como eu estou escrevendo um aplicativo independente, e eu também estou usando spring para começar a coisa toda funcionar.

Eu tenho o seguinte feijão javaconfig de instalação

@Bean(name="atomikosSrcConnectionFactory")
    public AtomikosConnectionFactoryBean consumerXAConnectionFactory() {
        AtomikosConnectionFactoryBean consumerBean = new AtomikosConnectionFactoryBean();
        consumerBean.setUniqueResourceName("atomikosSrcConnectionFactory");
        consumerBean.setLocalTransactionMode(false);
        return consumerBean;
    }

    @Bean(name="atomikosDstConnectionFactory")
    public AtomikosConnectionFactoryBean producerXAConnectionFactory() {
        AtomikosConnectionFactoryBean producerBean = new AtomikosConnectionFactoryBean();
        producerBean.setUniqueResourceName("atomikosDstConnectionFactory");
        producerBean.setLocalTransactionMode(false);
        return producerBean;
    }

    @Bean(name="jtaTransactionManager")
    public JtaTransactionManager jtaTransactionManager() throws SystemException {
        JtaTransactionManager jtaTM = new JtaTransactionManager();
        jtaTM.setTransactionManager(userTransactionManager());
        jtaTM.setUserTransaction(userTransactionImp());
        return jtaTM;
    }

    @Bean(initMethod="init", destroyMethod="close", name="userTransactionManager")
    public UserTransactionManager userTransactionManager() {
        UserTransactionManager utm = new UserTransactionManager();
        utm.setForceShutdown(false);
        return utm;
    }

    @Bean(name="userTransactionImp")
    public UserTransactionImp userTransactionImp() throws SystemException {
        UserTransactionImp uti = new UserTransactionImp();
        uti.setTransactionTimeout(300);
        return uti;
    }

    @Bean(name="jmsContainer")
    @Lazy(value=true)
    public DefaultMessageListenerContainer jmsContainer() throws SystemException {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setAutoStartup(false);
        dmlc.setTransactionManager(jtaTransactionManager());
        dmlc.setSessionTransacted(true);
        dmlc.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        dmlc.setConnectionFactory(consumerXAConnectionFactory());
        dmlc.setDestinationName("srcQueue");
        return dmlc;
    }

    @Bean(name="transactedJmsTemplate")
    public JmsTemplate transactedJmsTemplate() {

        DynamicDestinationResolver dest = new DynamicDestinationResolver();

        JmsTemplate jmsTmp = new JmsTemplate(producerXAConnectionFactory());

        jmsTmp.setDeliveryPersistent(true);
        jmsTmp.setSessionTransacted(true);
        jmsTmp.setDestinationResolver(dest);
        jmsTmp.setPubSubDomain(false);
        jmsTmp.setReceiveTimeout(20000);
        jmsTmp.setExplicitQosEnabled(true);
        jmsTmp.setSessionTransacted(true);
        jmsTmp.setDefaultDestination(new ActiveMQQueue("destQueue"));
        jmsTmp.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

        return jmsTmp;
    }

2 AtomikosConnectionFactoryBean estão envolvendo um ActiveMQXAConnectionFactory (Um para cada broker) em tempo de execução antes de iniciar o DMLC.

Eu, em seguida, a instalação de um simples messageListener (que é atribuído para o dmlc antes de ser iniciado), com o seguinte método:

public void onMessage(Message message) {
    final Message rcvedMsg = message;

    try{
        MessageCreator msgCreator = new MessageCreator(){
                public Message createMessage(Session session) throws JMSException{
                    Message returnMsg = null;
                    if(rcvedMsg instanceof TextMessage){
                        TextMessage txtMsg = session.createTextMessage();
                        txtMsg.setText(((TextMessage) rcvedMsg).getText());
                        returnMsg = txtMsg;
                    }
                    else if(rcvedMsg instanceof BytesMessage){
                        BytesMessage bytesMsg = session.createBytesMessage();
                        if(!(((BytesMessage) rcvedMsg).getBodyLength() > Integer.MAX_VALUE)){
                            byte[] bodyContent = new byte[(int) ((BytesMessage) rcvedMsg).getBodyLength()];
                            bytesMsg.writeBytes(bodyContent);
                            returnMsg = bytesMsg;
                        }
                    }
                    return returnMsg;
                }
            };

            jmsTemplate.send(msgCreator);
    }
    catch(JmsException | JMSException e){
        logger.error("Error when transfering message: '{}'. {}",message,e);
    }
}

O aplicativo é iniciado sem erros específicos ou avisos no entanto, assim que eu colocar uma mensagem na fila de origem posso ver, através de logs, que o método onMessage está sendo executado repetidas vezes para a mesma mensagem, como se a transação continua sendo revertida e reiniciado novamente (Sem erros de jogar em qualquer lugar).

Eu também notei que se acontecer de eu usar a mesma fonte e o url de destino (ou seja, o mesmo corretor, mas cada um com sua própria connectionFactory), ele funciona e as mensagens são transferidas como pretendido entre a origem e o destino de fila.

O que eu estou querendo saber é

  1. O que estou fazendo de errado na instalação?Porque é que a minha transação "aparentemente" está a ser revertida e mais uma vez quando utilizar 2 corretores diferentes, mas ao utilizar o mesmo (mas mais de 2 diferentes fábricas de conexão)?
  2. Eu não estou completamente convencido de que o onMessage está atualmente fazendo a devida transação, como eu estou atualmente captura todas as exceções e não fazer nada e eu acredito que isso irá confirmar a transação do dmlc antes do jmstemplate é feito enviando a mensagem, mas não tenho a certeza.Se for este o caso, seria um SessionAwareMessageListener ser melhor, em vez?Devo set @Transacionado no método onMessage?

Alguém poderia ajudar a lançar uma luz sobre o assunto?Todos os contributos são bem-vindos.

ATUALIZAÇÃO:

Eu percebi que o problema com o "rollback" foi devido ao fato de que ambos AMQs eu estava usando eram conectados uns aos outros através de uma rede de corretores e aconteceu de eu estar usando o mesmo nome da fila de origem e de destino.Isso levou ao fato de que a mensagem foi transferida pela aplicação de uma AMQ para outro e, em seguida, imediatamente, porque havia um consumidor sobre a origem do AMQ, a mensagem seria transferido de volta para o original AMQ, que, por sua vez, era visto como uma nova mensagem, o meu aplicativo e transferido novamente e o ciclo foi infinitamente.A solução postada abaixo ajudou com outras questões.

Foi útil?

Solução

try {
   ... Code
} catch (JmsException je) {
    logger.error("Error when transfering message: '{}'. {}",message,e);
}

O código acima é engolir a exceção, você deve capturar a exceção ou relançar, de modo que o gerenciamento de transações pode lidar com isso appropriatly.Atualmente, nenhuma exceção é visto, uma confirmação é realizada, o que pode levar a resultados estranhos.

Gostaria de fazer algo como o seguinte, JmsException é a partir da Primavera e, como a maioria das exceções na Primavera, uma RuntimeException.Simplesmente rehtrow, também de log do rastreamento de pilha de exceção remover corretamente a segunda {} em seu log-instrução.

try {
   ... Code
} catch (JmsException je) {
    logger.error("Error when transfering message: '{}'.",message,e);
    throw je;
}

No entanto este irá duplicar o registo de que a Primavera também registrará o erro.

Para um JMSException fazer algo assim, convertê-lo para um JmsException.

try {
   ... Code
} catch (JMSException je) {
    logger.error("Error when transfering message: '{}'.",message,e);
    throw JmsUtils.convertJmsAccessException(je);
}

Para obter mais informações sobre o que acontece, você provavelmente deseja habilitar o log de DEPURAÇÃO para o org.springframework.jms pacote.Isso dará a você uma visão de dentro o que acontece no envio/recebimento de mensagem.

Outra coisa que você usar transacional sessões e manual reconhecendo de mensagens, no entanto você não fizer um message.acknowledge() no seu código.A primavera não vai chamá-lo devido à transação JTA.Tente mudar para SESSION_TRANSACTED em vez disso.Pelo menos para o DMLC.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top