Question

I'm currently using RabbitMQ with Spring (spring-rabbit-1.2.0-RELEASE), with the configuration below:

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
<!-- Asynchronous exchanges -->
<!-- Admin -->
<rabbit:admin connection-factory="connectionFactory"/>

<!-- Error Handler -->
<bean id="biErrorHandler" class="my.project.sync.BiErrorHandler" />
<!-- Message converter -->
<bean id="biMessageConverter" class="my.project.sync.BiMessageConverter"/>

<bean id="retryInterceptor" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer"/>
    <property name="retryOperations" ref="retryTemplate" />
    <property name="messageKeyGenerator" ref="biKeyGenerator" />
</bean> 

<bean id="biKeyGenerator" class="my.project.sync.BiMessageKeyGenerator"/>
<bean id="rejectAndDontRequeueRecoverer" class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="3000" />
            <property name="maxInterval" value="30000" />
        </bean>     
    </property>
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="3" />
        </bean>     
    </property>
</bean>  

<rabbit:queue id="biSynchronizationQueue" name="BI_SYNCHRONIZATION_QUEUE" durable="true" />
<rabbit:listener-container message-converter="biMessageConverter" concurrency="1" 
    connection-factory="connectionFactory"
    error-handler="biErrorHandler"
    advice-chain="retryInterceptor"
    acknowledge="auto">
    <rabbit:listener queues="BI_SYNCHRONIZATION_QUEUE" ref="biSynchronizationService" method="handleMessage"/>
</rabbit:listener-container>

I would like to requeue the message at the tail of the queue after the third attempt, but I can't find a way to do that.

Does anyone have an idea?

Thanks in advance for your help.


Solution

Instead of using the RejectAndDontRequeueRecoverer, use a custom MessageRecoverer that uses a RabbitTemplate to send the message to the back of the queue, then throws an AmqpRejectAndDontRequeueException so that the original delivery is rejected.

You could subclass RejectAndDontRequeueRecoverer, send the message in recover() and then call super.recover() (which simply throws the exception). Or just do the whole implementation in your own recover().
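
The wiring for this could look roughly like the sketch below. It assumes a hypothetical class my.project.sync.RequeueAtTailRecoverer that implements org.springframework.amqp.rabbit.retry.MessageRecoverer (or extends RejectAndDontRequeueRecoverer) and takes the template and the target queue name as constructor arguments. The class name, bean id and constructor are illustrative and not part of Spring AMQP; the other beans are the ones from the question.

```xml
<!-- Hypothetical custom recoverer (not a Spring AMQP class). Its recover() would
     republish the message through the template, using the default exchange and
     the queue name as routing key, and then throw
     AmqpRejectAndDontRequeueException so the original delivery is rejected. -->
<bean id="requeueAtTailRecoverer" class="my.project.sync.RequeueAtTailRecoverer">
    <constructor-arg ref="amqpTemplate"/>
    <constructor-arg value="BI_SYNCHRONIZATION_QUEUE"/>
</bean>

<!-- Same interceptor as in the question, pointing at the custom recoverer -->
<bean id="retryInterceptor" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="requeueAtTailRecoverer"/>
    <property name="retryOperations" ref="retryTemplate"/>
    <property name="messageKeyGenerator" ref="biKeyGenerator"/>
</bean>
```

The republished copy lands at the tail because it is a fresh publish. The original delivery is rejected without requeue, and since the queue in the question declares no dead-letter exchange, the broker discards it. Note that a recoverer like this has no attempt counter of its own, so a message that always fails would keep cycling through the queue unless the recoverer tracks attempts (for example in a message header).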

OTHER TIPS

I used this tip, but then went in a slightly different direction by using a dead-letter queue. For me, it was more explicit than putting the message at the end of the queue.

<rabbit:queue name="content.variantchange.queue" durable="true" queue-arguments="queueArguments"/>

<util:map id="queueArguments">
    <entry key="x-dead-letter-exchange" value="content.deadletter.topic"/>
</util:map>

<rabbit:fanout-exchange name="content.deadletter.topic">
    <rabbit:bindings>
        <rabbit:binding queue="content.deadletter.queue"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<rabbit:queue name="content.deadletter.queue" durable="true"/>

Once you have the dead-letter queue in place, you can do whatever you want with the rejected messages via another consumer (including adding them back to the original queue).
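
A second consumer on the dead-letter queue could be declared along these lines. This is only a sketch: the deadLetterHandler bean and its class my.project.sync.DeadLetterHandler are hypothetical names, and connectionFactory is assumed to be the same connection factory bean used elsewhere in the configuration.

```xml
<!-- Hypothetical handler for dead-lettered messages; it could log them,
     alert someone, or republish them to content.variantchange.queue. -->
<bean id="deadLetterHandler" class="my.project.sync.DeadLetterHandler"/>

<!-- Separate listener container that consumes only from the dead-letter queue -->
<rabbit:listener-container connection-factory="connectionFactory" concurrency="1">
    <rabbit:listener queue-names="content.deadletter.queue" ref="deadLetterHandler" method="handleMessage"/>
</rabbit:listener-container>
```

Keeping this consumer in its own container means its concurrency and error handling can be tuned independently of the main listener.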

I also ended up using the stateless version of the interceptor that Gary recommended, which meant I did not have to worry about a message ID generator.

<bean id="retryInterceptor" class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer"/>
    <property name="retryOperations" ref="retryTemplate" />
</bean>
Licensed under: CC-BY-SA with attribution