Question

I would like my Camel routes to be transactional with ActiveMQ. Rollback and the maximum number of redeliveries work fine, but the redelivery delay, which should grow incrementally, does not.

For example, when processing a message fails (an exception is thrown), the message is redelivered 3 times (as expected), but with no delay between attempts (which is not expected).

My Spring configuration:

<context:annotation-config/>
<context:component-scan base-package="fr.dush.poc.springplaceholder"/>

<spring:camelContext>
    <spring:package>fr.dush.poc.springplaceholder.routes</spring:package>
    <spring:contextScan/>
</spring:camelContext>

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>

<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="jmsTransactionManager"/>
</bean>
<bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
</bean>

The Spring configuration continues in a configuration bean:

@Component
public class CamelFactories {

private static final Logger LOGGER = LoggerFactory.getLogger(CamelFactories.class);

public static final int REDELIVERY_DELAY = 1000;
public static final int BACK_OFF_MULTIPLIER = 2;
public static final int HOUR = 3600000;
public static final int MAXIMUM_REDELIVERY_DELAY = 2 * HOUR;
public static final int MAXIMUM_REDELIVERIES = 3;

@Bean(name = "jmsConnectionFactory")
public ActiveMQConnectionFactory createFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");

    RedeliveryPolicy policy = new RedeliveryPolicy() {

        @Override
        public long getNextRedeliveryDelay(long previousDelay) {
            long nextDelay = super.getNextRedeliveryDelay(previousDelay);
            LOGGER.warn("Previous delay={} ; This delay={} ", previousDelay, nextDelay);
            return nextDelay;
        }
    };
    policy.setMaximumRedeliveries(MAXIMUM_REDELIVERIES);

    policy.setRedeliveryDelay(REDELIVERY_DELAY);
    policy.setBackOffMultiplier(BACK_OFF_MULTIPLIER);
    policy.setUseExponentialBackOff(true);
    policy.setMaximumRedeliveryDelay(MAXIMUM_REDELIVERY_DELAY);

    factory.setRedeliveryPolicy(policy);

    return factory;
}

@Bean(name = "activemq")
public JmsComponent createJmsComponent(JmsTransactionManager transactionManager,
        ActiveMQConnectionFactory connectionFactory) {

    ActiveMQComponent component = new ActiveMQComponent();
    component.setTransactionManager(transactionManager);
    component.setConnectionFactory(connectionFactory);
    component.setTransacted(true);

    return component;
}
}

My route is quite simple:

public class CamelRouteBuilder extends SpringRouteBuilder {

  @Override
  public void configure() throws Exception {

    Policy required = getApplicationContext().getBean("PROPAGATION_REQUIRED",
            SpringTransactionPolicy.class);

    from("activemq:queue:foo.bar")
            .transacted()
            .policy(required)

            .log(LoggingLevel.INFO, "fr.dush.poc", "Receive message: ${body}")
            .beanRef("serviceBean") // throws an exception
            .to("mock:routeEnd");
  }
}

In my logs I get the following line 3 times, each time with previous delay=0:

CamelFactories:36 - Previous delay=0 ; This delay=1000
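For comparison, here is a minimal sketch of the back-off arithmetic I expected from the settings above. It is a simplified model written for illustration, not ActiveMQ's actual RedeliveryPolicy code; the class name BackOffSketch and the stateKept flag are hypothetical. It contrasts a delay that is carried over between attempts with one that restarts from 0 every time, which is what the log line suggests is happening.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of exponential back-off; NOT ActiveMQ's real implementation.
class BackOffSketch {
    // Values copied from the CamelFactories constants in the question.
    static final long REDELIVERY_DELAY = 1000;
    static final long BACK_OFF_MULTIPLIER = 2;
    static final long MAXIMUM_REDELIVERY_DELAY = 2 * 3600000L;
    static final int MAXIMUM_REDELIVERIES = 3;

    // First attempt uses the base delay; later attempts multiply the previous one, capped at the maximum.
    static long nextDelay(long previousDelay) {
        if (previousDelay <= 0) {
            return REDELIVERY_DELAY;
        }
        return Math.min(previousDelay * BACK_OFF_MULTIPLIER, MAXIMUM_REDELIVERY_DELAY);
    }

    // stateKept = true: the previous delay is remembered between attempts.
    // stateKept = false: the previous delay is reset to 0 before every attempt.
    static List<Long> delays(boolean stateKept) {
        List<Long> result = new ArrayList<>();
        long previous = 0;
        for (int i = 0; i < MAXIMUM_REDELIVERIES; i++) {
            long next = nextDelay(previous);
            result.add(next);
            previous = stateKept ? next : 0;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("state kept:  " + delays(true));  // [1000, 2000, 4000]
        System.out.println("state reset: " + delays(false)); // [1000, 1000, 1000]
    }
}
```

The second line matches the repeated "Previous delay=0 ; This delay=1000" entries, so the previous delay appears to be lost between redeliveries.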

It seems I'm not the only one with this issue, but I still haven't found a solution.

Thanks,

-Dush

Solution

This may be resolved by setting cacheLevelName=CACHE_CONSUMER on the ActiveMQComponent. I had the same symptoms and this resolved them for me. On a related note, I also get out-of-order delivery of messages with a transacted component unless I use CACHE_CONSUMER.
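As a sketch of how the setting could be applied, the bean method from the question can be extended with one extra call. The class name CachedConsumerConfig is hypothetical, and the import for ActiveMQComponent assumes the activemq-camel module used with Camel 2.x; adjust it to your version.

```java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent; // assumed package (activemq-camel)
import org.apache.camel.component.jms.JmsComponent;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.stereotype.Component;

@Component
public class CachedConsumerConfig { // hypothetical class name

    @Bean(name = "activemq")
    public JmsComponent createJmsComponent(JmsTransactionManager transactionManager,
            ActiveMQConnectionFactory connectionFactory) {

        ActiveMQComponent component = new ActiveMQComponent();
        component.setTransactionManager(transactionManager);
        component.setConnectionFactory(connectionFactory);
        component.setTransacted(true);

        // Keep the JMS consumer cached between messages instead of recreating it.
        component.setCacheLevelName("CACHE_CONSUMER");

        return component;
    }
}
```

The same option can also be given per endpoint, for example "activemq:queue:foo.bar?cacheLevelName=CACHE_CONSUMER".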

OTHER TIPS

I still haven't found a solution, but I found an alternative: the redelivery API of Camel itself.

The configuration is very similar. Spring config example:

<redeliveryPolicyProfile id="infiniteRedeliveryPolicy" 
                         asyncDelayedRedelivery="true"
                         redeliveryDelay="${camel.redelivery_delay}"
                         maximumRedeliveryDelay="${camel.maximum_redelivery_delay}"
                         maximumRedeliveries="${camel.infinite_redelivery}"
                         backOffMultiplier="${camel.back_off_multiplier}"
                         useExponentialBackOff="true"/>

<routeContext>
  <route>
    <!-- ... -->

    <!-- Define behaviour in case of technical error -->
    <onException redeliveryPolicyRef="infiniteRedeliveryPolicy">
       <exception>java.lang.Exception</exception>
       <handled>
           <constant>false</constant>
       </handled>

       <log message="Message can't be processed for now. I'll retry later!" />
    </onException>
  </route>
</routeContext>

Consumers should be transactional if you want unprocessed messages to stay in the ActiveMQ queue, even if you shut down the application.
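For those using the Java DSL, a roughly equivalent sketch is shown here. The concrete values are assumptions: the delays are taken from the constants in the question, -1 stands in for the "infinite" placeholder, and the class name RetryRouteBuilder is hypothetical. asyncDelayedRedelivery is left out to keep the sketch short.

```java
import org.apache.camel.builder.RouteBuilder;

public class RetryRouteBuilder extends RouteBuilder { // hypothetical class name

    @Override
    public void configure() {
        // Retry policy handled by Camel instead of the ActiveMQ RedeliveryPolicy.
        onException(Exception.class)
            .maximumRedeliveries(-1)               // assumed: -1 means retry without limit
            .redeliveryDelay(1000)                 // assumed: REDELIVERY_DELAY from the question
            .useExponentialBackOff()
            .backOffMultiplier(2)                  // assumed: BACK_OFF_MULTIPLIER from the question
            .maximumRedeliveryDelay(2 * 3600000L)  // assumed: 2 hours, as in the question
            .handled(false)
            .log("Message can't be processed for now. I'll retry later!");

        from("activemq:queue:foo.bar")
            .transacted()
            .to("bean:serviceBean")
            .to("mock:routeEnd");
    }
}
```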

Licensed under: CC-BY-SA with attribution