Spring Batch remote chunking executes locally without queuing the messages?
21-12-2019
Question
Below is my configuration for Spring Batch remote chunking. My step runs locally instead of remotely, and I cannot see any messages in RabbitMQ.
<beans:bean id="importExchangesChunkItemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step" p:messagingOperations-ref="importExchangesMessagingTemplate"
p:replyChannel-ref="importExchangesReplyChannel">
</beans:bean>
<beans:bean id="importExchangesChunkHandler"
class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean"
p:chunkWriter-ref="importExchangesChunkItemWriter" p:step-ref="importExchangesStep">
</beans:bean>
<job id="importExchangesJob" restartable="true">
<step id="importExchangesStep" next="importEclsStep">
<tasklet transaction-manager="transactionManager">
<chunk reader="importExchangesFileItemReader" writer="importExchangesItemWriter"
commit-interval="${import.exchanges.commit.interval}" />
</tasklet>
</step>
</job>
<beans:bean id="passThroughItemProcessor" class="org.springframework.batch.item.support.PassThroughItemProcessor" />
<rabbit:connection-factory id="connectionFactory"
port="${rabbitmq.port}" host="${rabbitmq.host}" username="${rabbitmq.username}"
password="${rabbitmq.password}" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin id="rmqAdmin" connection-factory="connectionFactory" />
<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />
<int:channel id="importExchangesChannel" />
<int:channel id="importExchangesReplyChannel" />
<beans:bean id="importExchangesMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="importExchangesChannel" p:receiveTimeout="${import.exchanges.reply.timeout}" />
<amqp:outbound-channel-adapter id="importExchangesOutboundAdapter"
channel="importExchangesChannel" />
<amqp:inbound-channel-adapter id="importExchangesInboundAdapter"
connection-factory="connectionFactory" channel="importExchangesReplyChannel"
queue-names="${import.exchanges.reply.queue}" />
<amqp:inbound-channel-adapter id="importExchangesSlaveInboundAdapter"
connection-factory="connectionFactory" channel="importExchangesChannel"
queue-names="${import.exchanges.queue}" />
<amqp:outbound-channel-adapter id="importExchangesSlaveOutboundAdapter"
channel="importExchangesReplyChannel" />
<int:service-activator id="serviceActivatorExchanges"
input-channel="importExchangesChannel" output-channel="importExchangesReplyChannel"
ref="chunkProcessorChunkHandlerExchanges" method="handleChunk" />
<beans:bean id="importExchangesItemWriter"
class="com.st.batch.foundation.ImportExchangesItemWriter" p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}"/>
<beans:bean id="chunkProcessorExchanges"
class="org.springframework.batch.core.step.item.SimpleChunkProcessor"
p:itemWriter-ref="importExchangesItemWriter" p:itemProcessor-ref="passThroughItemProcessor"/>
<beans:bean id="chunkProcessorChunkHandlerExchanges"
class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler"
p:chunkProcessor-ref="chunkProcessorExchanges" />
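If I change the configuration to the following, it now queues only a single message at a time and does not process multiple messages (it should process as many messages as the listener concurrency).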
<beans:bean id="simpleThreadScope"
class="org.springframework.context.support.SimpleThreadScope" />
<util:map id="scopesMap">
<beans:entry key="thread" value-ref="simpleThreadScope" />
</util:map>
<beans:bean
class="org.springframework.beans.factory.config.CustomScopeConfigurer"
p:scopes-ref="scopesMap" />
<int:channel id="importExchangesChannel" />
<int:channel id="importExchangesReplyChannel" scope="thread">
<int:queue />
</int:channel>
<beans:bean id="importExchangesMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="importExchangesChannel" p:receiveTimeout="${import.exchanges.reply.timeout}" />
<amqp:outbound-channel-adapter
amqp-template="amqpTemplate" channel="importExchangesChannel"
exchange-name="${import.exchanges.exchange}" routing-key="${import.exchanges.routing.key}" />
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="${import.exchanges.listener.concurrency}"
requeue-rejected="false" prefetch="1">
<rabbit:listener queues="${import.exchanges.queue}"
ref="importExchangesChunkHandler" method="handleChunk" />
</rabbit:listener-container>
<int:channel id="importEclsChannel" />
<int:channel id="importEclsReplyChannel" scope="thread">
<int:queue />
</int:channel>
<beans:bean id="importEclsMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="importEclsChannel" p:receiveTimeout="${import.ecls.reply.timeout}" />
<amqp:outbound-channel-adapter
amqp-template="amqpTemplate" channel="importEclsChannel"
exchange-name="${import.ecls.exchange}" routing-key="${import.ecls.routing.key}" />
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="${import.ecls.listener.concurrency}"
requeue-rejected="false" prefetch="1">
<rabbit:listener queues="${import.ecls.queue}"
ref="importEclsChunkHandler" method="handleChunk" />
</rabbit:listener-container>
<beans:bean id="importExchangesItemWriter"
class="com.st.batch.foundation.ImportExchangesItemWriter"
p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}" />
<beans:bean id="importExchangesChunkItemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step" p:messagingOperations-ref="importExchangesMessagingTemplate"
p:replyChannel-ref="importExchangesReplyChannel">
</beans:bean>
<beans:bean id="importExchangesChunkHandler"
class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean"
p:chunkWriter-ref="importExchangesChunkItemWriter" p:step-ref="importExchangesStep">
</beans:bean>
<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />
<rabbit:direct-exchange name="${import.exchanges.exchange}">
<rabbit:bindings>
<rabbit:binding queue="${import.exchanges.queue}"
key="${import.exchanges.routing.key}" />
</rabbit:bindings>
</rabbit:direct-exchange>
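I can see only one message in the queue at a time. It should send a number of messages equal to ${import.exchanges.commit.interval}, and all the concurrent listeners should pick them up and process them in parallel.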
Solution
I'm not sure what you mean by "runs locally", but the outbound adapters have no routing information. If Rabbit does not know how to route a message, it simply drops it.
You need to add routing-key="${import.exchanges.queue}" and routing-key="${import.exchanges.reply.queue}" to the adapters. This uses the default exchange (""), to which every queue is bound using its own name as the routing key.
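As a rough sketch of that change, the two outbound adapters from the first configuration in the question might look like the following. The adapter ids, channel names, and property placeholders are taken from the question; which adapter gets which key is inferred from the queue each one is meant to feed, so treat it as an assumption to check against your setup.

```xml
<!-- Master side: chunk requests go to the request queue through the default exchange -->
<amqp:outbound-channel-adapter id="importExchangesOutboundAdapter"
    channel="importExchangesChannel"
    routing-key="${import.exchanges.queue}" />

<!-- Worker side: chunk replies go to the reply queue through the default exchange -->
<amqp:outbound-channel-adapter id="importExchangesSlaveOutboundAdapter"
    channel="importExchangesReplyChannel"
    routing-key="${import.exchanges.reply.queue}" />
```

Because no exchange-name is set, the adapters publish to the default exchange, so the routing key only has to match the queue name.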
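Also, you cannot use the same channel name (importExchangesChannel) on both sides. With that setup, both the outbound adapter and the service activator are subscribed to the channel, and messages are distributed between them in round-robin fashion.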
As a result, some chunks are executed locally, and the others are dropped because of the routing key problem.
You need to fix the routing keys and use a different channel on the service-activator side.
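One possible way to separate the two sides is sketched below. The channel names importExchangesSlaveRequestChannel and importExchangesSlaveReplyChannel are hypothetical and do not appear in the question; everything else reuses the bean ids and placeholders from the first configuration. The reply channel is split as well, on the assumption that sharing importExchangesReplyChannel between the master's inbound adapter and the worker's outbound adapter causes the same kind of interference.

```xml
<!-- Hypothetical worker-side channels; the names are illustrative only -->
<int:channel id="importExchangesSlaveRequestChannel" />
<int:channel id="importExchangesSlaveReplyChannel" />

<!-- Worker consumes chunk requests from RabbitMQ onto its own channel -->
<amqp:inbound-channel-adapter id="importExchangesSlaveInboundAdapter"
    connection-factory="connectionFactory"
    channel="importExchangesSlaveRequestChannel"
    queue-names="${import.exchanges.queue}" />

<!-- Only the service activator subscribes to the worker request channel -->
<int:service-activator id="serviceActivatorExchanges"
    input-channel="importExchangesSlaveRequestChannel"
    output-channel="importExchangesSlaveReplyChannel"
    ref="chunkProcessorChunkHandlerExchanges" method="handleChunk" />

<!-- Worker replies are published to the reply queue via the default exchange -->
<amqp:outbound-channel-adapter id="importExchangesSlaveOutboundAdapter"
    channel="importExchangesSlaveReplyChannel"
    routing-key="${import.exchanges.reply.queue}" />
```

With this layout, importExchangesChannel has the master's outbound adapter as its only subscriber, so every chunk request should go to RabbitMQ rather than being alternated with the local service activator.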