Question

Given a publish-subscribe channel configured with a task executor, is it possible to have the invocation of its ordered subscribers interrupted if one throws an exception?

For example, in the configuration below the 'worked' message is still sent by the second service-activator in the sequence. I would like to prevent that.

    <int:publish-subscribe-channel id="input" task-executor="taskExecutor" />

    <int:service-activator
        input-channel="input"
        order="1" 
        expression="1 / 0" 
        output-channel="nullChannel"
    />
    <int:service-activator
        input-channel="input"
        order="2" 
        expression="'worked'" 
        output-channel="output"
    />

    <int:channel id="output">
        <int:queue />
    </int:channel>

The real world use-case for this is somewhat more complex, in that a File is published to two subscribers, one for parsing the contents of it, and another for archiving to S3. A sequence is applied so they can be aggregated and deleted later once both tasks have finished.

The S3 upload is achieved with an outbound channel adapter that returns no reply. As such, there is a publish-subscribe channel that invokes the S3 adapter first (using the order attribute) and then sends the File to a bridge, which then places it on the channel that collects the aggregated messages.

If the S3 upload failed, I do not want the file to get deleted, so the second subscriber of the upstream channel must not be called.


Solution

The "order" attribute really doesn't make any sense when you add a task executor because both tasks are executed asynchronously; interrupting the "second" thread might be too late. To answer your general question, no, a failure on one thread will not interrupt others.
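
To illustrate the point about ordering, here is a minimal sketch of the alternative: a publish-subscribe channel without a task executor invokes its subscribers sequentially on the sending thread, so with the default ignore-failures="false" an exception in the first subscriber propagates and the second is never called. The "inbound" channel and the bridge are assumptions added for illustration, in case an asynchronous hand-off is still wanted upstream; the two service-activators from the question stay as they are.

    <!-- Hypothetical "inbound" channel: hands the message to the executor once -->
    <int:channel id="inbound">
        <int:dispatcher task-executor="taskExecutor" />
    </int:channel>

    <int:bridge input-channel="inbound" output-channel="input" />

    <!-- No task-executor here: subscribers run in order on the same thread,
         and a failure in order="1" stops order="2" from being invoked -->
    <int:publish-subscribe-channel id="input" />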

If I understand your use case correctly, you should be able to do what you want using the new 2.2 <request-handler-advice-chain/> feature, where you can add an ExpressionEvaluatingRequestHandlerAdvice to take different actions based on success or failure. This is discussed in this blog post: http://blog.springsource.org/2012/10/09/spring-integration-2-2-retry-and-more/ and, specifically, in the sample code it references (for the ftp adapter):

    <int-ftp:outbound-channel-adapter
        channel="inputChannel"
        session-factory="mockSessionFactory"
        remote-directory="foo">
        <int-ftp:request-handler-advice-chain>
            <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                <property name="onSuccessExpression" value="payload.delete()" />
                <property name="successChannel" ref="afterSuccessDeleteChannel" />
                <property name="onFailureExpression" value="payload.renameTo(payload.absolutePath + '.failed.to.send')" />
                <property name="failureChannel" ref="afterFailRenameChannel" />
            </bean>
        </int-ftp:request-handler-advice-chain>
    </int-ftp:outbound-channel-adapter>
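
Applied to the S3 case from the question, the same advice could look roughly like the sketch below. Everything named here is an assumption for illustration: "s3Uploader" and its "upload" method stand in for whatever handler performs the upload, and "archive", "uploaded" and "uploadFailed" are hypothetical channel names. The success expression simply evaluates to the File, so the File is only forwarded towards the aggregation and delete step once the upload has succeeded. Whether the sequence headers survive on the message sent to the success channel may depend on the Spring Integration version, so that is worth verifying before relying on it for aggregation.

    <!-- Hypothetical handler bean "s3Uploader"; replace with the actual S3 adapter -->
    <int:service-activator input-channel="archive" ref="s3Uploader" method="upload">
        <int:request-handler-advice-chain>
            <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                <!-- Evaluates to the original File, which becomes the payload sent to successChannel -->
                <property name="onSuccessExpression" value="payload" />
                <property name="successChannel" ref="uploaded" />
                <!-- A failure expression is set so that the failure channel receives a message -->
                <property name="onFailureExpression" value="payload" />
                <property name="failureChannel" ref="uploadFailed" />
            </bean>
        </int:request-handler-advice-chain>
    </int:service-activator>

    <!-- Hypothetical: feeds the channel that collects the messages for aggregation -->
    <int:channel id="uploaded" />

    <!-- Failed uploads are only logged, so the file is never passed on for deletion -->
    <int:logging-channel-adapter id="uploadFailed" level="ERROR" log-full-message="true" />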

Licensed under: CC-BY-SA with attribution