الربيع دفعة التقسيم عن بعد لا يصطف الرسائل ولكن تشغيله محليا?

StackOverflow https://stackoverflow.com//questions/24037271

سؤال

أدناه هو التكوين الخاص بي لفصل الربيع عن بعد.يتم تشغيل خطواتي محليا بدلا من عن بعد.لا أستطيع رؤية الرسائل في رابيتمق.

<beans:bean id="importExchangesChunkItemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step" p:messagingOperations-ref="importExchangesMessagingTemplate"
    p:replyChannel-ref="importExchangesReplyChannel">
</beans:bean>

<beans:bean id="importExchangesChunkHandler"
    class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean"
    p:chunkWriter-ref="importExchangesChunkItemWriter" p:step-ref="importExchangesStep">
</beans:bean>

<job id="importExchangesJob" restartable="true">    
     <step id="importExchangesStep" next="importEclsStep">
        <tasklet transaction-manager="transactionManager">
            <chunk reader="importExchangesFileItemReader" writer="importExchangesItemWriter"
                commit-interval="${import.exchanges.commit.interval}" />
        </tasklet>
    </step>    
  </job>



<beans:bean id="passThroughItemProcessor" class="org.springframework.batch.item.support.PassThroughItemProcessor" />

<rabbit:connection-factory id="connectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" username="${rabbitmq.username}"
    password="${rabbitmq.password}" />

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

<rabbit:admin id="rmqAdmin" connection-factory="connectionFactory" />

<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />

<int:channel id="importExchangesChannel" />


<int:channel id="importExchangesReplyChannel" />


<beans:bean id="importExchangesMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="importExchangesChannel" p:receiveTimeout="${import.exchanges.reply.timeout}" />

<amqp:outbound-channel-adapter id="importExchangesOutboundAdapter"
    channel="importExchangesChannel" />

<amqp:inbound-channel-adapter id="importExchangesInboundAdapter"
    connection-factory="connectionFactory" channel="importExchangesReplyChannel"
    queue-names="${import.exchanges.reply.queue}" />


<amqp:inbound-channel-adapter id="importExchangesSlaveInboundAdapter"
    connection-factory="connectionFactory" channel="importExchangesChannel"
    queue-names="${import.exchanges.queue}" />



<amqp:outbound-channel-adapter id="importExchangesSlaveOutboundAdapter"
    channel="importExchangesReplyChannel" />

<int:service-activator id="serviceActivatorExchanges"
    input-channel="importExchangesChannel" output-channel="importExchangesReplyChannel"
    ref="chunkProcessorChunkHandlerExchanges" method="handleChunk" />


<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter" p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}"/>



<beans:bean id="chunkProcessorExchanges"
    class="org.springframework.batch.core.step.item.SimpleChunkProcessor"
    p:itemWriter-ref="importExchangesItemWriter" p:itemProcessor-ref="passThroughItemProcessor"/>

<beans:bean id="chunkProcessorChunkHandlerExchanges"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler"
    p:chunkProcessor-ref="chunkProcessorExchanges" />

تم تغيير التكوين إلى هذا ، والآن يصطف رسالة واحدة في كل مرة ولا يعالج متعددة (يجب معالجة عدد الرسائل يساوي تزامن المستمع).

<beans:bean id="simpleThreadScope"
    class="org.springframework.context.support.SimpleThreadScope" />

<util:map id="scopesMap">
    <beans:entry key="thread" value-ref="simpleThreadScope" />
</util:map>

<beans:bean
    class="org.springframework.beans.factory.config.CustomScopeConfigurer"
    p:scopes-ref="scopesMap" />





<int:channel id="importExchangesChannel" />
<int:channel id="importExchangesReplyChannel" scope="thread">
    <int:queue />
</int:channel>

<beans:bean id="importExchangesMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="importExchangesChannel" p:receiveTimeout="${import.exchanges.reply.timeout}" />

<amqp:outbound-channel-adapter
    amqp-template="amqpTemplate" channel="importExchangesChannel"
    exchange-name="${import.exchanges.exchange}" routing-key="${import.exchanges.routing.key}" />

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="${import.exchanges.listener.concurrency}"
    requeue-rejected="false" prefetch="1">
    <rabbit:listener queues="${import.exchanges.queue}"
        ref="importExchangesChunkHandler" method="handleChunk" />
</rabbit:listener-container>


<int:channel id="importEclsChannel" />
<int:channel id="importEclsReplyChannel" scope="thread">
    <int:queue />
</int:channel>

<beans:bean id="importEclsMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="importEclsChannel" p:receiveTimeout="${import.ecls.reply.timeout}" />

<amqp:outbound-channel-adapter
    amqp-template="amqpTemplate" channel="importEclsChannel"
    exchange-name="${import.ecls.exchange}" routing-key="${import.ecls.routing.key}" />

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="${import.ecls.listener.concurrency}"
    requeue-rejected="false" prefetch="1">
    <rabbit:listener queues="${import.ecls.queue}"
        ref="importEclsChunkHandler" method="handleChunk" />
</rabbit:listener-container>



<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter"
    p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}" />


<beans:bean id="importExchangesChunkItemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step" p:messagingOperations-ref="importExchangesMessagingTemplate"
    p:replyChannel-ref="importExchangesReplyChannel">
</beans:bean>

<beans:bean id="importExchangesChunkHandler"
    class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean"
    p:chunkWriter-ref="importExchangesChunkItemWriter" p:step-ref="importExchangesStep">
</beans:bean>

<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />

<rabbit:direct-exchange name="${import.exchanges.exchange}">
    <rabbit:bindings>
        <rabbit:binding queue="${import.exchanges.queue}"
            key="${import.exchanges.routing.key}" />
    </rabbit:bindings>
</rabbit:direct-exchange>

أستطيع أن أرى فقط 1 رسالة في قائمة الانتظار في وقت واحد.يجب أن أرسل رسائل = import {استيراد.التبادلات.الالتزام.الفاصل الزمني} وجميع يجب أن يتم انتقاؤها من قبل المستمعين المتزامنة ومعالجتها بشكل متوازي.

هل كانت مفيدة؟

المحلول

لست متأكدا مما تعنيه بـ "التشغيل محليا" ولكن ليس لديك أي معلومات توجيه على المحولات الصادرة;إذا كان الأرنب لا يعرف كيفية توجيه الرسائل ، فإنه ببساطة يسقطها.

تحتاج إلى إضافة routing-key="${import.exchanges.queue}" و routing-key="${import.exchanges.reply.queue}" إلى المحولات.سيستخدم هذا التبادل الافتراضي ( "" ) حيث يتم ربط قوائم الانتظار باستخدام أسمائهم.

أيضا ، لا يمكنك استخدام نفس اسم القناة على كلا الجانبين (importExchangesChannel).بهذه الطريقة ، سيتم الاشتراك في المحول الصادر ومنشط الخدمة وسيتم توزيع الرسائل بطريقة مستديرة.

لذلك ، سيتم تشغيل بعض القطع محليا;سيتم إسقاط الآخرين بسبب مشكلة مفتاح التوجيه.

تحتاج إلى إصلاح مفتاح التوجيه واستخدام قناة مختلفة على جانب الخدمة.

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top