ML Case-study Interview Question: Zero-Downtime Online Migration for Massive Scale Sharded Document Databases
Case-Study Question
A rapidly growing payment platform processes over 1 trillion USD in total payments volume annually while maintaining 99.999 percent uptime. They built an in-house Database-as-a-Service using a custom document-based storage engine and a specialized Data Movement Platform. The database is sharded across thousands of nodes, each holding portions of the total data. The team needs the ability to:
Migrate data across shards with zero downtime.
Maintain data consistency and correctness during migrations.
Accommodate massive throughput and storage needs.
Merge underutilized shards to save resources.
Perform major database version upgrades without service disruption.
Design a complete approach to these challenges. Propose solutions for shard management, traffic routing, data synchronization, online migrations, performance optimization, and safe version upgrades. Explain every detail of your method, including the technologies and strategies you would apply.
Detailed Solution
Overall Architecture
They use a sharded document database managed by a custom control plane and a fleet of proxy servers. Each shard hosts a physical database with replica sets for high availability. Each proxy server intercepts queries, determines the relevant shard or shards, and routes the request. A chunk metadata service keeps track of where chunks of data reside.
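The proxy's lookup step can be sketched as a binary search over sorted chunk boundaries. This is only an illustration under the assumption that chunks are contiguous, non-overlapping key ranges; the `Chunk` and `ChunkMap` names, the key format, and the shard names are hypothetical and do not come from the platform described here.

```python
import bisect
from dataclasses import dataclass


@dataclass(frozen=True)
class Chunk:
    lower: str   # inclusive lower bound of the key range
    upper: str   # exclusive upper bound of the key range
    shard: str   # shard that currently owns this range


class ChunkMap:
    """Hypothetical in-memory view of the chunk metadata held by a proxy."""

    def __init__(self, chunks):
        # Keep chunks sorted by lower bound so lookups can use binary search.
        self._chunks = sorted(chunks, key=lambda c: c.lower)
        self._lowers = [c.lower for c in self._chunks]

    def shard_for(self, key):
        # Find the last chunk whose lower bound is <= key.
        i = bisect.bisect_right(self._lowers, key) - 1
        if i < 0 or key >= self._chunks[i].upper:
            raise KeyError(f"no chunk covers key {key!r}")
        return self._chunks[i].shard


chunk_map = ChunkMap([
    Chunk("a", "m", "shard-1"),
    Chunk("m", "t", "shard-2"),
    Chunk("t", "{", "shard-3"),  # "{" sorts just after "z" in ASCII
])
print(chunk_map.shard_for("merchant_42"))  # shard-2
```

Moving a chunk then amounts to changing the `shard` value of one range, which is what makes chunk-level migration possible without touching the rest of the map.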
Data Movement Platform
They introduced a specialized platform to move data chunks across shards online. It orchestrates every step with services responsible for bulk data imports, asynchronous replication, correctness checks, and traffic switching.
Zero-Downtime Migrations
They keep downtime under a few seconds by using a version token mechanism that briefly pauses traffic on the old shard, ensures all writes are replicated, then points the proxy servers to the target shard. Applications retry failed requests during that brief pause.
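The application-side retry behavior can be sketched as a bounded retry loop with exponential backoff. This is an illustrative sketch only: the `Rejected` exception, the `with_retries` helper, and the delay values are hypothetical, and the write-up does not specify a backoff policy.

```python
import time


class Rejected(Exception):
    """Hypothetical error raised while a chunk's traffic is paused."""


def with_retries(send, attempts=5, base_delay=0.5, sleep=time.sleep):
    # Retry `send` until it succeeds or the attempts run out, doubling the
    # wait between tries so the total wait can cover a short pause.
    for attempt in range(attempts):
        try:
            return send()
        except Rejected:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)


# Simulate a request that is rejected twice during the switch, then succeeds.
outcomes = [Rejected(), Rejected(), "ok"]
delays = []


def send():
    outcome = outcomes.pop(0)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


# Record the delays instead of actually sleeping.
result = with_retries(send, sleep=delays.append)
print(result, delays)  # ok [0.5, 1.0]
```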
Step-by-Step Migration Flow
Chunk Registration: They record which chunks need moving. Indexes on the target shard are prepared for those chunks.
Bulk Data Import: They create a snapshot of the chunk at a specific time T and import that snapshot into the target shard. To overcome write throughput limits, they optimize the insertion order by sorting documents on common index fields.
Asynchronous Replication: After the bulk load, they replay writes from T onward, captured through a change data capture pipeline. They tag writes to avoid cyclical replication and allow for switching back if needed.
Correctness Check: They compare snapshots of source and target shards to ensure completeness of the migrated chunks without draining resources on the source.
Traffic Switch: They stop traffic on the old shard, replicate any final writes, update the chunk routing in the metadata, and route traffic to the new shard. This phase lasts about two seconds, and retries on the application side absorb this gap.
Migration Completion: They mark the migration complete in the metadata service and drop the old chunk from the source shard.
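The six steps above can be modeled as a small state machine that only allows moving to the next phase. This is a sketch, not the platform's actual coordinator: the `Phase` and `ChunkMigration` names are hypothetical, and a real system would also need failure and rollback states.

```python
from enum import Enum


class Phase(Enum):
    REGISTERED = 1
    BULK_IMPORTED = 2
    REPLICATING = 3
    VERIFIED = 4
    SWITCHED = 5
    COMPLETED = 6


class ChunkMigration:
    """Hypothetical record of one chunk's progress through the flow."""

    def __init__(self, chunk_id, source, target):
        self.chunk_id = chunk_id
        self.source = source
        self.target = target
        self.phase = Phase.REGISTERED

    def advance(self, to):
        # Only allow moving to the immediately following phase, so a step
        # such as the traffic switch can never run before verification.
        if to.value != self.phase.value + 1:
            raise ValueError(f"cannot go from {self.phase.name} to {to.name}")
        self.phase = to


m = ChunkMigration("chunk-7", source="shard-1", target="shard-9")
for phase in (Phase.BULK_IMPORTED, Phase.REPLICATING, Phase.VERIFIED,
              Phase.SWITCHED, Phase.COMPLETED):
    m.advance(phase)
print(m.phase.name)  # COMPLETED
```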
Handling Merges and Splits
They split large shards by identifying hot chunks and moving them to new shards. When traffic is low, they bin-pack multiple underutilized shards into fewer shards to reduce infrastructure costs. The key is granular chunk-based moves.
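One way to plan such merges is a bin-packing heuristic. The write-up does not say which algorithm is used; the sketch below assumes first-fit decreasing over a single utilization number per shard, and the `plan_merges` function and its inputs are hypothetical.

```python
def plan_merges(utilization, capacity=1.0):
    """Group shards so each group's total utilization fits in one shard.

    utilization: dict of shard name -> fraction of one shard's capacity in use.
    Returns a list of groups; each group could be merged into a single shard.
    """
    bins = []  # each bin is [total_utilization, [shard names]]
    # First-fit decreasing: place the largest shards first.
    for shard, used in sorted(utilization.items(),
                              key=lambda kv: (-kv[1], kv[0])):
        for b in bins:
            if b[0] + used <= capacity:
                b[0] += used
                b[1].append(shard)
                break
        else:
            # No existing bin has room, so open a new one.
            bins.append([used, [shard]])
    return [names for _, names in bins]


groups = plan_merges({"s1": 0.5, "s2": 0.25, "s3": 0.25, "s4": 0.75})
print(groups)  # [['s4', 's2'], ['s1', 's3']]
```

In practice `capacity` would likely be set below 1.0 to leave headroom for traffic spikes, and the plan would be executed as a series of chunk moves rather than a whole-shard copy.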
Major Version Upgrades
They load data onto new shards running the upgraded database version. They asynchronously replicate writes from the old shards, verify correctness, and switch traffic to the newer version without impacting availability. They bypass intermediate upgrades by forklift-migrating from an older database engine version to a newer one.
Performance Optimizations
They minimize read overhead on the source by replaying the oplog that the existing change data capture systems already capture, rather than issuing extra queries, so the source keeps serving normal traffic during migrations. They speed up bulk loads by sorting documents on the most common index fields, which reduces random writes on the target.
Implementation Example in Python
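def bulk_import(source_shard, target_shard, chunk_filter, start_time):
    # Pseudocode: read_data_snapshot and write_bulk are placeholder helpers.
    # Read the chunk as it existed at start_time (the snapshot time T).
    data_snapshot = read_data_snapshot(source_shard, chunk_filter, start_time)
    # Sort on a common index field to reduce random writes on the target.
    data_snapshot.sort(key=lambda doc: doc['common_index_field'])
    write_bulk(target_shard, data_snapshot)

def replicate_changes(source_stream, target_shard, start_time):
    # Pseudocode: is_cycle and apply_oplog_event are placeholder helpers.
    for oplog_event in source_stream:
        # Replay only writes at or after the snapshot time, and skip events
        # that replication itself produced, to avoid cyclical replication.
        if oplog_event.timestamp >= start_time and not is_cycle(oplog_event):
            apply_oplog_event(target_shard, oplog_event)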
The coordinator orchestrates each action. It updates the metadata service, issues these bulk_import and replicate_changes calls, performs correctness checks, then handles the traffic switch.
Possible Follow-Up Questions
How do you preserve performance during large-scale migrations?
They rely on asynchronous streaming from the existing change data capture pipeline instead of running direct queries on the source. This avoids increased load on the source shard’s read capacity and prevents user-facing slowdowns.
What if a target shard fails mid-migration?
They pause replication at the last known checkpoint. Once the target shard is back, they resume from that checkpoint. If it is unrecoverable, they discard partial writes, revert to the source, and retry with a different target shard.
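Checkpointed resume can be sketched as follows. The sequence numbers, the `TargetUnavailable` exception, and the `replicate_with_checkpoint` helper are assumptions made for illustration; the write-up only states that replication pauses at the last known checkpoint and resumes from it.

```python
class TargetUnavailable(Exception):
    """Hypothetical error raised when the target shard cannot accept a write."""


def replicate_with_checkpoint(events, apply, checkpoint):
    """Apply events whose sequence number is above `checkpoint`.

    Returns the new checkpoint: the sequence number of the last event
    that was applied successfully.
    """
    for seq, event in events:
        if seq <= checkpoint:
            continue  # already applied before the interruption
        try:
            apply(event)
        except TargetUnavailable:
            return checkpoint  # pause; the caller retries later from here
        checkpoint = seq
    return checkpoint


events = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
applied = []
failures = {"c": 1}  # the target fails once while applying "c"


def apply(event):
    if failures.get(event, 0) > 0:
        failures[event] -= 1
        raise TargetUnavailable(event)
    applied.append(event)


cp = replicate_with_checkpoint(events, apply, checkpoint=0)   # stops after "b"
cp = replicate_with_checkpoint(events, apply, checkpoint=cp)  # resumes at "c"
print(cp, applied)  # 4 ['a', 'b', 'c', 'd']
```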
How do you ensure data correctness if documents change during migration?
They use a point-in-time snapshot for the bulk load, then capture all subsequent writes from that same point in time. They apply these writes in order until the target matches the source. They verify correctness by comparing snapshot counts or checksums in a separate service.
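A toy model shows why snapshot plus ordered replay converges. It assumes each change event carries the full document (an upsert) or a delete, so re-applying an event that the snapshot already reflects is harmless; the event format and helper names are hypothetical.

```python
def apply_event(store, event):
    # Events carry the full document, so applying one twice is harmless.
    if event["op"] == "upsert":
        store[event["id"]] = event["doc"]
    elif event["op"] == "delete":
        store.pop(event["id"], None)


def state_at(log, t):
    """Rebuild the chunk's contents as of time t from the ordered log."""
    store = {}
    for event in log:
        if event["ts"] <= t:
            apply_event(store, event)
    return store


log = [
    {"ts": 1, "op": "upsert", "id": "a", "doc": {"v": 1}},
    {"ts": 2, "op": "upsert", "id": "b", "doc": {"v": 1}},
    {"ts": 3, "op": "upsert", "id": "a", "doc": {"v": 2}},
    {"ts": 4, "op": "delete", "id": "b"},
]

T = 2
target = state_at(log, T)      # bulk import of the snapshot taken at T
for event in log:              # replay from T onward, in order
    if event["ts"] >= T:
        apply_event(target, event)

source = state_at(log, 4)      # the source after all writes
print(target == source)  # True
```

The event at `ts == 2` is applied twice here (once via the snapshot, once via replay), which is why idempotent events matter when the replay window overlaps the snapshot.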
How do you handle shard routing updates to avoid stale routes?
They store a version token in the shard metadata and bump it whenever routing changes. A shard accepts only requests that carry the current token, so a proxy working from stale routing is rejected instead of reading or writing a chunk on a shard that no longer owns it.
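From the proxy's point of view, the token check might look like the sketch below. The `Shard`, `Proxy`, and `StaleRoute` names are hypothetical, the token is modeled per chunk for simplicity, and the single refresh-and-retry is an assumption about how a proxy would recover, not a documented behavior.

```python
class StaleRoute(Exception):
    """Hypothetical error for a request carrying an outdated version token."""


class Shard:
    def __init__(self, name):
        self.name = name
        self.tokens = {}  # chunk id -> version token this shard accepts

    def query(self, chunk, token):
        if self.tokens.get(chunk) != token:
            raise StaleRoute(chunk)
        return f"{self.name} served {chunk}"


class Proxy:
    def __init__(self, metadata, shards):
        self.metadata = metadata     # authoritative: chunk -> (shard, token)
        self.shards = shards
        self.cache = dict(metadata)  # local copy that can go stale

    def query(self, chunk):
        name, token = self.cache[chunk]
        try:
            return self.shards[name].query(chunk, token)
        except StaleRoute:
            # Refresh the cached route once and retry against the new owner.
            self.cache[chunk] = self.metadata[chunk]
            name, token = self.cache[chunk]
            return self.shards[name].query(chunk, token)


old, new = Shard("shard-1"), Shard("shard-9")
old.tokens["c7"] = 1
metadata = {"c7": ("shard-1", 1)}
proxy = Proxy(metadata, {"shard-1": old, "shard-9": new})
first = proxy.query("c7")

# After a migration the old shard stops accepting the chunk and the
# metadata points at the new owner with a bumped token.
del old.tokens["c7"]
new.tokens["c7"] = 2
metadata["c7"] = ("shard-9", 2)
second = proxy.query("c7")
print(first, "|", second)  # shard-1 served c7 | shard-9 served c7
```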
How do you simplify major engine version upgrades?
They forklift data to shards on newer engine versions. They replicate changes from the old shards and switch traffic once the target is fully synchronized. This eliminates the need for multiple in-place upgrades.
Why not rely on a standard cloud-managed solution?
They needed more customization and greater control over chunk-level routing, advanced replication, and performance tuning. Cloud solutions were either unavailable or did not offer enough flexibility at the time of initial system design.
What if you need to revert to the old shard after switching traffic?
They designed bidirectional replication with special write tags that prevent loops. If any issue arises right after switching, the coordinator can mark the old shard active again, and the data changes on the new shard are still captured and replayed back to the old shard.
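Loop prevention with write tags can be illustrated with two in-memory shards. This is a simplified sketch: the `Shard` class, the `"replicated"` tag value, and the cursor-based `replicate` helper are all hypothetical stand-ins for the real oplog and replication service.

```python
class Shard:
    def __init__(self, name):
        self.name = name
        self.docs = {}
        self.oplog = []

    def write(self, key, value, tag=None):
        self.docs[key] = value
        # Every write lands in the oplog along with its tag.
        self.oplog.append({"key": key, "value": value, "tag": tag})


def replicate(src, dst, cursor):
    """Forward untagged oplog entries from src to dst, starting at cursor."""
    for entry in src.oplog[cursor:]:
        if entry["tag"] == "replicated":
            continue  # written by the replicator; do not echo it back
        dst.write(entry["key"], entry["value"], tag="replicated")
    return len(src.oplog)  # new cursor position


a, b = Shard("old"), Shard("new")
a.write("k1", 1)                 # application write on the old shard
a_cur = replicate(a, b, 0)       # old -> new
b.write("k2", 2)                 # after the switch, writes land on the new shard
b_cur = replicate(b, a, 0)       # new -> old, skips the tagged copy of k1
a_cur = replicate(a, b, a_cur)   # old -> new again, skips the tagged copy of k2
print(a.docs == b.docs, len(a.oplog), len(b.oplog))  # True 2 2
```

Without the tag check, the copy of `k1` on the new shard would be sent back to the old shard, copied forward again, and so on indefinitely.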
How do you detect and resolve data mismatches?
They run a separate check that pulls data from both shards at a consistent point in time and compares the count of documents or hashed aggregates. If there is a mismatch, they isolate affected chunks, reimport them, and repeat until they match.
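A count plus a hashed aggregate can be computed without sorting either side. The sketch below is one possible approach, not the platform's checker: it assumes documents are JSON-serializable and that no two documents in a chunk are identical (for example, each has a unique `_id`), since XOR-combined hashes cancel out for exact duplicates.

```python
import hashlib
import json


def chunk_fingerprint(docs):
    """Return (count, combined hash) for an iterable of JSON-like documents."""
    combined = 0
    count = 0
    for doc in docs:
        # Canonical form: sorted keys and fixed separators.
        canonical = json.dumps(doc, sort_keys=True, separators=(",", ":"))
        digest = hashlib.sha256(canonical.encode("utf-8")).digest()
        combined ^= int.from_bytes(digest, "big")  # XOR is order-independent
        count += 1
    return count, combined


source_docs = [{"_id": 1, "amount": 10}, {"_id": 2, "amount": 20}]
target_docs = [{"amount": 20, "_id": 2}, {"_id": 1, "amount": 10}]  # reordered
drifted_docs = [{"_id": 1, "amount": 10}, {"_id": 2, "amount": 21}]

print(chunk_fingerprint(source_docs) == chunk_fingerprint(target_docs))   # True
print(chunk_fingerprint(source_docs) == chunk_fingerprint(drifted_docs))  # False
```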
How do you manage the chunk metadata at massive scale?
They use a highly available metadata service that tracks chunk boundaries. Each proxy server queries this service to locate the relevant shard or shards. They ensure updates to chunk locations are atomic, so proxies do not route to obsolete shards.
How do you keep downtime under two seconds during the final switch?
They assign a new version token to the source shard, which rejects new queries after that token is bumped. They let the replication service copy any final pending writes. They then mark the chunk as active on the target shard by issuing the final chunk metadata update. Application retries handle any short rejections.
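The ordering of the switch matters: reject first, drain second, flip the route last. The sketch below models that ordering from the coordinator's side; `SourceShard`, `switch_traffic`, and the in-memory `pending` queue are hypothetical simplifications of the real shard and replication service.

```python
class Rejected(Exception):
    """Hypothetical error for a write carrying an outdated version token."""


class SourceShard:
    def __init__(self):
        self.version = 1
        self.docs = {}
        self.pending = []  # writes not yet replicated to the target

    def write(self, version, key, value):
        if version != self.version:
            raise Rejected("stale version token")
        self.docs[key] = value
        self.pending.append((key, value))


def switch_traffic(source, target_docs, routing, chunk, target_name):
    source.version += 1           # 1. source now rejects old-token writes
    while source.pending:         # 2. drain the last unreplicated writes
        key, value = source.pending.pop(0)
        target_docs[key] = value
    routing[chunk] = target_name  # 3. flip the route only after draining


source = SourceShard()
target_docs, routing = {}, {"chunk-7": "shard-1"}
source.write(1, "k", "v1")
switch_traffic(source, target_docs, routing, "chunk-7", "shard-9")
print(target_docs, routing)  # {'k': 'v1'} {'chunk-7': 'shard-9'}
```

Because no write can be accepted by the source after step 1, the drain in step 2 is guaranteed to terminate, and the target holds every acknowledged write before it starts serving traffic.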
How do you mitigate the risk of partial moves if you have thousands of concurrent migrations?
They track each chunk migration independently. Each chunk’s migration steps are atomic. The Data Movement Platform coordinates them in parallel, but it also rate-limits the number of ongoing moves per shard to avoid resource contention.
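Per-shard rate limiting can be sketched as an admission check over a queue of requested moves. The limit of two concurrent moves per shard and the `schedule` helper are assumptions for illustration; the write-up does not give the actual limits or scheduling policy.

```python
from collections import Counter


def schedule(migrations, max_per_shard=2):
    """Pick migrations to start now without exceeding the per-shard limit.

    migrations: list of (chunk, source_shard, target_shard) tuples.
    Returns (started, deferred) lists of chunk ids.
    """
    active = Counter()  # shard -> number of moves it is taking part in
    started, deferred = [], []
    for chunk, source, target in migrations:
        if active[source] < max_per_shard and active[target] < max_per_shard:
            active[source] += 1
            active[target] += 1
            started.append(chunk)
        else:
            deferred.append(chunk)  # retried once a slot frees up
    return started, deferred


queue = [
    ("c1", "s1", "s2"),
    ("c2", "s1", "s3"),
    ("c3", "s1", "s4"),  # s1 is already part of two moves
    ("c4", "s5", "s4"),
]
started, deferred = schedule(queue)
print(started, deferred)  # ['c1', 'c2', 'c4'] ['c3']
```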
How do you partition the system to prevent a single point of failure in the metadata service?
They replicate chunk metadata across multiple nodes with strong consensus. Each proxy server caches chunk metadata to handle short interruptions. They also keep version tokens so that stale metadata never routes queries incorrectly for too long.
Why use chunk-based migration instead of moving entire databases at once?
They want fine-grained control over load balancing, splitting large shards, and merging small ones. Chunk-based operations allow partial migrations based on key ranges or usage patterns without locking an entire database.
How do you handle changes in chunk layouts over time?
They recalculate chunk sizes and access patterns periodically. They move hot or large chunks as needed. They also bin-pack underutilized chunks into fewer shards. The Data Movement Platform can run these migrations continuously in the background.
How do you measure success for an online migration?
They track final consistency between source and target. They measure the latency impact on user queries during migrations. They also watch for replication lag to stay within acceptable bounds, typically a few seconds, before switching traffic.
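The replication-lag condition can be expressed as a simple gate before the traffic switch. The two-second bound and the requirement of three consecutive good samples are assumptions chosen for illustration, and `ready_to_switch` is a hypothetical helper.

```python
def ready_to_switch(lag_samples, max_lag_seconds=2.0, required=3):
    """True if the most recent `required` lag samples are all within bound."""
    recent = lag_samples[-required:]
    return len(recent) == required and all(
        lag <= max_lag_seconds for lag in recent
    )


print(ready_to_switch([5.0, 1.5, 1.0, 0.5]))  # True
print(ready_to_switch([1.0, 0.5, 3.0]))       # False
```

Requiring several consecutive samples avoids switching on a single lucky measurement while replication is still catching up.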
How do you prevent errors when multiple application teams define custom indexes on the same collections?
They enforce minimal indexing to reduce complexity. The system automatically creates needed indexes on the target shard before bulk importing data for that chunk. They store index definitions in the metadata service so it is consistent across shards.