In our last post, The Future of Event Stream Processing, we introduced the idea of a log-based architecture. Logs are a superior approach to database-centric architectures for processing event data for analytics, materialized views, auditing, speculative queries, and more. In this post, we’re going to zero in on one of the most challenging components of implementing log designs: record replay.
Record replay rewinds the clock
The central idea behind log-oriented architectures is that all events are stored verbatim in the order in which they are received. Programs non-destructively consume those streams of events to perform work or accrete state that supports other applications. Event stream consumers passively read from the event stream and track their own progress. The key to this is that when a consumer reads a record, it doesn’t remove it from the stream.
Since different consumers can read the same events in the same order, it’s implied that a consumer can “rewind” its position in the log and replay events that it’s already seen. This is whimsically referred to as time travel, since a consumer can skip forwards or backwards in the log to arbitrary points in time. Replay serves as the backbone for implementing efficient auditing. As it turns out, there are a host of other uses for replay that make it live up to its nickname of time travel - speculative queries and retroactive code correction are two such examples. Let’s talk a little about these to get more perspective on the challenges that replay needs to support.
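To make the rewind idea concrete, here is a minimal in-memory sketch. The `EventLog` and `Consumer` classes are hypothetical names invented for illustration, not the API of any real log system; the point is only that the consumer owns its offset and the log never deletes on read.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class EventLog:
    """Append-only, in-memory log; reading never removes a record."""
    records: List[Any] = field(default_factory=list)

    def append(self, record: Any) -> int:
        self.records.append(record)
        return len(self.records) - 1  # offset of the record just written


@dataclass
class Consumer:
    """Tracks its own position; the log knows nothing about its readers."""
    log: EventLog
    offset: int = 0

    def poll(self) -> List[Any]:
        # Read everything from the current position to the tail.
        batch = self.log.records[self.offset:]
        self.offset = len(self.log.records)
        return batch

    def seek(self, offset: int) -> None:
        # "Rewind" (or skip ahead) by moving the consumer's own cursor.
        if not 0 <= offset <= len(self.log.records):
            raise ValueError("offset out of range")
        self.offset = offset


log = EventLog()
for event in ["a", "b", "c"]:
    log.append(event)

reader = Consumer(log)
first_pass = reader.poll()  # reads every record
reader.seek(1)              # rewind to the second record
replayed = reader.poll()    # re-reads from offset 1 onward
```

Because `seek` only changes the consumer's cursor, any number of consumers can rewind independently without affecting one another.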
Speculative & self-healing systems
Speculative queries are questions that can be asked of an event stream, supposing that some fact were true. Suppose a retail store generally queries for the total profits of the quarter given a 6% sales tax, excluding items related to footwear and beverages. With a speculative query, a retailer can ask a different question alongside the original to compare. For example, what are the profits if there’s a 6.4% sales tax that now excludes pocket watches? With replay, speculative queries can reinterpret all incoming events to reach new conclusions, and can do so side-by-side with existing queries.
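As a rough sketch of the retail example, the same events can be folded twice with different parameters. Everything here is an assumption made for illustration: the `Sale` shape, the reading of "excludes" as "left out of the total", and the choice to treat revenue as tax-inclusive so that tax reduces profit.

```python
from decimal import Decimal
from typing import Iterable, NamedTuple, Set


class Sale(NamedTuple):
    category: str
    revenue: Decimal  # assumed to be tax-inclusive
    cost: Decimal


def quarterly_profit(events: Iterable[Sale], tax_rate: Decimal, excluded: Set[str]) -> Decimal:
    """Replay the events and total the profit under one set of assumptions."""
    profit = Decimal("0")
    for sale in events:
        if sale.category in excluded:
            continue  # this category is left out of the query
        profit += sale.revenue - sale.cost - sale.revenue * tax_rate
    return profit


events = [
    Sale("apparel", Decimal("100"), Decimal("60")),
    Sale("footwear", Decimal("80"), Decimal("50")),
    Sale("pocket watches", Decimal("200"), Decimal("120")),
]

# The existing query and the speculative one read the same, untouched events.
baseline = quarterly_profit(events, Decimal("0.06"), {"footwear", "beverages"})
speculative = quarterly_profit(events, Decimal("0.064"), {"pocket watches"})
```

Retroactive correction follows the same shape: replay the events through the fixed function and keep the new result in place of the old one.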
Retroactive code correction is a technique for implementing self-healing systems. When events compound to accrete state, like an aggregate, there’s always the possibility that the code performing that accretion is wrong - or maybe you learned a new fact that makes your program out-of-date. In the previous example, you’d need retroactive correction if you applied an incorrect tax percentage. Traditional systems need to manually fix these problems, querying and rewriting centralized databases. Needless to say, this is a reckless practice that can result in permanent data loss. With a log-centric architecture, we can safely replay all events from the log and derive an answer with an updated, correct version of the code.
We usually hand-wave when discussing replay
Record replay arguably has more potential to grow an architecture’s power than any other technique. Yet the design work that goes into reifying replay from an idea into actual code typically receives a huge amount of hand-waving. The idea sounds simple: when we need to, we’ll simply command consumers to start reading the log of events from an earlier point in time, and everything will just work. Right?
Reality-check: this puzzle has many pieces
Let’s explore common roadblocks to implementing replay. These issues are virtually always papered over during design discussions, only to bite back hard down the road.
“Switch-over” is a colloquial phrase used to describe the process by which applications interested in consumer state change the location from which they pull accreted information. When a consumer that’s reading from the log is commanded to replay from an earlier point in time, it’s advised to store the newly accreted state in a different location, leaving the previous storage intact. This implies that readers of that state need navigational hints to find the new location.
When a consumer rewinds its position in the log and begins to accrete state once again, dependent applications that want access to that state may begin making requests to the consumer to serve its data. If the consumer that rewound hasn’t sufficiently “caught up” beyond the last location the requesting application learned about, it will see stale views reflective of an earlier point in time. This is a frequent cause of confusion and bugs because strict forward progress is implicitly assumed.
Very long replay durations
When a sufficient number of events have been stored in the log, the time required to replay can become prohibitive, due to either the amount of data that needs to be re-processed or because of the degree of work the consumer needs to apply to that data. Replays typically constitute downtime, and if it takes 2 hours to replay a full data stream, that means 2 hours of blackout.
Determining resume vs. replay safety
There are certain circumstances when a consumer can go offline and simply resume reading from the log at the place it last left off. This is desirable if a consumer was restarted by an OS process, for instance. If the same code is interpreting the same event stream, it’s helpful to simply resume to avoid the cost of a full replay. The question that inevitably arises is, “When is it safe to resume and avoid the cost of a replay?” The answer is, of course, it depends! Choose wisely, because resuming progress when a replay is actually required can generate bad information for dependent applications.
Downtime during replay
For most systems, replaying events means downtime because while consumers rewind and re-process to an acceptable state, dependent applications will be waiting around for access, spiking requests that can’t be answered. You could start serving caches of stale data, but how stale is too stale? It’s a hard decision to make, and the ideal situation is to avoid being down in the first place.
Reviving historical archives
Log storage systems, like Kafka, aren’t designed to store huge amounts of data indefinitely. Logs should be configured with a retention policy to boot out old events and make room for the new. This is, however, a conflict of interest for event-sourced architectures, where the entire state of the world is represented solely through events on the log. Without all the events, it’s not possible to do a full historical replay and retain the power to reinterpret incoming data.
Replay needs to be a fully automated process, not a manual one
When we talk about log-based architectures and replay, it’s often couched in the context of being a manual one-off scenario that’s convenient when you need it. The truth is that in order to use replay effectively, it needs to be a fully automated process. And that means it needs to be baked in from the start of an event processing architecture.
Intelligent replay strategies
Now that we’ve covered the most common scenarios that trip designers up, we’re going to look at solutions to each of the problems we discussed.
Wrangling the switch-over: read-level indirection
Handling "the switch-over" is probably the easiest obstacle to overcome. Consumer state should be checkpointed to durable storage as the log is consumed, and that storage location should be externally discoverable to interested parties. In this manner, when an application wants access to consumer state, it first queries for the storage location, and then follows that location to the goods. This is effectively using pointers in high-level design.
The storage locations should typically be stored as UUIDs to avoid naming conflicts. You'll run into the situation where an application will receive an old location for stale consumer state. If locations and state are accessed through microservices, microservices can intercept requests and automatically reroute the location change themselves. A little indirection goes a long way. Be sure to have a cooldown time for old state views before garbage collecting them in case an application does get its hands on a stale view.
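The pointer-following idea can be sketched in a few lines. `StateDirectory` is a hypothetical stand-in for whatever service hands out locations; it keeps everything in memory and leaves out the cooldown and garbage collection step entirely.

```python
import uuid
from typing import Any, Dict


class StateDirectory:
    """Maps a consumer name to the location of its current state view."""

    def __init__(self) -> None:
        self._current: Dict[str, str] = {}  # consumer name -> location id
        self._views: Dict[str, Any] = {}    # location id -> state view
        self._moved: Dict[str, str] = {}    # superseded location -> newer location

    def publish(self, consumer: str, state: Any) -> str:
        # Each rebuild lands in a fresh UUID-named location; old views stay intact.
        location = str(uuid.uuid4())
        self._views[location] = state
        previous = self._current.get(consumer)
        if previous is not None:
            self._moved[previous] = location
        self._current[consumer] = location
        return location

    def locate(self, consumer: str) -> str:
        return self._current[consumer]

    def read(self, location: str) -> Any:
        # Reroute callers that are still holding a superseded location.
        while location in self._moved:
            location = self._moved[location]
        return self._views[location]


directory = StateDirectory()
old_location = directory.publish("totals", {"version": 1})
new_location = directory.publish("totals", {"version": 2})  # after a replay
rerouted = directory.read(old_location)
```

An application that cached `old_location` before the replay still ends up at the rebuilt view, which is the rerouting behavior described above.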
Inform applications: attach logical clocks to data views
The stale consumer problem ratchets the difficulty up a notch since its concerns span multiple layers of the event processing system. Without using a sophisticated design that leverages zero downtime (we'll get to that later!), you have two options: disallow applications from accessing consumer state while it's being rebuilt, or mark each data view with a timestamp so that applications can decide for themselves whether reading that data is useful.
Different applications will have different needs. If the domain is banking, for example, it would be unacceptable to see a version of your bank account that's reflective of 2 years ago. On the other hand, if the domain is analytics, it would be useful to get a snapshot of consumer state as soon as it's available. Timestamps attached to views will inform applications about how far behind the rebuilt data view is from what they last saw. These timestamps should be indicative of what log entries the aggregate contains and must be consistently applied.
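One way to picture this is to use the log offset as the logical clock. In the sketch below, `View`, `read_if_fresh`, and `max_lag` are hypothetical names, and using a log offset as the timestamp is our assumption; any consistently applied logical clock would serve.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class View:
    state: Any
    as_of_offset: int  # highest log offset folded into `state`


def read_if_fresh(view: View, last_seen_offset: int, max_lag: int) -> Optional[Any]:
    """Return the state only if the view is at most `max_lag` entries behind
    what the caller last saw; otherwise return None so the caller can wait."""
    lag = last_seen_offset - view.as_of_offset
    return view.state if lag <= max_lag else None


# A consumer that is mid-replay has only reached offset 40.
rebuilding = View(state={"balance": 120}, as_of_offset=40)

# The application previously saw data as of offset 100.
banking = read_if_fresh(rebuilding, last_seen_offset=100, max_lag=0)       # too stale
analytics = read_if_fresh(rebuilding, last_seen_offset=100, max_lag=1000)  # good enough
```

Setting `max_lag=0` and passing the offset of your own latest write as `last_seen_offset` gives a simple read-your-own-writes check.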
Interestingly enough, this is also the technique needed to achieve read-your-own-writes.
Speed up long replays: temporarily scale dedicated resources
Fixing the problem of long replays has an obvious solution with a non-trivial implementation. When performing a replay, assuming you’re processing events with a distributed streaming cluster, simply scale up the resources dedicated to the processing to make it go faster. When the replay is finished, scale back down to the normal amount of resources.
While this sounds easy in principle, fully automating the process is tricky. Knowing when the replay is “finished” can be a difficult property to communicate to an autoscaler, particularly because multiple consumers may be playing catch-up at the same time.
Further, if your data is partitioned through grouping, and that partitioning dictates strict serializability, you can’t turn up the parallelism any higher without breaking serializability invariants. There’s no great solution here: you’re simply bound by the speed of the hardware. If you can scale up to some degree, seeing this work in action is a delight.
Resume vs. replay: static analysis
The determination of resuming versus replaying is difficult. In general, it’s always safe to replay so long as dependent applications can tolerate either downtime or access to partially formed state. On the other hand, you’ll want to avoid a replay if possible. One way you can go about determining this is through static analysis. If the consumer of an event log is executing the same code as it was before, it’s safe to resume. If the aggregations or materialized views that the consumer is producing have changed in structure, a replay is required. Everything else falls into a grey area.
Do changes in transformation or filtering functions dictate a full replay? It depends! Only you, as the designer, can make that decision. Static analysis can at least group the situation into 3 buckets: forced replay, safe resume, or possible replay.
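The bucketing can be sketched with a deliberately crude stand-in for real static analysis: compare a fingerprint of the consumer's code and the shape of the view it produces. `ConsumerVersion`, `classify`, and the bucket names are hypothetical, and hashing source text is only an assumption about how "same code" might be detected.

```python
import hashlib
from dataclasses import dataclass
from enum import Enum
from typing import Tuple


class Decision(Enum):
    RESUME_SAFE = "resume"      # same code: pick up where we left off
    REPLAY_REQUIRED = "replay"  # the view's structure changed
    REPLAY_POSSIBLE = "review"  # grey area: the designer decides


@dataclass(frozen=True)
class ConsumerVersion:
    source: str                     # the consumer's code, as text
    output_schema: Tuple[str, ...]  # field names of the view it produces

    def code_fingerprint(self) -> str:
        return hashlib.sha256(self.source.encode("utf-8")).hexdigest()


def classify(old: ConsumerVersion, new: ConsumerVersion) -> Decision:
    if old.output_schema != new.output_schema:
        return Decision.REPLAY_REQUIRED
    if old.code_fingerprint() == new.code_fingerprint():
        return Decision.RESUME_SAFE
    return Decision.REPLAY_POSSIBLE


v1 = ConsumerVersion("total += e.amount", ("total",))
v2 = ConsumerVersion("total += e.amount * 2", ("total",))
v3 = ConsumerVersion("total += e.amount", ("total", "count"))

restart = classify(v1, v1)         # Decision.RESUME_SAFE
logic_change = classify(v1, v2)    # Decision.REPLAY_POSSIBLE
schema_change = classify(v1, v3)   # Decision.REPLAY_REQUIRED
```

A real analyzer would look deeper than a text hash, since a whitespace-only edit changes the fingerprint without changing behavior, but the three-way outcome is the part that matters.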
Dealing with downtime: shadow processing
Under most designs, while a consumer replays events from the log, your application is effectively down. This is especially troublesome if the time to replay the log is long. To combat this, one can employ the sophisticated technique of shadow processing.
Shadow processing is the act of starting a secondary consumer that runs in parallel with the original. The new consumer tracks the current offset of the old consumer and replays with increased resources to outpace the original consumer. When the new consumer is within an acceptable range of where the old consumer is in terms of events seen, the switch-over occurs, and downstream applications seamlessly cut over to find the new state. After a period of time, the old consumer spins down and is garbage collected. This design technique achieves near zero-downtime.
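A toy, single-process simulation shows the catch-up dynamic. `Worker`, `catch_up`, the per-tick rates, and the integer events are all assumptions made for illustration; in a real deployment the two consumers would run on separate resources.

```python
from itertools import count, islice
from typing import Callable, Iterator, List


class Worker:
    """A consumer that folds up to `rate` log events into its state per tick."""

    def __init__(self, log: List[int], apply: Callable[[int, int], int], rate: int) -> None:
        self.log, self.apply, self.rate = log, apply, rate
        self.offset = 0
        self.state = 0

    def step(self) -> None:
        end = min(self.offset + self.rate, len(self.log))
        for event in self.log[self.offset:end]:
            self.state = self.apply(self.state, event)
        self.offset = end


def catch_up(log: List[int], live: Worker, shadow: Worker,
             arrivals: Iterator[int], per_tick: int, max_lag: int) -> int:
    """Run both workers until the shadow is within `max_lag` events of live."""
    if shadow.rate <= live.rate or max_lag < 0:
        raise ValueError("shadow must outpace the live consumer")
    ticks = 0
    while live.offset - shadow.offset > max_lag:
        log.extend(islice(arrivals, per_tick))  # new events keep arriving
        live.step()
        shadow.step()
        ticks += 1
    return ticks


log = list(range(100))
live = Worker(log, lambda state, event: state + event, rate=5)
while live.offset < len(log):
    live.step()  # the old consumer is already caught up

# The shadow runs corrected code from offset 0 with five times the resources.
shadow = Worker(log, lambda state, event: state + 2 * event, rate=25)
ticks = catch_up(log, live, shadow, count(100), per_tick=5, max_lag=10)
serving = shadow  # switch-over: readers now follow the shadow's state
```

The old consumer keeps serving reads for the whole loop, which is where the near zero-downtime property comes from.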
Reviving historical data: combine batch & streaming approaches
Finally, we address the problem of restoring and replaying historical data that has since been purged from the log. The approach we suggest is two-phased.
The first phase pertains to storage. When events are appended to the tail of the log, a secondary consumer reads the log and automatically archives all events to permanent, long term storage. All events include a unique identifier so that it's possible to correlate events that exist on the log to their analog in permanent storage.
The second phase relates to the consumption mechanism. Consumers need to be outfitted with enough smarts to begin a replay by looking at permanent storage first, processing all the data that they find. Next, the consumer needs to track the appropriate offset to "switch-over" to in the log and begin processing realtime events. This elegant approach allows consumers to backfill very old data sets and catch up with realtime data in one shot. It beats out the Lambda Architecture in both engineering and operational complexity since the switch between batch and realtime is completely invisible to consumers. It's notable that this design technique inverts where the "switch-over" happens by repeating that mechanism one layer higher in the architecture.
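Here is a minimal sketch of the second phase, assuming the archive and the retained log are plain lists in log order and that the archiver has kept up well enough for the two to overlap. `Event`, `switch_over_offset`, and `backfill` are hypothetical names; only the unique identifier comes from the design above.

```python
from typing import Iterator, List, NamedTuple


class Event(NamedTuple):
    event_id: str  # unique id shared by the log copy and the archived copy
    payload: str


def switch_over_offset(archive: List[Event], retained: List[Event]) -> int:
    """Find the first offset in the retained log that the archive has not covered."""
    if not archive or not retained:
        return 0
    last_archived = archive[-1].event_id
    for offset, event in enumerate(retained):
        if event.event_id == last_archived:
            return offset + 1
    raise LookupError("archive and retained log do not overlap; events may be missing")


def backfill(archive: List[Event], retained: List[Event]) -> Iterator[Event]:
    """Yield archived history first, then continue from the log without repeats."""
    yield from archive
    yield from retained[switch_over_offset(archive, retained):]


archive = [Event(f"e{i}", f"payload-{i}") for i in range(1, 5)]   # e1..e4
retained = [Event(f"e{i}", f"payload-{i}") for i in range(3, 7)]  # e3..e6, older events purged

offset = switch_over_offset(archive, retained)
replayed_ids = [event.event_id for event in backfill(archive, retained)]
```

Correlating by identifier rather than by position is what lets the consumer cross from archive to log without double-counting the overlap.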