以下是我的 Spring Batch 远程分块配置。我的步骤是在本地运行而不是远程运行。我看不到rabbitmq中的消息。

<beans:bean id="importExchangesChunkItemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step" p:messagingOperations-ref="importExchangesMessagingTemplate"
    p:replyChannel-ref="importExchangesReplyChannel">
</beans:bean>

<beans:bean id="importExchangesChunkHandler"
    class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean"
    p:chunkWriter-ref="importExchangesChunkItemWriter" p:step-ref="importExchangesStep">
</beans:bean>

<job id="importExchangesJob" restartable="true">    
     <step id="importExchangesStep" next="importEclsStep">
        <tasklet transaction-manager="transactionManager">
            <chunk reader="importExchangesFileItemReader" writer="importExchangesItemWriter"
                commit-interval="${import.exchanges.commit.interval}" />
        </tasklet>
    </step>    
  </job>



<beans:bean id="passThroughItemProcessor" class="org.springframework.batch.item.support.PassThroughItemProcessor" />

<rabbit:connection-factory id="connectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" username="${rabbitmq.username}"
    password="${rabbitmq.password}" />

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

<rabbit:admin id="rmqAdmin" connection-factory="connectionFactory" />

<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />

<int:channel id="importExchangesChannel" />


<int:channel id="importExchangesReplyChannel" />


<beans:bean id="importExchangesMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="importExchangesChannel" p:receiveTimeout="${import.exchanges.reply.timeout}" />

<amqp:outbound-channel-adapter id="importExchangesOutboundAdapter"
    channel="importExchangesChannel" />

<amqp:inbound-channel-adapter id="importExchangesInboundAdapter"
    connection-factory="connectionFactory" channel="importExchangesReplyChannel"
    queue-names="${import.exchanges.reply.queue}" />


<amqp:inbound-channel-adapter id="importExchangesSlaveInboundAdapter"
    connection-factory="connectionFactory" channel="importExchangesChannel"
    queue-names="${import.exchanges.queue}" />



<amqp:outbound-channel-adapter id="importExchangesSlaveOutboundAdapter"
    channel="importExchangesReplyChannel" />

<int:service-activator id="serviceActivatorExchanges"
    input-channel="importExchangesChannel" output-channel="importExchangesReplyChannel"
    ref="chunkProcessorChunkHandlerExchanges" method="handleChunk" />


<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter" p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}"/>



<beans:bean id="chunkProcessorExchanges"
    class="org.springframework.batch.core.step.item.SimpleChunkProcessor"
    p:itemWriter-ref="importExchangesItemWriter" p:itemProcessor-ref="passThroughItemProcessor"/>

<beans:bean id="chunkProcessorChunkHandlerExchanges"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler"
    p:chunkProcessor-ref="chunkProcessorExchanges" />

更改了配置,现在它一次对单个消息进行排队,并且不处理多个消息(应该处理等于侦听器并发数的消息数)。

<beans:bean id="simpleThreadScope"
    class="org.springframework.context.support.SimpleThreadScope" />

<util:map id="scopesMap">
    <beans:entry key="thread" value-ref="simpleThreadScope" />
</util:map>

<beans:bean
    class="org.springframework.beans.factory.config.CustomScopeConfigurer"
    p:scopes-ref="scopesMap" />





<int:channel id="importExchangesChannel" />
<int:channel id="importExchangesReplyChannel" scope="thread">
    <int:queue />
</int:channel>

<beans:bean id="importExchangesMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="importExchangesChannel" p:receiveTimeout="${import.exchanges.reply.timeout}" />

<amqp:outbound-channel-adapter
    amqp-template="amqpTemplate" channel="importExchangesChannel"
    exchange-name="${import.exchanges.exchange}" routing-key="${import.exchanges.routing.key}" />

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="${import.exchanges.listener.concurrency}"
    requeue-rejected="false" prefetch="1">
    <rabbit:listener queues="${import.exchanges.queue}"
        ref="importExchangesChunkHandler" method="handleChunk" />
</rabbit:listener-container>


<int:channel id="importEclsChannel" />
<int:channel id="importEclsReplyChannel" scope="thread">
    <int:queue />
</int:channel>

<beans:bean id="importEclsMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="importEclsChannel" p:receiveTimeout="${import.ecls.reply.timeout}" />

<amqp:outbound-channel-adapter
    amqp-template="amqpTemplate" channel="importEclsChannel"
    exchange-name="${import.ecls.exchange}" routing-key="${import.ecls.routing.key}" />

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="${import.ecls.listener.concurrency}"
    requeue-rejected="false" prefetch="1">
    <rabbit:listener queues="${import.ecls.queue}"
        ref="importEclsChunkHandler" method="handleChunk" />
</rabbit:listener-container>



<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter"
    p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}" />


<beans:bean id="importExchangesChunkItemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step" p:messagingOperations-ref="importExchangesMessagingTemplate"
    p:replyChannel-ref="importExchangesReplyChannel">
</beans:bean>

<beans:bean id="importExchangesChunkHandler"
    class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean"
    p:chunkWriter-ref="importExchangesChunkItemWriter" p:step-ref="importExchangesStep">
</beans:bean>

<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />

<rabbit:direct-exchange name="${import.exchanges.exchange}">
    <rabbit:bindings>
        <rabbit:binding queue="${import.exchanges.queue}"
            key="${import.exchanges.routing.key}" />
    </rabbit:bindings>
</rabbit:direct-exchange>

我一次只能看到队列中的 1 条消息。我应该发送消息 = ${import.exchanges.commit.interval} 并且所有消息都应该由并发侦听器拾取并并行处理。

有帮助吗?

解决方案

我不确定“本地运行”是什么意思,但您没有出站适配器上的任何路由信息;如果兔子不知道如何路由消息,他就会直接丢弃消息。

你需要添加 routing-key="${import.exchanges.queue}"routing-key="${import.exchanges.reply.queue}" 到适配器。这将使用默认交换器(“”),其中队列使用其名称进行绑定。

另外,您不能在两侧使用相同的通道名称(importExchangesChannel)。这样,出站适配器和服务激活器都将被订阅,并且消息将以循环方式分发。

因此,一些块将在本地运行;其他的会因为路由密钥问题而被丢弃。

您需要修复路由密钥并在服务端使用不同的通道。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top