Удаленное пакетное разделение Spring не ставит сообщения в очередь, а запускает их локально?
-
21-12-2019 - |
Вопрос
Ниже приведена моя конфигурация для удаленного пакетного разделения Spring.Мои шаги выполняются локально, а не удаленно.Я не вижу сообщений в RabbitMQ.
<beans:bean id="importExchangesChunkItemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step" p:messagingOperations-ref="importExchangesMessagingTemplate"
p:replyChannel-ref="importExchangesReplyChannel">
</beans:bean>
<beans:bean id="importExchangesChunkHandler"
class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean"
p:chunkWriter-ref="importExchangesChunkItemWriter" p:step-ref="importExchangesStep">
</beans:bean>
<job id="importExchangesJob" restartable="true">
<step id="importExchangesStep" next="importEclsStep">
<tasklet transaction-manager="transactionManager">
<chunk reader="importExchangesFileItemReader" writer="importExchangesItemWriter"
commit-interval="${import.exchanges.commit.interval}" />
</tasklet>
</step>
</job>
<beans:bean id="passThroughItemProcessor" class="org.springframework.batch.item.support.PassThroughItemProcessor" />
<rabbit:connection-factory id="connectionFactory"
port="${rabbitmq.port}" host="${rabbitmq.host}" username="${rabbitmq.username}"
password="${rabbitmq.password}" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin id="rmqAdmin" connection-factory="connectionFactory" />
<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />
<int:channel id="importExchangesChannel" />
<int:channel id="importExchangesReplyChannel" />
<beans:bean id="importExchangesMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="importExchangesChannel" p:receiveTimeout="${import.exchanges.reply.timeout}" />
<amqp:outbound-channel-adapter id="importExchangesOutboundAdapter"
channel="importExchangesChannel" />
<amqp:inbound-channel-adapter id="importExchangesInboundAdapter"
connection-factory="connectionFactory" channel="importExchangesReplyChannel"
queue-names="${import.exchanges.reply.queue}" />
<amqp:inbound-channel-adapter id="importExchangesSlaveInboundAdapter"
connection-factory="connectionFactory" channel="importExchangesChannel"
queue-names="${import.exchanges.queue}" />
<amqp:outbound-channel-adapter id="importExchangesSlaveOutboundAdapter"
channel="importExchangesReplyChannel" />
<int:service-activator id="serviceActivatorExchanges"
input-channel="importExchangesChannel" output-channel="importExchangesReplyChannel"
ref="chunkProcessorChunkHandlerExchanges" method="handleChunk" />
<beans:bean id="importExchangesItemWriter"
class="com.st.batch.foundation.ImportExchangesItemWriter" p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}"/>
<beans:bean id="chunkProcessorExchanges"
class="org.springframework.batch.core.step.item.SimpleChunkProcessor"
p:itemWriter-ref="importExchangesItemWriter" p:itemProcessor-ref="passThroughItemProcessor"/>
<beans:bean id="chunkProcessorChunkHandlerExchanges"
class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler"
p:chunkProcessor-ref="chunkProcessorExchanges" />
Изменена конфигурация: теперь оно ставит в очередь одно сообщение за раз и не обрабатывает несколько (должно обрабатывать количество сообщений, равное параллелизму прослушивателя).
<beans:bean id="simpleThreadScope"
class="org.springframework.context.support.SimpleThreadScope" />
<util:map id="scopesMap">
<beans:entry key="thread" value-ref="simpleThreadScope" />
</util:map>
<beans:bean
class="org.springframework.beans.factory.config.CustomScopeConfigurer"
p:scopes-ref="scopesMap" />
<int:channel id="importExchangesChannel" />
<int:channel id="importExchangesReplyChannel" scope="thread">
<int:queue />
</int:channel>
<beans:bean id="importExchangesMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="importExchangesChannel" p:receiveTimeout="${import.exchanges.reply.timeout}" />
<amqp:outbound-channel-adapter
amqp-template="amqpTemplate" channel="importExchangesChannel"
exchange-name="${import.exchanges.exchange}" routing-key="${import.exchanges.routing.key}" />
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="${import.exchanges.listener.concurrency}"
requeue-rejected="false" prefetch="1">
<rabbit:listener queues="${import.exchanges.queue}"
ref="importExchangesChunkHandler" method="handleChunk" />
</rabbit:listener-container>
<int:channel id="importEclsChannel" />
<int:channel id="importEclsReplyChannel" scope="thread">
<int:queue />
</int:channel>
<beans:bean id="importEclsMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="importEclsChannel" p:receiveTimeout="${import.ecls.reply.timeout}" />
<amqp:outbound-channel-adapter
amqp-template="amqpTemplate" channel="importEclsChannel"
exchange-name="${import.ecls.exchange}" routing-key="${import.ecls.routing.key}" />
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="${import.ecls.listener.concurrency}"
requeue-rejected="false" prefetch="1">
<rabbit:listener queues="${import.ecls.queue}"
ref="importEclsChunkHandler" method="handleChunk" />
</rabbit:listener-container>
<beans:bean id="importExchangesItemWriter"
class="com.st.batch.foundation.ImportExchangesItemWriter"
p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}" />
<beans:bean id="importExchangesChunkItemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step" p:messagingOperations-ref="importExchangesMessagingTemplate"
p:replyChannel-ref="importExchangesReplyChannel">
</beans:bean>
<beans:bean id="importExchangesChunkHandler"
class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean"
p:chunkWriter-ref="importExchangesChunkItemWriter" p:step-ref="importExchangesStep">
</beans:bean>
<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />
<rabbit:direct-exchange name="${import.exchanges.exchange}">
<rabbit:bindings>
<rabbit:binding queue="${import.exchanges.queue}"
key="${import.exchanges.routing.key}" />
</rabbit:bindings>
</rabbit:direct-exchange>
Я вижу только одно сообщение в очереди одновременно.Я должен отправить сообщения = ${import.exchanges.commit.interval}, и все они должны быть подхвачены одновременными прослушивателями и обработаны параллельно.
Решение
Я не уверен, что вы подразумеваете под «локальным запуском», но у вас нет никакой информации о маршрутизации на исходящих адаптерах;если кролик не знает, как маршрутизировать сообщения, он просто отбрасывает их.
Вам нужно добавить routing-key="${import.exchanges.queue}"
и routing-key="${import.exchanges.reply.queue}"
к адаптерам.При этом будет использоваться обмен по умолчанию (""), где очереди связываются по своим именам.
Кроме того, вы не можете использовать одно и то же имя канала с обеих сторон (importExchangesChannel
).Таким образом, исходящий адаптер и активатор службы будут подписаны, а сообщения будут распространяться по циклическому принципу.
Итак, некоторые фрагменты будут работать локально;другие будут удалены из-за проблемы с ключом маршрутизации.
Вам необходимо исправить ключ маршрутизации и использовать другой канал на стороне службы.