Pregunta

A continuación se muestra mi configuración para la fragmentación remota por lotes de primavera.Mis pasos se están ejecutando localmente en lugar de de forma remota.No puedo ver mensajes en Rabbitmq.

<beans:bean id="importExchangesChunkItemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step" p:messagingOperations-ref="importExchangesMessagingTemplate"
    p:replyChannel-ref="importExchangesReplyChannel">
</beans:bean>

<beans:bean id="importExchangesChunkHandler"
    class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean"
    p:chunkWriter-ref="importExchangesChunkItemWriter" p:step-ref="importExchangesStep">
</beans:bean>

<job id="importExchangesJob" restartable="true">    
     <step id="importExchangesStep" next="importEclsStep">
        <tasklet transaction-manager="transactionManager">
            <chunk reader="importExchangesFileItemReader" writer="importExchangesItemWriter"
                commit-interval="${import.exchanges.commit.interval}" />
        </tasklet>
    </step>    
  </job>



<beans:bean id="passThroughItemProcessor" class="org.springframework.batch.item.support.PassThroughItemProcessor" />

<rabbit:connection-factory id="connectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" username="${rabbitmq.username}"
    password="${rabbitmq.password}" />

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

<rabbit:admin id="rmqAdmin" connection-factory="connectionFactory" />

<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />

<int:channel id="importExchangesChannel" />


<int:channel id="importExchangesReplyChannel" />


<beans:bean id="importExchangesMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="importExchangesChannel" p:receiveTimeout="${import.exchanges.reply.timeout}" />

<amqp:outbound-channel-adapter id="importExchangesOutboundAdapter"
    channel="importExchangesChannel" />

<amqp:inbound-channel-adapter id="importExchangesInboundAdapter"
    connection-factory="connectionFactory" channel="importExchangesReplyChannel"
    queue-names="${import.exchanges.reply.queue}" />


<amqp:inbound-channel-adapter id="importExchangesSlaveInboundAdapter"
    connection-factory="connectionFactory" channel="importExchangesChannel"
    queue-names="${import.exchanges.queue}" />



<amqp:outbound-channel-adapter id="importExchangesSlaveOutboundAdapter"
    channel="importExchangesReplyChannel" />

<int:service-activator id="serviceActivatorExchanges"
    input-channel="importExchangesChannel" output-channel="importExchangesReplyChannel"
    ref="chunkProcessorChunkHandlerExchanges" method="handleChunk" />


<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter" p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}"/>



<beans:bean id="chunkProcessorExchanges"
    class="org.springframework.batch.core.step.item.SimpleChunkProcessor"
    p:itemWriter-ref="importExchangesItemWriter" p:itemProcessor-ref="passThroughItemProcessor"/>

<beans:bean id="chunkProcessorChunkHandlerExchanges"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler"
    p:chunkProcessor-ref="chunkProcessorExchanges" />

Configuración modificada a esto, ahora coloque un mensaje único en un momento y no procesa múltiples (debe procesar el número de mensajes iguales a la concurrencia del oyente).

<beans:bean id="simpleThreadScope"
    class="org.springframework.context.support.SimpleThreadScope" />

<util:map id="scopesMap">
    <beans:entry key="thread" value-ref="simpleThreadScope" />
</util:map>

<beans:bean
    class="org.springframework.beans.factory.config.CustomScopeConfigurer"
    p:scopes-ref="scopesMap" />





<int:channel id="importExchangesChannel" />
<int:channel id="importExchangesReplyChannel" scope="thread">
    <int:queue />
</int:channel>

<beans:bean id="importExchangesMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="importExchangesChannel" p:receiveTimeout="${import.exchanges.reply.timeout}" />

<amqp:outbound-channel-adapter
    amqp-template="amqpTemplate" channel="importExchangesChannel"
    exchange-name="${import.exchanges.exchange}" routing-key="${import.exchanges.routing.key}" />

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="${import.exchanges.listener.concurrency}"
    requeue-rejected="false" prefetch="1">
    <rabbit:listener queues="${import.exchanges.queue}"
        ref="importExchangesChunkHandler" method="handleChunk" />
</rabbit:listener-container>


<int:channel id="importEclsChannel" />
<int:channel id="importEclsReplyChannel" scope="thread">
    <int:queue />
</int:channel>

<beans:bean id="importEclsMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="importEclsChannel" p:receiveTimeout="${import.ecls.reply.timeout}" />

<amqp:outbound-channel-adapter
    amqp-template="amqpTemplate" channel="importEclsChannel"
    exchange-name="${import.ecls.exchange}" routing-key="${import.ecls.routing.key}" />

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="${import.ecls.listener.concurrency}"
    requeue-rejected="false" prefetch="1">
    <rabbit:listener queues="${import.ecls.queue}"
        ref="importEclsChunkHandler" method="handleChunk" />
</rabbit:listener-container>



<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter"
    p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}" />


<beans:bean id="importExchangesChunkItemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step" p:messagingOperations-ref="importExchangesMessagingTemplate"
    p:replyChannel-ref="importExchangesReplyChannel">
</beans:bean>

<beans:bean id="importExchangesChunkHandler"
    class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean"
    p:chunkWriter-ref="importExchangesChunkItemWriter" p:step-ref="importExchangesStep">
</beans:bean>

<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />

<rabbit:direct-exchange name="${import.exchanges.exchange}">
    <rabbit:bindings>
        <rabbit:binding queue="${import.exchanges.queue}"
            key="${import.exchanges.routing.key}" />
    </rabbit:bindings>
</rabbit:direct-exchange>

Puedo ver solo 1 mensaje en la cola a la vez.Debería enviar mensajes= $ {import.exchanges.commit.interval} y todos deben ser recogidos por oyentes concurrentes y procesados paralelamente.

¿Fue útil?

Solución

No estoy seguro de lo que quiere decir con "ejecutar localmente", pero no tiene información de enrutamiento en los adaptadores de salida;Si el conejo no sabe cómo enrutar los mensajes, simplemente los deja caer.

Debe agregar routing-key="${import.exchanges.queue}" y routing-key="${import.exchanges.reply.queue}" a los adaptadores.Esto utilizará el intercambio predeterminado ("") donde las colas están unidas con sus nombres.

Además, no puede usar el mismo nombre del canal en ambos lados (importExchangesChannel).De esa manera, el adaptador de salida y el activador de servicio se suscribirán y los mensajes se distribuirán en la moda de Round-Robin.

Entonces, algunos trozos correrán localmente;Otros se dejarán caer debido al problema clave de enrutamiento.

Debe solucionar la clave de enrutamiento y usar un canal diferente en el lado de servicio.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top