This is possibly resolved by setting cacheLevelName=CACHE_CONSUMER on the ActiveMQComponent. I had the same symptoms & this resolved it for me. On a related note, I also get out of order delivery of messages with a transacted component, unless I use CACHE_CONSUMER.
JMS Rollback & redelivery not honoring the RedeliveryDelay configuration
-
18-06-2023 - |
题
I would like to have my Camel routes transactional with ActiveMQ. Rollback and maximum re-deliveries work fine, but not re-delivery delay, which should be incremental.
For example, when I failed to process message (raising an exception), it's redelivered 3 times (as expected), but with no time between it (which is not).
My Spring configuration:
<context:annotation-config/>
<context:component-scan base-package="fr.dush.poc.springplaceholder"/>
<spring:camelContext>
<spring:package>fr.dush.poc.springplaceholder.routes</spring:package>
<spring:contextScan/>
</spring:camelContext>
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="jmsTransactionManager"/>
</bean>
<bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="jmsTransactionManager"/>
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
</bean>
Spring configuration continue in configuration bean:
@Component
public class CamelFactories {
private static final Logger LOGGER = LoggerFactory.getLogger(CamelFactories.class);
public static final int REDELIVERY_DELAY = 1000;
public static final int BACK_OFF_MULTIPLIER = 2;
public static final int HOUR = 3600000;
public static final int MAXIMUM_REDELIVERY_DELAY = 2 * HOUR;
public static final int MAXIMUM_REDELIVERIES = 3;
@Bean(name = "jmsConnectionFactory")
public ActiveMQConnectionFactory createFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
RedeliveryPolicy policy = new RedeliveryPolicy() {
@Override
public long getNextRedeliveryDelay(long previousDelay) {
long nextDelay = super.getNextRedeliveryDelay(previousDelay);
LOGGER.warn("Previous delay={} ; This delay={} ", previousDelay, nextDelay);
return nextDelay;
}
};
policy.setMaximumRedeliveries(MAXIMUM_REDELIVERIES);
policy.setRedeliveryDelay(REDELIVERY_DELAY);
policy.setBackOffMultiplier(BACK_OFF_MULTIPLIER);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveryDelay(MAXIMUM_REDELIVERY_DELAY);
factory.setRedeliveryPolicy(policy);
return factory;
}
@Bean(name = "activemq")
public JmsComponent createJmsComponent(JmsTransactionManager transactionManager,
ActiveMQConnectionFactory connectionFactory) {
ActiveMQComponent component = new ActiveMQComponent();
component.setTransactionManager(transactionManager);
component.setConnectionFactory(connectionFactory);
component.setTransacted(true);
return component;
}
My route is quite simple:
public class CamelRouteBuilder extends SpringRouteBuilder {
@Override
public void configure() throws Exception {
Policy required = getApplicationContext().getBean("PROPAGATION_REQUIRED",
SpringTransactionPolicy.class);
from("activemq:queue:foo.bar")
.transacted()
.policy(required)
.log(LoggingLevel.INFO, "fr.dush.poc", "Receive message: ${body}")
.beanRef("serviceBean") // throw an exception
.to("mock:routeEnd");
}
}
And in my logs, I have this, 3 times with previous delay=0:
CamelFactories:36 - Previous delay=0 ; This delay=1000
It seems I'm not alone to have this issue, but I still didn't find solution...
Thanks,
-Dush
解决方案
其他提示
I still didn't find solution. But I found an alternative: retry API from CAMEL itself.
Configuration is very similar. Spring config example:
<redeliveryPolicyProfile id="infiniteRedeliveryPolicy"
asyncDelayedRedelivery="true"
redeliveryDelay="${camel.redelivery_delay}"
maximumRedeliveryDelay="${camel.maximum_redelivery_delay}"
maximumRedeliveries="${camel.infinite_redelivery}"
backOffMultiplier="${camel.back_off_multiplier}"
useExponentialBackOff="true"/>
<routeContext>
<route>
<!-- ... -->
<!-- Define behaviour in case of technical error -->
<onException redeliveryPolicyRef="infiniteRedeliveryPolicy">
<exception>java.lang.Exception</exception>
<handled>
<constant>false</constant>
</handled>
<log message="Message can't be processed for now. I'll retry later!" />
</onException>
</route>
</routeContext>
Consumers should be transactional if you want to keep not processed messages in the ActiveMQ queue, even if you shut down application.