Question

Below is my configuration for Spring Batch remote chunking. My steps are running locally instead of remotely, and I can't see any messages in RabbitMQ.

<!-- Step-scoped ChunkMessageChannelItemWriter, wired to importExchangesMessagingTemplate
     (messaging operations) and importExchangesReplyChannel (reply channel). -->
<beans:bean id="importExchangesChunkItemWriter"
            class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
            scope="step">
    <beans:property name="messagingOperations" ref="importExchangesMessagingTemplate" />
    <beans:property name="replyChannel" ref="importExchangesReplyChannel" />
</beans:bean>

<!-- RemoteChunkHandlerFactoryBean wired with importExchangesChunkItemWriter as its
     chunk writer and importExchangesStep as its step. -->
<beans:bean id="importExchangesChunkHandler"
            class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
    <beans:property name="chunkWriter" ref="importExchangesChunkItemWriter" />
    <beans:property name="step" ref="importExchangesStep" />
</beans:bean>

<!-- Restartable job with a single visible chunk-oriented step: importExchangesStep reads with
     importExchangesFileItemReader, writes with importExchangesItemWriter and commits
     at the configured interval.
     NOTE(review): the step names importEclsStep as its successor, but no such step is
     defined in this snippet; confirm it exists elsewhere. -->
<job id="importExchangesJob"
     restartable="true">
    <step id="importExchangesStep"
          next="importEclsStep">
        <tasklet transaction-manager="transactionManager">
            <chunk reader="importExchangesFileItemReader"
                   writer="importExchangesItemWriter"
                   commit-interval="${import.exchanges.commit.interval}" />
        </tasklet>
    </step>
</job>



<!-- PassThroughItemProcessor instance, referenced by chunkProcessorExchanges. -->
<beans:bean id="passThroughItemProcessor"
            class="org.springframework.batch.item.support.PassThroughItemProcessor" />

<!-- RabbitMQ connection settings; host, port and credentials come from property placeholders. -->
<rabbit:connection-factory id="connectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" username="${rabbitmq.username}"
    password="${rabbitmq.password}" />

<!-- AMQP template bound to connectionFactory. -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

<!-- Admin bean on the same connection factory; presumably what declares the queues
     below on the broker (confirm). -->
<rabbit:admin id="rmqAdmin" connection-factory="connectionFactory" />

<!-- Request queue and reply queue. No exchange or binding is declared in this snippet. -->
<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />

<!-- Request channel: the default channel of importExchangesMessagingTemplate. -->
<int:channel id="importExchangesChannel" />


<!-- Reply channel: the replyChannel of importExchangesChunkItemWriter. -->
<int:channel id="importExchangesReplyChannel" />


<!-- MessagingTemplate that sends to importExchangesChannel by default and uses the
     configured reply timeout as its receive timeout. -->
<beans:bean id="importExchangesMessagingTemplate"
            class="org.springframework.integration.core.MessagingTemplate">
    <beans:property name="defaultChannel" ref="importExchangesChannel" />
    <beans:property name="receiveTimeout" value="${import.exchanges.reply.timeout}" />
</beans:bean>

<!-- Outbound adapter subscribed to importExchangesChannel.
     NOTE(review): no exchange-name or routing-key is set, so the broker is given no
     routing information for the messages published here. -->
<amqp:outbound-channel-adapter id="importExchangesOutboundAdapter"
    channel="importExchangesChannel" />

<!-- Inbound adapter: consumes the reply queue and forwards to importExchangesReplyChannel. -->
<amqp:inbound-channel-adapter id="importExchangesInboundAdapter"
    connection-factory="connectionFactory" channel="importExchangesReplyChannel"
    queue-names="${import.exchanges.reply.queue}" />


<!-- Slave inbound adapter: consumes the request queue and forwards to importExchangesChannel.
     NOTE(review): this is the same channel importExchangesOutboundAdapter is subscribed to. -->
<amqp:inbound-channel-adapter id="importExchangesSlaveInboundAdapter"
    connection-factory="connectionFactory" channel="importExchangesChannel"
    queue-names="${import.exchanges.queue}" />



<!-- Slave outbound adapter subscribed to importExchangesReplyChannel.
     NOTE(review): no exchange-name or routing-key is set here either. -->
<amqp:outbound-channel-adapter id="importExchangesSlaveOutboundAdapter"
    channel="importExchangesReplyChannel" />

<!-- Passes each message on importExchangesChannel to
     chunkProcessorChunkHandlerExchanges.handleChunk and sends the result to
     importExchangesReplyChannel.
     NOTE(review): importExchangesChannel is also the input of importExchangesOutboundAdapter,
     so two consumers are subscribed to the same channel. -->
<int:service-activator id="serviceActivatorExchanges"
    input-channel="importExchangesChannel" output-channel="importExchangesReplyChannel"
    ref="chunkProcessorChunkHandlerExchanges" method="handleChunk" />


<!-- Project item writer for the exchanges import, wired with the symfony bean and the
     configured reply timeout. -->
<beans:bean id="importExchangesItemWriter"
            class="com.st.batch.foundation.ImportExchangesItemWriter">
    <beans:property name="symfony" ref="symfony" />
    <beans:property name="replyTimeout" value="${import.exchanges.reply.timeout}" />
</beans:bean>



<!-- SimpleChunkProcessor wired with passThroughItemProcessor as its processor and
     importExchangesItemWriter as its writer. -->
<beans:bean id="chunkProcessorExchanges"
            class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
    <beans:property name="itemWriter" ref="importExchangesItemWriter" />
    <beans:property name="itemProcessor" ref="passThroughItemProcessor" />
</beans:bean>

<!-- ChunkProcessorChunkHandler that delegates to chunkProcessorExchanges; it is the
     target of serviceActivatorExchanges. -->
<beans:bean id="chunkProcessorChunkHandlerExchanges"
            class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
    <beans:property name="chunkProcessor" ref="chunkProcessorExchanges" />
</beans:bean>

I changed the configuration to the following. Now it queues a single message at a time and doesn't process several in parallel (it should process as many messages at once as the listener concurrency allows).

<!-- Registers a custom scope named "thread", backed by SimpleThreadScope, through a
     CustomScopeConfigurer. -->
<beans:bean id="simpleThreadScope"
            class="org.springframework.context.support.SimpleThreadScope" />

<util:map id="scopesMap">
    <beans:entry key="thread" value-ref="simpleThreadScope" />
</util:map>

<beans:bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <beans:property name="scopes" ref="scopesMap" />
</beans:bean>





<!-- Request channel for the exchanges import. -->
<int:channel id="importExchangesChannel" />
<!-- Reply channel: queue-backed and declared in the "thread" scope. -->
<int:channel id="importExchangesReplyChannel" scope="thread">
    <int:queue />
</int:channel>

<!-- MessagingTemplate that sends to importExchangesChannel by default and uses the
     configured reply timeout as its receive timeout. -->
<beans:bean id="importExchangesMessagingTemplate"
            class="org.springframework.integration.core.MessagingTemplate">
    <beans:property name="defaultChannel" ref="importExchangesChannel" />
    <beans:property name="receiveTimeout" value="${import.exchanges.reply.timeout}" />
</beans:bean>

<!-- Publishes messages from importExchangesChannel through amqpTemplate, using the
     configured exchange name and routing key. -->
<amqp:outbound-channel-adapter
    amqp-template="amqpTemplate" channel="importExchangesChannel"
    exchange-name="${import.exchanges.exchange}" routing-key="${import.exchanges.routing.key}" />

<!-- Listener container on the exchanges queue with the configured concurrency; prefetch
     is 1 and rejected messages are not requeued. Each message is passed to
     importExchangesChunkHandler.handleChunk.
     NOTE(review): the connection factory is referenced as rabbitConnectionFactory, which
     is not defined in the visible configuration; confirm that bean exists. -->
<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="${import.exchanges.listener.concurrency}"
    requeue-rejected="false" prefetch="1">
    <rabbit:listener queues="${import.exchanges.queue}"
        ref="importExchangesChunkHandler" method="handleChunk" />
</rabbit:listener-container>


<!-- Request channel for the ecls import. -->
<int:channel id="importEclsChannel" />
<!-- Reply channel: queue-backed and declared in the "thread" scope. -->
<int:channel id="importEclsReplyChannel" scope="thread">
    <int:queue />
</int:channel>

<!-- MessagingTemplate that sends to importEclsChannel by default and uses the
     configured ecls reply timeout as its receive timeout. -->
<beans:bean id="importEclsMessagingTemplate"
            class="org.springframework.integration.core.MessagingTemplate">
    <beans:property name="defaultChannel" ref="importEclsChannel" />
    <beans:property name="receiveTimeout" value="${import.ecls.reply.timeout}" />
</beans:bean>

<!-- Publishes messages from importEclsChannel through amqpTemplate, using the
     configured exchange name and routing key. -->
<amqp:outbound-channel-adapter
    amqp-template="amqpTemplate" channel="importEclsChannel"
    exchange-name="${import.ecls.exchange}" routing-key="${import.ecls.routing.key}" />

<!-- Listener container on the ecls queue with the configured concurrency; prefetch is 1
     and rejected messages are not requeued. Each message is passed to
     importEclsChunkHandler.handleChunk.
     NOTE(review): neither rabbitConnectionFactory nor importEclsChunkHandler is defined
     in the visible configuration; confirm both beans exist. -->
<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="${import.ecls.listener.concurrency}"
    requeue-rejected="false" prefetch="1">
    <rabbit:listener queues="${import.ecls.queue}"
        ref="importEclsChunkHandler" method="handleChunk" />
</rabbit:listener-container>



<!-- Project item writer for the exchanges import, wired with the symfony bean and the
     configured reply timeout. -->
<beans:bean id="importExchangesItemWriter"
            class="com.st.batch.foundation.ImportExchangesItemWriter">
    <beans:property name="symfony" ref="symfony" />
    <beans:property name="replyTimeout" value="${import.exchanges.reply.timeout}" />
</beans:bean>


<!-- Step-scoped ChunkMessageChannelItemWriter, wired to importExchangesMessagingTemplate
     (messaging operations) and importExchangesReplyChannel (reply channel). -->
<beans:bean id="importExchangesChunkItemWriter"
            class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
            scope="step">
    <beans:property name="messagingOperations" ref="importExchangesMessagingTemplate" />
    <beans:property name="replyChannel" ref="importExchangesReplyChannel" />
</beans:bean>

<!-- RemoteChunkHandlerFactoryBean wired with importExchangesChunkItemWriter as its
     chunk writer and importExchangesStep as its step. -->
<beans:bean id="importExchangesChunkHandler"
            class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
    <beans:property name="chunkWriter" ref="importExchangesChunkItemWriter" />
    <beans:property name="step" ref="importExchangesStep" />
</beans:bean>

<!-- Request and reply queues for the exchanges import. -->
<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />

<!-- Direct exchange that binds the request queue under the configured routing key.
     No binding for the reply queue is declared here. -->
<rabbit:direct-exchange name="${import.exchanges.exchange}">
    <rabbit:bindings>
        <rabbit:binding queue="${import.exchanges.queue}"
            key="${import.exchanges.routing.key}" />
    </rabbit:bindings>
</rabbit:direct-exchange>

I can see only 1 message in the queue at a time. The number of messages sent should equal ${import.exchanges.commit.interval}, and all of them should be picked up by the concurrent listeners and processed in parallel.

Was it helpful?

Solution

I am not sure what you mean by "running locally", but you don't have any routing information on the outbound adapters; if RabbitMQ doesn't know how to route a message, it simply drops it.

You need to add routing-key="${import.exchanges.queue}" and routing-key="${import.exchanges.reply.queue}" to the adapters. This will use the default exchange ("") where the queues are bound using their names.

Also, you can't use the same channel name on both sides (importExchangesChannel). That way, the outbound adapter and service activator will both be subscribed and messages will be distributed in round-robin fashion.

So, some chunks will run locally; others will be dropped because of the routing key problem.

You need to fix the routing key and use a different channel on service side.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top