Counterexamples regarding consistency in event sourced solutions (Part 2)
In Part 1 the introduction talked about certain blind-spots we may have, when getting into event sourcing. We also covered the first of five counter-examples, namely 'confusing timelines in the processing of events'.
This part covers two more counterexamples, one about failing projections, and another about the propagation of domain events to consumers.
The patched non-repeatable projection
If production fails and our company is bleeding money because of a primary key constraint violation or some other failure, preventing further handling of events in a critical projection, then do what I advocate against here. Do it so we can create an environment where a real solution can be fashioned. However, be aware that it puts our system in a non-visible failure-mode that should be resolved soon.
Problems
- First responder has previously 'fixed broken data' in SQL, and does so again since the projection fails. This change is not captured in events, and therefore not part of the projections 'source of truth'.
- Rebuilding the same version of the projection would need the same manipulations, at the same positions as last time. This may need manual intervention to resolve.
diagram: First responder observes problem and fixes data manually
Symptoms
- An immediate problem has been solved by manipulating the state and/or position of the projection.
- Rehydration of the state-projection is no longer deterministically reproducible.
Consequences
Scaling and versioning the projection becomes error-prone, repeating earlier failures and reintroducing inconsistencies previously 'solved manually'. A knee-jerk reaction to this could be to migrate existing ('fixed') data, along with the position of the state-projection when versioning, rather than rehydrate the projection. Doing so would severely limit our options for that part of our system; it perpetuates the need for manually fixing data in production, it’s more code to write and maintain and overall it's a solution to the wrong problem.
Treatment and recovery
Upstream
In general, we want the read-model to be able to deal with the output of the write-model. The initial error observed might be a side-effect of problems upstream. Bluntly, we could say that the fewer possible outputs our write-model has, the fewer possible inputs we need to handle downstream. The full recovery therefore includes guarding invariants in the write-model, and only commit to acceptable transitions. The write-model is our source of truth, it's important that we write a truth we can understand! For example:
A state-projection fails with a foreign key violation between [Tasks].[OriginObservationId] and [Observations].[Id]. Some troubleshooting reveals the cause is the handling 'TaskProposed' event, where the [OriginObservationId] is not the ID of any observation in the system. A conversation with a Business Analyst and Domain Expert confirms that there is not supposed to be a way a task can be proposed without an observation. To prevent further events like this one, the developers makes sure 'Propose Task'-commands are rejected if the “ObservedIn”-property of the command is not the Id of a known Observation.
This is a good example of where we might want to accept stale information in the evaluation of a command. In this case, there is no problem when observation was "unknown" according to the state read by the Task command-handler, but became "known" before the command was rejected. Also, the time it takes for an observation to become "known" for the evaluation of Task-Commands is short enough that it does not negatively impact any current use-cases. One of the reasons why it's easier to allow this is that the observation never becomes invalid for a tasks [OriginObservationId] once the state used when handling commands: there is no important limit or lock to satisfy, only the requirement that 'tasks must reference an existing observation'.
I find that it's usually not problematic around constraints like something 'existing', needs here are easier to spot. It helps to look out for the possibility of the opposite temporal transition; when something can change from 'known' to 'unknown', 'existing' to '404 Not Found', even 'deleted' to 'undeleted', etc.
As a personal heuristic, when I observe such a transition (from 'valid' to 'no longer valid'), I investigate whether it's:
- A constraint incidental to the implementation, could be loosened, maybe even changed.
- A business problem to explore, maybe there is a separate aggregate waiting to be discovered, maybe we need to handle the case differently as an alternative process etc.
- Not that important to the business, operations, or development, something we can handle more casually than enforcing it as an invariant.
Downstream
One option would be to remove the Foreign key constraint in the SQL schema of the state-projection, and make sure Queries can handle this. For example, the query could left-join the observation of the task fore returning the result to the user.
Relaxing constraints downstream is generally good advice, because constraints and guarantees works both ways: the stricter a system we build regarding constraints downstream, the less 'elbow room' we have for new kinds of features. This is because we are tightly coupling (or introducing) relations between aggregates in the write-model, implicitly, via the read-model. This was no problem when the entire system worked on 'one timeline', so it can be an invisible trap for anyone new to these kinds of systems. Decoupling life-cycles between aggregates, especially between context boundaries, helps a lot.
Performance?
While it is true that a foreign key constraint can be used by SQL to optimize the execution of read-queries, it is often used primarily to prevent invalid writes. Preventing invalid writes when consuming events is already too late! The event is already committed; we cannot contest it when handling it downstream. We could ignore it, but we would then be projecting an alternative or selective interpretation of the source of truth, rather than a 'truthful' interpretation.
In many cases, the joining on primary keys is plenty fast enough, and if not, then denormalize the projections so no joining is required. Denormalization will typically create an increase in time to hydrate state projections, but it will vastly improve query-time. Another tradeoff is that denormalized schemas become specialized for certain queries: sometimes we might need to split our 'normalized' states into several denormalized states. We can project each state in separate timelines or have them share a timeline. Beware though, that projecting separate states on the same timeline might introduce a problem, unless the effects of events on each state is idempotent. As I see it, the tradeoff here is about a high number of simultaneous subscriptions and possibly a temporally divergent state across views on one hand, and slower/heavier projection and rehydration on the other. I'm assuming that the respective stream-position of the projection is stored atomically with the projected state in both scenarios: I would always do that if at all possible.
Internal state in a state projection
At some point, we will need some existing state, that is not part of the respective event, to build some other state when handling that event. We can project a state that is only used in the handling of events, meaning state that is never queried from downstream consumers of a ReadService/Query. A temporary internal state like this can be useful when:
- We find ourselves forced to use locks like foreign key constraints, but the target of the foreign key is not yet present.
- We need to 'repair' invalid data by applying compensating/support events retroactively.
- We need a 'working state' to build a denormalized and queryable state.
diagram: Examples of situations where an internal temporary state (in grey) can be helpful
Two-phase commit for publishing events to a queue
I’ve never actually come across this one myself, but I’ve heard at least three war stories where developers have taken this path. The following disappointment for those developers, regarding how hard rehydration and versioning of projections became, is the reason I’m including it here. It’s not supposed to be hard, we are not supposed to have this problem: just stop using a queue between our write-model and our projections, and the problem disappears.
Problem
Events are (hopefully) published to a queue after being stored in the event store, perhaps as part of a repository implementation.
- We now have the possibility of lost writes, if the publishing to queue fails after events were already appended to the stream.
- Catch-up subscriptions from the queue/bus require locking and purging the queue, and republishing events to the queue from the event store.
- What about (new) events being published to the queue while we are republishing?
- Are we going to postpone publishing them to the queue (how? another queue?)
- When do we release these events?
- 'Up to date' is a moving target, it's hard (even impossible) to hit and 'switch queues' at the right time
- A lot of overhead, trying to build and maintain the above.
//Example of "Two-phase commit to queue"
//In repository or whatever writes events to the event store
CommitChanges(AggregateRoot aggregate)
{
var events = aggregate.UnperistedEvents;
_eventStore.AppendToStream(aggregate.StreamId, events);
_messageBus.Publish(events);
}
Symptoms
- Downstream event consumers do not receive all committed events.
- Rebuilding/rehydrating projections becomes complicated and brittle.
Consequences
Any problem with the commit to the queue (after the commit to the event-store) will leave the event consumers with an inconsistent source. This leads to 'lost writes' from the queues point of view, causing inconsistency in read-models, and failing policies/sagas/process managers.
Treatment and Recovery
Commit events to the event store only. Subscribers should only consume events from the "source of truth".
//Solution: Commit to one source, subscribe from that
//In repository or whatever writes events to the event store
CommitChanges(AggregateRoot aggregate)
{
var events = aggregate.UnperistedEvents;
_eventStore.AppendToStream(aggregate.StreamId, events);
}
Don’t get me wrong, I’m not advocating using an event store as an enterprise-wide event-bus. Technically, I think of event sourcing as a persistence-implementation first and foremost, and I would not want to couple the separate bounded contexts at the persistence layer. To keep the option of versioning and iterating the event-schema within a bounded context, only publish 'integration events' to external consumers, and treat these 'integration events' as part of your external contract. For more on the subject of decoupling, I recommend the excellent articles on 'Patterns for Decoupling in Distributed Systems' by Mathias Verraes.
I'd also strongly consider decoupling the messaging mechanism as well as the schema, exposing an HTTP-endpoint where external consumers can poll integration events, using the Atom Publishing Protocol or something similar: I would not want my persistence-mechanism to be coupled to my communication-contract with 'external consumers'. To be clear: I do not have these reservations regarding 'internal consumers' like projections or policies in the same bounded context, owned by the same team.
In summary: adapters producing (integration) events for 'external consumers' should only react to events from the event store, or a product of such events. A "source of truth" should be exactly that, a singular source of truth, I'll come back to this in part 4.
End of Part 2
In Part 3, we'll look at a potential trap we can end up making for ourselves when we have not yet understood that events on a specific stream expresses changes to just that one aggregate (and does not implicitly represent changes to a bunch of other aggregates).