質問

Currently working on a DDD application that is using event sourcing with redis as my main persistence store. So unfortunately I do not have built in rollbacks should something fail. The application is a monolith with 3 separate aggregate roots

The following service method will

  1. Create my "Trip" aggregrate
  2. Call store repo to persist all events associated with creating the aggregate.
  3. Call snapshot repo to persist aggregate snapshot
  4. Publish events to update other aggregates in the monolith (maintain loose coupling)

I am using c# but the language is irrelavant here:

public async Task<bool> CreateTripAsync(CreateTripRequest request)
{
    //Factory method to create Trip aggregate
    var trip = Trip.Create(request);

    //Persist trip events
    await _tripEventRepository.SaveEventsAsync(trip.Events);

    //Persist trip snapshot
    try
    {
        await _tripSnapshotRepository.SaveAsync(liveTrip);
    }
    catch
    {
        //Deal with eventual consistency by replaying stored events?? How do we trigger this to happen
    }

    //Publish all events associated with creating trip (e.g. 'PassengerAddedEvent') which will affect other aggregates in the same service
    // Mediators publish is fire and forget 
    var tasks = trip.Events.Select(async (ev) => { await _mediator.Publish(ev); });
    await Task.WhenAll(tasks);

    return true;
}

My questions are:

  1. My service is doing the above 4 things. All of them are associated with creating a trip. Would this violate SRP for this method? I don't think so as they all appear to have the same level of abstraction.
  2. If storing my events in the event store succeeds but suddenly there is an infrastructure outage and I can not update my snapshot, how can I deal with this inconsistency? I was thinking of scheduling a background task that replays the events associated with this trip to recreate the aggregate and when it comes back online persist it. However, let's say I exceed my set a max retry count?

  3. Because I am working in a monolith I am using the MediatR nuget package which is a fire and forget pub sub event dispatch/handler. Should there be an error in publishing one of these event handlers, I will have my aggregates in inconsistent states. I know that most enterprise level service buses have message queues that are automatically retried/stored after exceptions. However I can't find anything on integrating an ESB into a monolith and perhaps an ESB would be overkill in my case??

役に立ちましたか?

解決

Would this violate SRP for this method?

Not enough to be worth worrying about. You could reasonably ask whether this code really needs to understand that "save" means "update three different stores", or whether that's something that should be abstracted behind another function. For instance, it's likely many different behaviors of Trip all require a save -- should they all have their own copy of the save protocol, or should they share a common protocol.

You've got your domain concerns separated from your persistence concerns, which is the most important thing.

If storing my events in the event store succeeds but suddenly their is an infrastructure outage and I can not update my snapshot, how can I deal with this inconsistency.

Be more explicit about time/clock, and how to make the consumer of the snapshot resilient. For example, if the snapshot is saved with meta data describing where in the event stream the snapshot was taken, then you can repair the snapshot later using just the new events.

If the history I'm looking at has 100 events, and the snapshot I have was built from event 95, then it's easy to update the snapshot with the new events. So it's not a big deal if the attempt to update the snapshot fails.

This is essentially the same way a database write-ahead-log works.

Should there be an error in publishing one of these event handlers, I will have my aggregates in inconsistent states.

You can republish the events later -- especially if you design your aggregates so that they recognize messages that they have already seen (aka: idempotent event handling). That is to say, you can arrange to both publish events when the aggregate changes, and also on a schedule. That gets you at least once delivery, you just need to get the aggregate to handle the duplicates.

See also Nobody Needs Reliable Messaging.

ライセンス: CC-BY-SA帰属
所属していません softwareengineering.stackexchange
scroll top