Counterexamples regarding consistency in event sourced solutions (Part 3)

Julian May  |  04 July 2021

In Part 2 we looked at two counterexamples, one related to failing projections, and another about how not to propagate domain events.

This part only covers one example, one I did not see coming myself. The problem surfaced when we started guarding invariants in the write-model, as a means to resolve unintended outputs and failing projections. 

Faking the changes where the user reads them

Moving a monolithic system towards event sourcing is hard. I would go so far as to say I have yet to see it succeed. However, moving a part of such a system towards event sourcing is perfectly doable. To succeed, you need to have cleaner vertical slices in your architecture (by context or feature), and make the transition vertically, for the parts of your system where it makes sense to do so. The system where this counterexample was prevalent had an 'anemic domain model' in the sense that the aggregates emitting events did nothing else: all logic guarding invariants was in Services or Controllers. Read-models from state-projections where built first, and the problem described here occurred when we started fixing the upstream problems by guarding invariants in the aggregates.

Problems

A business policy ('Whenever X, then Y') has been simulated in one or more projections, making the presented state differ from the actual state of the event sourced aggregate. For example, let’s say you have the business rule:

"Whenever a contractor changes company affiliation, all tasks assigned to that contractor changes status to ‘unassigned’".

In this example, the contractors and tasks are all individual event streams. You will want to be very conscious about which 'fields' are owned by which streams, you can end up lying to yourself otherwise.

Counterexamples-7

diagram: Example of a projection 'making up' state, disappointing and confusing the end-user when the truth of that state is used for a transition.

In this example, the projection updates its state of tasks based on an event from a contractor-stream.

//In 'Overseer app' projection

  Handle(Unassigned ev)
  {
      Tasks.ByTaskId(ev.TaskId).Apply(ev);
  }

  Handle(Assigned ev)
  {
      Tasks.ByTaskId(ev.TaskId).Apply(ev);
  }

  Handle(CompanyAffiliationChanged ev)
  {
      _contractors.ById(ev.ContractorId).Apply(ev);
      _tasks.AssignedTo(ev.ContractorId)
          .ForEach(task => task.Status = "Unassigned");
  }

The task aggregate however, simply enforces invariants based on events from it's own stream.

//In 'Task' aggregate root

  DocumentFollowup(DocumentFollowup cmd)
  {
      if (IsOverdue && Status != "Unassigned")
          throw Rejections.ResponsibilityMustBeAssigned;
 
      Emit(new FollowupDocumented(...));
  }   

Symptoms

  • Commands have unexpected effects and results, seeming to defy the invariants guarded in the aggregate.
  • Information in read-model seems to defy the aggregate state.

Consequences

  • Commands transition aggregates when that should not be possible.
  • Commands are unexpectedly rejected based on state different from what the user sees.
  • 'The truth' is not propagated throughout the system: each consumer would have to make up the same 'amendment to the truth'. As an example, this could mean a policy might penalize the contractor since the task becomes overdue, even though the contractor is not (supposed to be) responsible for the task any longer.

Treatment and recovery

Make the implicit explicit. Let’s go back to the business rule:

“Whenever a contractor changes company affiliation, all issues assigned to that contractor change status to ‘unassigned’”.

The rule gives us a solid hint: “Whenever…”; We need a policy/saga/process manager to react to the “Company Affiliation Changed” event and unassign the relevant tasks according to the rule. Those changes become a new 'unassigned' event per task, giving us:

  • Alignment of business-rules and our 'source of truth' (the tasks event-streams).

  • Explicit and propagated information to handle in projections and policies alike.

  • Encapsulation of the transitive cause-effect relation between Company affiliation and task assignment.

Counterexamples-8

diagram: Proposed solution for 'faking the changes…': stop faking them

This policy needs cross-aggregate information to know which tasks to unassign, meaning we need information about all tasks to know which tasks to change. This state is an internal projection targeted at answering just this question, or a group of questions if the policy is more complicated or you choose to group several policies on one timeline.

//In 'Assignments and Affiliations'-policy

  //Maintain 'TaskAssignment' state
    Handle(Assigned ev)
    {
      _taskAssignments.Set(ev.TaskId, ev.AssignedContractorId, ev.AssignedCompanyId);
    }
    Handle(Unassigned ev)
    {
      _taskAssignments.Clear(ev.TaskId).Assignee = null;
    }

Given this state, we’re now able to identify which tasks to unassign according to the business policy.

Rather than unassigning all tasks in a for-each loop when we handle the CompanyAffiliationChanged event, we translate the one-to-many cardinality with PolicyPrepared-events, one for each identified task, which is committed to the policies own internal stream. I suggest doing this in a transaction, in case there are thousands of identified tasks). Each PolicyPrepared-event is a 'promise' to execute the policy on the respective task, when that specific PolicyPrepared-event is handled.

This translation, from one to many, not only minimizes the scope of failure to individual transactions, it also isolates coupling to the CompanyAffiliationChanged event at the boundary, which is important when the policy is about changes in one context based on events from another context (think anti-corruption layer).

I would look for better names than PolicyPrepared, perhaps something like Contractor of Task changed affiliation, but these events are rarely interesting for users and other components as domain events. They are mostly a mechanism to prevent blocking and multiple transactions at the same position of the subscription.

//Still in 'Assignments and Affiliations'-policy

  //Prepare the transition, using the state
    Handle(CompanyAffiliationChanged ev)
    {
      var policyEvents = _taskAssignments.ByContractorId(ev.ContractorId)
          .Select(t => new PolicyPrepared(t.TaskId))

      _eventstore.AppendToStream(
        "TaskPolicy-AssignmentsAndAffiliations", policyEvents);        
    }
//Still in 'Assignments and Affiliations'-policy

  //Invoke the transition of individual tasks
    Handle(PolicyPrepared ev)
    {
      var cmd = PolicyCommand.ForRelatedTaskOf(ev);
      var task = _repository.LoadTask(ev.TaskId);
      var result = task.Unassign(cmd);

      _repository.CommitChanges(task);
    }

The PolicyPrepared events also play an important part in idempotent command-handling as they include values that can be used as a 'checkpoint', like the position of the initiating CompanyAffiliationChanged event on the policies timeline, or the CorrelationId of that event.

Each PolicyPrepared event is received individually via the policy's single subscription. When a PolicyPrepared event is handled, a command (with the checkpoint) for the respective Task is executed. The checkpoint prevents wrongfully unassigning the task in case the event is handled more than once (aggregate-state of the task includes checkpoints 'Already Handled'). There are several ways to make commands idempotent; I think this one is relatively simple.

//In 'Task' aggregate root

  Unassign(UnassignBasedOnAffiliationPolicy cmd)
  {
    if(AlreadyHandled(cmd.CheckpointRef) || Status == "Unassigned")
        return;

    Emit(new Unassigned(cmd.Reason, cmd.CheckpointRef));
  }

  AlreadyHandled(string checkpoint) => _checkpointsHandled.Contains(checkpoint);

  Apply(Unassigned ev)
  {
      Status = "Unassigned";
      if(ev.CheckpointRef != null)
        _checkpointsHandled.Add(ev.CheckpointRef);
  }

With this policy in place, the 'Overseer app' projection no longer has any need to handle the 'CompanyAffiliationChanged'-event and simulate the business-rule. It can simply handle 'Assigned'- and 'Unassigned'-events, as the business-rule coupling company affiliation with task assignments is encapsulated by the policy.

Let's review the mechanics in place in this example:

  • Used a single subscription for state and behavior:

    • We want the state for the behavior and the triggers of the behavior to follow the same timeline. This gives us determinism and avoids "confusing the timelines".
  • Separated preperation/translation from execution

    • Since one domain event should cause a change in several aggregates, we made sure that each aggregate is handled in an isolated manner. In effect, we translated one big payload into several smaller ones. Easier troubleshooting, smaller scope of failure and the option of distributing the payload are some the reasons to so. If the triggering domain event would only cause the policy to invoke a single aggregate, we could skip this one.
  • Made a separate command for the policy to unassign a task

    • This does not pertain to consistency exactly, but it deserved mentioning in conjunction with the introduction of automated processes invoking aggregates: In my experience, it's best to separate user-driven commands from those of automated processes. Business-logic and rejection-tolerance often differs widely between those two categories. So in our fictional solution there is only one Unassigned- event, but there is both a user-driven Unassign-command, as well as the UnassignBasedOnAffiliationPolicy-command.
  • Marked the affected aggregate with a checkpoint

    • Idempotency is important for consistency, especially in distributed systems. Subscriptions of EventStoreDB gives us "At least once" delivery rather than "At most once", meaning that the same event might be handled twice. Beyond this, if a bug or hardware failure should cause an exception to be thrown in an event-handle after the aggregate transition is committed, causing the event to be retried. In such cases, the operation might in itself be idempotent, in which case we don't need a separate mechanism here. An example of this is Checking into a hotel: Once you are checked in, you cannot check-in again or undo the check-in. Other operations are inherently not idempotent, like incrementing a number. In the middle there is a grey area where I find most behavior belongs. In this case, our Unassign-command. If the task is already Unassigned, nothing happens when we unassign - do it two times, or ten times in a row, nothing happens (provided you abstain from emitting new 'Unassigned'-events), so it looks idempotent... But only in that narrow window of the aggregates lifecycle. When you need to guarantee idempotency for operations like this, across the lifecycle of the respective aggregate, this is my preferred way of doing so.

Remember that you do not control global ordering from any one place in the system. This policy cannot force validation of a regular Assign command on a task to be 100% consistent with a contractor’s current affiliation. What it can do however, is identify when a task was assigned to [contractor ‘A’ in company ‘X’] after contractor ‘A’ changed affiliation to company ‘Y’ and compensate accordingly. Such compensation could be to unassign the task like it would when the contractor changed affiliation to company ‘Y’ after the task was assigned 'consistently'. Depending on how often contractors with active tasks change assignment, this might be an extremely rare edge-case: maybe it’s enough to identify it and just send an email or log it, so it can be resolved manually. Maybe it's enough to do nothing for now, and accept that maybe this possible edge-case never happens.

This last option of simply not handling the edge-case is valid for a lot of cases. Consider the consequences of a task being assigned to one contractor and the "wrong/old" company - the user discovering this discrepancy might report it as a bug. In this case, a user or supporter can correct the assignment of the task manually. Maybe it's fine to wait for three or ten of such incidents before implementing self-correcting functionality?

Again, the constraints we need depends on the guarantees the business needs. Our 'state-bias' often leads us down a path where we believe we can (and must) control and guarantee all such things. With this in mind, I would suggest that organizations striving to adopt event sourcing, also need to adopt collaborative ownership and close communication with "the business people" about their software solution, if they do not already have that. Discuss the tradeoffs, make sure everyone understands that each "guarantee" across aggregate boundaries is a "feature" which must be implemented, adding complexity to the overall solution. Sometimes this complexity outweighs the overhead of "fixing things manually", sometimes it does not.

As a bonus, these discussions are a prime opportunity to discover better aggregate boundaries, encapsulating domain invariants rather than "some information we already had a noun for" (looking at you "Order", "Reservation", "Product", "Task" etc...).

End of Part 3

In the final part, we'll talk about one of the big root causes for many of the problems we might suffer from when introducing event sourcing to a state-based system: Split Brain


Photo of Julian May

Julian May Julian is a system developer working with Line of Business software, currently for the construction industry. Always curious to "why is it like this?" he focuses on building bridges between the current consequences of yesterday's decisions and the ambitions for tomorrow. His personal objectives are solving root problems, lifting from the bottom, and facilitating success by removing obstacles.