Question

Using Rhino Service Bus. I have a backend app that handles processing, and have another app (the client UI) that publishes messages to the back end. I have a Saga on the back end during which I want the saga to publish messages to itself, breaking down the processing into multiple smaller tasks that can run on their own threads. The problem is that messages are always discarded if they're subscribed via Orchestrates interface. I can subscribe in a different class using ConsumerOf and the consumer gets the message.

namespace Sagas
{
public class MoveJobSaga: ISaga<MoveJobState>,
    InitiatedBy<TriggerMoveJobCommand>,
    Orchestrates<TriggerMoveTerminalCommand>
{
    private readonly IServiceBus _bus;
    private readonly ITerminalFilesService _tfService;

    public MoveJobSaga(IServiceBus bus)
    {
        _bus = bus;
        State = new MoveJobState();
    }

    public MoveJobState State { get; set; }
    public Guid Id { get; set; }
    public bool IsCompleted { get; set; }

    public void Consume(TriggerMoveJobCommand message)
    {
        State.TerminalsToProcess = message.Terminals.Count();
        State.JobId = message.JobId;
        foreach (var terminal in message.Terminals)
        {
            _bus.Publish(new TriggerMoveTerminalCommand() 
            { 
                CorrelationId = message.CorrelationId,
                Name = terminal.Name
            });
        }
    }

    public void Consume(TriggerMoveTerminalCommand message)
    {
        var result = _tfService.MoveTerminalFiles(message.SourceTifDir, message.TargetTifDir, message.SourceDatDir, message.TargetDatDir);
        State.TerminalsProcessed++;

        if (State.TerminalsToProcess == State.TerminalsProcessed)
        {
            _bus.Publish(new MoveJobCompletedEvent() 
            { 
               Success = State.Success, 
               JobId = State.JobId });
        }
    }
}

public class MoveJobState
{
    public MoveJobState()
    {
        Success = true;
    }
    public int TerminalsToProcess { get; set; }
    public int TerminalsProcessed { get; set; }
    public int JobId { get; set; }
    public bool Success { get; set; }
}
}

Host Config:

<rhino.esb>
<bus threadCount="1" numberOfRetries="5" endpoint="msmq://localhost/myapp.host" />
<messages />
</rhino.esb>

BootStrapping:

public class HostBootStrapper: StructureMapBootStrapper
{
    protected override void ConfigureContainer()
    {
        base.ConfigureContainer();
        Container.Configure(sm =>
        {
            sm.For<ISagaPersister<MoveJobSaga>>().Use<InMemorySagaPersister<MoveJobSaga>>();
            sm.Scan(x =>
            {
                x.TheCallingAssembly();
                x.WithDefaultConventions();
            });
        });
    }
}
Was it helpful?

Solution

I needed to register the ISagaPersister as a singleton:

            sm.For<ISagaPersister<MoveJobSaga>>()
                .Singleton()
                .Use<InMemorySagaPersister<MoveJobSaga>>();

Also I had to move publishing of all messages used by the saga to outside the saga. So I created a service class that triggers the start message and then each message within the job.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top