Question

Spring AMQP Synchronous Transaction rollback does not work. Here DB transactions within the source are not handled by Spring. I need to have the Spring AMQP messages to be received and send within one transaction. Following are the snapshot of relevant code. Please let me know if you require anything else.

/////Connection Factory initialization

@Bean
public ConnectionFactory getConnectionFactory() {
    System.out.println("hello");
    configManager();
    String ip = ConfigManager.getQueueServerHost();
    System.out.println("IP Address : "+ip);
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(ip);
    connectionFactory.setUsername(ConfigManager.getQueueUserName());
    connectionFactory.setPassword(ConfigManager.getQueuePassword());
    connectionFactory.setPort(ConfigManager.getQueueServerPort());
    //connectionFactory.setPublisherReturns(true);
    //connectionFactory.setPublisherConfirms(true);
    return connectionFactory;

}

/////Rabbit Template initialization

@Bean
public RabbitTemplate producerRabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(getConnectionFactory());
    rabbitTemplate.setRoutingKey(ConfigManager.getProducerQueueName());
    rabbitTemplate.setQueue(ConfigManager.getProducerQueueName());
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setChannelTransacted(true);
    return rabbitTemplate;
}


/////Transactional Code

@Transactional(readOnly=false, rollbackFor=Exception.class)
public void processFile(RabbitTemplate rabbitTemplate)throws Exception{
    rabbitTemplate.setRoutingKey(ConfigManager.getConsumerQueueName());
    rabbitTemplate.setQueue(ConfigManager.getConsumerQueueName());

    Object messageObj = rabbitTemplate.receiveAndConvert();
    Message message = null;
    try{
        if(messageObj != null){
            if (messageObj instanceof Message){
                message = (Message)messageObj;
                System.out.println("Message received is '" + message.getFileName() + "' for Hospital "+message.getHospitalId());
                String newFileName = this.process(message.getFileName(), message.getHospitalId());
                this.sendMessage(newFileName, message.getHospitalId());
            }else{
                System.out.println("Unknown message received '" +  messageObj + "'");           
            }

        }
    }catch(Exception e){
        e.printStackTrace();
        throw e;
    }
}
Was it helpful?

Solution

It works fine for me; I just uploaded a Gist with my test case that shows it working.

I suggest you turn on TRACE level logging to see all the transaction activity (and compare it with the log that I put in the Gist).

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top