
Databases


A technology reference on databases — both the transactional (OLTP) systems that hold mutable, conservation-critical state at the heart of an application, and the analytical (OLAP / columnar) systems that sit beside them and answer aggregation queries over billions of rows. The two are complements, not competitors: OLTP guarantees "this either happened or it didn't" on small mutations; OLAP gives you sub-second scans over the consequences. Part 1 (§1–§15) covers transactional databases in depth because that is where byte-level mechanics matter most. Part 2 (§17–§22) covers analytical / columnar databases at the architectural and design-space level. §16 ties Part 1 together; §23 ties the whole picture together.


Part 1: Transactional Databases

§1. What Transactional Databases Are

A transactional database is a stateful storage system whose contract is built around ACID (Atomicity, Consistency, Isolation, Durability) applied to multi-row mutations of structured data. It is the system underneath anything where partial application is unacceptable: a bank account balance, an inventory count, a user's profile, a feature-flag assignment, a task in a workflow. The defining property is the promise that a transaction either fully happened or fully did not, and that the result survives power loss.

Transactional databases sit at one corner of a four-way design space:

  • Relational row stores (OLTP — OnLine Transaction Processing) — MySQL InnoDB, PostgreSQL, Oracle, SQL Server. B+ tree, row-oriented, strong ACID, secondary indexes. Canonical transactional database. Excellent for point lookups and small range scans; poor for analytical scans.
  • NewSQL / distributed SQL — Spanner, CockroachDB, YugabyteDB, OceanBase, TiDB. Relational interface and ACID, with horizontal scaling via consensus (Paxos/Raft) under the hood. Trades commit latency (TrueTime wait, Raft RTT — Round Trip Time) for cross-shard transactions that just work.
  • Wide-column / LSM (Log-Structured Merge) stores — Cassandra, ScyllaDB, HBase, Bigtable. LSM tree storage, partition-key access, tunable consistency. Strong at write throughput and horizontal scale; per-key linearization only (via expensive Paxos-backed Lightweight Transactions); multi-row transactions weak or absent.
  • Document stores — MongoDB, Couchbase, DocumentDB. Per-document JSON-like records, schemaless, single-document transactions historically (multi-document added in MongoDB 4.0+). Trades schema rigor for developer velocity; the document is the transactional unit.

This doc focuses on the transactional layer — OLTP relational row stores and NewSQL cousins — because that is where the byte-level mechanics matter most. Wide-column and document variants borrow much of the machinery (WAL, MVCC, replication) but make different storage-layer tradeoffs; we contrast them in §3 and §13.

What transactional databases are NOT good for:

  • Analytical scans over billions of rows. Row-oriented B+ tree is the wrong layout. Use a columnar store (Pinot, Druid, ClickHouse, BigQuery, Snowflake) for OLAP (OnLine Analytical Processing).
  • Time-series at firehose rates. Append-only timestamp-ordered workloads belong in LSM-based or time-series engines (Prometheus, InfluxDB, M3DB).
  • Global write-throughput at write-master scale. Single-leader OLTP caps around 20–50k commits/sec per primary. Past that, shard (losing cross-shard transactions unless you build 2PC) or move to NewSQL.
  • Eventually-consistent read-heavy workloads. If 10-second staleness is fine, you're paying for guarantees you don't need.
  • Binary blob storage. A 10 MB image in a BLOB column ruins your buffer pool. Use object storage; store the URL.

Mental model: a transactional database is for the small set of mutable, structured, conservation-critical state at the heart of a system. Around it cluster derived stores — caches, search indexes, analytical warehouses, blob stores — that take the truth from the database and reshape it for different access patterns. Get the primary right; everything else can be rebuilt. Get the primary wrong; no amount of caching saves you.


§2. Inherent Guarantees

What the technology provides by design, and what must still be layered above it.

Provided by design

  • Atomicity within a transaction. Statements in a BEGIN ... COMMIT block apply together or not at all. The load-bearing primitive.
  • Isolation at a configurable level. READ UNCOMMITTED → READ COMMITTED → REPEATABLE READ → SERIALIZABLE. MySQL defaults to REPEATABLE READ; PostgreSQL defaults to READ COMMITTED. Know which level you have, what anomalies (dirty read, non-repeatable read, phantom, write skew) each permits, and which your invariants require.
  • Durability after commit ack. With defaults (innodb_flush_log_at_trx_commit=1 in MySQL, synchronous_commit=on in PostgreSQL), once COMMIT returns, the change survives crashes. The mechanism is the WAL (Write-Ahead Log) — §4.
  • Secondary indexes consistent with the base table. Index update is part of the same transaction as the row update.
  • In-database constraints. UNIQUE, FOREIGN KEY, CHECK, and NOT NULL are enforced by the database, not the application.

Must be layered above

  • Linearizability across a cluster. Reads against a replica may see a stale snapshot. Use read-from-primary routing for read-after-write, or session-bound consistency tokens (PostgreSQL's pg_current_wal_lsn + replica wait; MySQL GTID-based replica-read).
  • Cross-shard atomicity. Once sharded, the database's transaction boundary stops at the shard. Cross-shard requires 2PC (Two-Phase Commit), sagas, or escrow patterns layered above. NewSQL bakes this in at the cost of commit-wait latency.
  • Idempotency of client requests. The database doesn't know an HTTP retry is "the same" as the original. Use an idempotency_key column with UNIQUE constraint, written inside the same transaction as the mutation (the Stripe pattern).
  • Replication topology choice. Async vs sync vs semi-sync has very different durability characteristics. The database supports all three; picking wrong forfeits durability.
  • Backup and PITR (Point-In-Time Recovery). Many outages are "backups exist, but nobody tested restore."
  • Schema evolution under load. Adding a column to a 1 TB table without locking it for hours is an art (pt-online-schema-change, gh-ost, PostgreSQL CONCURRENTLY).
  • Multi-region disaster recovery. Cross-region async replicas survive regional loss with non-zero RPO (Recovery Point Objective). Cross-region RPO=0 requires NewSQL with consensus or custom synchronous setup (brutal latency tax).

Synthesis: the database guarantees single-node ACID and single-cluster recovery. Everything beyond — cross-shard, cross-region, end-to-end idempotency, schema migration — is the system designer's problem.
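
The idempotency-key bullet above can be sketched in a few lines. This is a minimal illustration using Python's built-in sqlite3 module rather than MySQL or PostgreSQL; the table names, the `apply_debit` helper, and the key format are assumptions made for the example, not part of any real payment API.

```python
import sqlite3

def apply_debit(conn, idempotency_key, account_id, amount):
    """Debit once per idempotency key; a retry returns the stored result."""
    try:
        with conn:  # one transaction: commit on success, rollback on exception
            # The primary-key (unique) constraint rejects a repeated key.
            conn.execute(
                "INSERT INTO idempotency (idempotency_key, result) VALUES (?, NULL)",
                (idempotency_key,),
            )
            conn.execute(
                "UPDATE accounts SET balance = balance - ? WHERE account_id = ?",
                (amount, account_id),
            )
            (balance,) = conn.execute(
                "SELECT balance FROM accounts WHERE account_id = ?", (account_id,)
            ).fetchone()
            conn.execute(
                "UPDATE idempotency SET result = ? WHERE idempotency_key = ?",
                (balance, idempotency_key),
            )
            return balance
    except sqlite3.IntegrityError:
        # Duplicate key: the first attempt already committed, so replay its result.
        (balance,) = conn.execute(
            "SELECT result FROM idempotency WHERE idempotency_key = ?",
            (idempotency_key,),
        ).fetchone()
        return balance

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (account_id INTEGER PRIMARY KEY, balance INTEGER NOT NULL)")
conn.execute("CREATE TABLE idempotency (idempotency_key TEXT PRIMARY KEY, result INTEGER)")
conn.execute("INSERT INTO accounts VALUES (42, 500)")
conn.commit()

first = apply_debit(conn, "req-abc", 42, 100)   # performs the debit
retry = apply_debit(conn, "req-abc", 42, 100)   # same key: no second debit
```

Because the key insert and the balance update share one transaction, either both are durable or neither is, which is what makes the retry safe.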


§3. The Design Space

Variants of transactional databases differ along several orthogonal axes. They interact; you don't pick freely.

Storage engine: B+ tree vs LSM

The most consequential choice. A B+ tree (InnoDB, PostgreSQL, SQLite, SQL Server) is a balanced tree of fixed-size pages with rows in the leaves sorted by primary key; updates rewrite pages in place; reads traverse log(n) pages. An LSM tree (RocksDB, Cassandra, HBase, ScyllaDB; the engine underneath CockroachDB) buffers writes in an in-memory memtable, flushes to immutable sorted files (SSTables), and compacts files in background; reads consult multiple files via bloom filters.

Tradeoff: B+ trees give predictable low p99 reads and easy range scans at the cost of lower sustained write throughput (~20–30k commits/sec on commodity NVMe) due to random in-place writes. LSM trees give massive write throughput (~100–200k writes/sec/SSD) at the cost of less predictable p99 reads (compaction stalls; multi-SSTable reads). For a payments ledger needing SELECT balance FOR UPDATE to return in <5ms, B+ tree wins. For Discord's Cassandra-based messaging time-series, LSM wins.
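
To make the LSM write and read paths concrete, here is a toy sketch, assuming a dictionary as the memtable and plain sorted lists as SSTables. The `TinyLSM` class is hypothetical; real engines add a WAL, bloom filters, binary search within files, and leveled or tiered compaction.

```python
class TinyLSM:
    """Toy LSM: an in-memory memtable plus immutable sorted runs, newest first."""

    def __init__(self, memtable_limit=2):
        self.memtable = {}
        self.sstables = []  # each run is a sorted list of (key, value) pairs
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def flush(self):
        # A flush writes the memtable out as one sorted, immutable run.
        if self.memtable:
            self.sstables.insert(0, sorted(self.memtable.items()))
            self.memtable = {}

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for run in self.sstables:  # newest run wins; this loop is the read amplification
            for k, v in run:
                if k == key:
                    return v
        return None

    def compact(self):
        # Merge all runs into one, keeping only the newest value per key.
        merged = {}
        for run in reversed(self.sstables):  # oldest first, newer values overwrite
            merged.update(run)
        self.sstables = [sorted(merged.items())] if merged else []

db = TinyLSM(memtable_limit=2)
db.put("a", 1)
db.put("b", 2)   # memtable full: flushed as the first run
db.put("a", 3)
db.put("c", 4)   # flushed as a second run that shadows the old "a"
runs_before = len(db.sstables)
value_before = db.get("a")
db.compact()
runs_after = len(db.sstables)
```

Writes never touch existing files, which is where the write throughput comes from; the cost shows up in `get`, which may have to consult every run until compaction merges them.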

Concurrency control: MVCC vs locking

MVCC (Multi-Version Concurrency Control) stores multiple versions of each row keyed by transaction ID; readers never block writers, writers never block readers. PostgreSQL and InnoDB both use MVCC with different implementations (PG stores xmin/xmax on the tuple; InnoDB stores undo log entries). Pure two-phase locking is rare in modern OLTP. MVCC is the modern default because a 30-second analytical scan must not block payment commits. Price: MVCC garbage (§7.5).

Topology: single-leader vs multi-leader vs leaderless

Single-leader is the OLTP default. One node accepts writes; others are read replicas. It is simple, but the leader is the throughput ceiling. Multi-leader (rare in OLTP) allows writes at multiple nodes with treacherous conflict resolution. Leaderless (Cassandra, DynamoDB) lets any node accept writes with tunable quorums; it is a poor fit for OLTP because per-row linearization is hard.

Replication semantics

  • Async: leader commits, ships log later. Best latency, worst durability — lose up to lag-window of acked writes on leader failure.
  • Sync: leader waits for at least one replica to apply before ack. Highest durability, highest tail latency.
  • Semi-sync: leader waits for replica to receive (not apply). Good balance — cross-AZ (Availability Zone) RTT (~1ms) on commit; RPO=0 on common-case failover. MySQL's rpl_semi_sync_master_wait_for_slave_count, PostgreSQL's synchronous_standby_names.
  • Consensus (Raft/Paxos): a quorum persists the log before commit. NewSQL (Spanner, CockroachDB) and consensus DBs (etcd, FoundationDB). RPO=0 with automatic failover; every commit pays one quorum RTT.

Comparison table

Dimension               │ OLTP row store (MySQL/PG)     │ Wide-column LSM (Cassandra) │ NewSQL (Spanner/Cockroach) │ Document store (MongoDB)
────────────────────────┼───────────────────────────────┼─────────────────────────────┼────────────────────────────┼───────────────────────────
Storage engine          │ B+ tree                       │ LSM tree                    │ LSM (RocksDB underneath)   │ B-tree (WiredTiger) or LSM
Concurrency control     │ MVCC + row locks              │ LWT / per-key Paxos         │ MVCC + consensus           │ MVCC
Topology                │ Single-leader                 │ Leaderless                  │ Consensus per shard        │ Single-leader replica set
Replication             │ Semi-sync configurable        │ Quorum-based                │ Raft/Paxos                 │ Replica set with majority
Cross-row txn           │ Native (within shard)         │ None (sagas only)           │ Native (cross-shard)       │ Multi-doc since v4.0
Peak writes/sec/shard   │ 20–30k                        │ 100k+                       │ 5–10k (Paxos cost)         │ 10–30k
p99 read predictability │ High                          │ Low (compaction)            │ Medium                     │ Medium
Schema enforcement      │ Strong                        │ Per-keyspace, weak          │ Strong                     │ Optional
Typical use             │ Payments, profiles, inventory │ Time-series, IoT, fanout    │ Geo-distributed OLTP       │ Per-tenant SaaS docs

The synthesis: each entry in the "Typical use" row is a consequence of the entries above it in the same column. Cassandra ends up at "time-series" because LSM + leaderless + quorum is what time-series wants. MySQL ends up at "payments" because B+ tree + MVCC + semi-sync is what conservation-of-money wants.


§4. Underlying Data Structure: Byte-Level Mechanics

The depth section. We zoom into the storage engine, walk a single mutation byte by byte, and compare B+ tree mechanics to LSM mechanics explicitly. This is what separates "I've used a database" from "I know why this database is right."

4.1 B+ tree page layout

In InnoDB (the canonical B+ tree OLTP engine; PostgreSQL is similar in spirit), every table is stored as a clustered B+ tree keyed on the primary key. The leaves of the primary-key tree contain the row data. Secondary indexes are separate B+ trees whose leaves contain the primary key; non-covered queries pay a second lookup into the clustered tree.

A page is the I/O unit — 16 KB by default in InnoDB, 8 KB in PostgreSQL. Each page is a node of the tree.

Page = 16 KB. Internal layout (simplified, InnoDB):
┌──────────────────────────────────────────────────┐
│ Page header (38 B): page_no, prev/next page,     │
│   level, n_records, checksum, LSN                │
├──────────────────────────────────────────────────┤
│ Infimum + Supremum (system records, 26 B)        │
├──────────────────────────────────────────────────┤
│ User records (sorted by PK), each =              │
│   record header (5 B) + PK + non-PK columns      │
│   + DB_TRX_ID (6 B) + DB_ROLL_PTR (7 B)          │
├──────────────────────────────────────────────────┤
│ Free space                                       │
├──────────────────────────────────────────────────┤
│ Page directory (sparse index of slot offsets)    │
├──────────────────────────────────────────────────┤
│ FIL trailer (8 B): checksum + LSN                │
└──────────────────────────────────────────────────┘

Internal (non-leaf) pages: PK values + child page numbers, sorted.
Leaf pages: PK values + full row data + MVCC fields.
Leaf pages doubly linked (prev, next) → range scans are sequential.

4.2 Branching factor and tree height

With 16 KB pages, a ~16-byte PK, and a 4-byte child pointer, an internal page holds ~800 child pointers. A leaf page holds ~60 rows at 256 B/row.

Worked example. A 70-billion-row table — payments ledger at Alipay scale or a profile-edges table at LinkedIn:

  • Leaves: 70B / 60 ≈ 1.2B leaf pages
  • Level above: 1.2B / 800 ≈ 1.5M pages
  • Level above: 1.5M / 800 ≈ 1,900 pages
  • Level above: 1,900 / 800 ≈ 3 pages
  • Root: 1 page
  • Tree height = 5 levels.

Point lookup traverses 5 pages. With a multi-GB buffer pool, root + upper internal pages are always cached, so a hot lookup is 1–2 page reads, ~0.1ms p50. A cold lookup misses 2–3 levels at ~100µs per SSD random read, which is only ~0.2–0.3ms of raw device time; the ~5–8ms p99 comes from queueing behind other I/O under load, not from tree depth.

A smaller use case — a 10M-row e-commerce inventory table at a mid-sized retailer:

  • Leaves: 10M / 60 ≈ 170k
  • Level above: 170k / 800 ≈ 200
  • Root: 1
  • Tree height = 3 levels.

Different scale, same logarithmic ceiling. The branching factor is generous; B+ trees stay short.
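
The two worked examples follow from one small calculation. The sketch below assumes the same round numbers as the text (60 rows per leaf, fanout of 800); `btree_levels` is a hypothetical helper, and real page fill varies with row size and fill factor.

```python
def ceil_div(a, b):
    """Integer ceiling division, avoiding float rounding on very large counts."""
    return (a + b - 1) // b

def btree_levels(rows, rows_per_leaf=60, fanout=800):
    """Return page counts per level, leaves first, up to a single root page."""
    pages = ceil_div(rows, rows_per_leaf)
    levels = [pages]
    while pages > 1:
        pages = ceil_div(pages, fanout)
        levels.append(pages)
    return levels

ledger = btree_levels(70_000_000_000)   # the 70-billion-row example
inventory = btree_levels(10_000_000)    # the 10M-row example

print(len(ledger), ledger)
print(len(inventory), inventory)
```

Each extra level multiplies capacity by the fanout, so a table 7,000 times larger needs only two more levels.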

4.3 MVCC: how PostgreSQL and InnoDB differ

Both PostgreSQL and InnoDB implement MVCC. They store version information differently:

  • PostgreSQL stores xmin and xmax on every tuple — the inserting and deleting transaction IDs. An update is delete + insert: old tuple gets xmax = T, new tuple gets xmin = T. The result is tuple bloat; dead tuple versions accumulate in the heap until autovacuum runs.
  • InnoDB keeps the current row in the clustered index with two MVCC fields — DB_TRX_ID (6 B) and DB_ROLL_PTR (7 B). The roll_ptr points into a separate undo log tablespace holding old row images. To read an older version, follow roll_ptr through undo records. The purge thread reclaims undo entries no longer needed by any active read view. Cleaner main pages; cost moves to undo tablespace growth.

A reader at transaction T, reading row R updated by transaction T':

  1. Inspects R in the clustered index. Reads DB_TRX_ID = T'.
  2. Consults its read view: the set of transactions visible at T's snapshot time.
  3. If T' committed before T's snapshot, T sees this version.
  4. Otherwise: follows DB_ROLL_PTR into the undo log to fetch the previous version. Recurse.

This is what makes MVCC powerful: a 30-second analytical scan can run on a hot replica while writes continue at 10k/sec on the primary; the scan sees a frozen snapshot materialized via undo chains. The price is the long-running-reader problem (§7.5).
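
The InnoDB-style visibility walk can be sketched as a linked list of versions. This is a simplification under stated assumptions: the read view is modeled as a plain set of visible transaction IDs (real read views use low/high watermarks plus an active-transaction list), and `Version` and `read` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Version:
    value: int
    trx_id: int                       # plays the role of DB_TRX_ID
    prev: Optional["Version"] = None  # plays the role of DB_ROLL_PTR

def read(row, visible_trx_ids):
    """Walk the undo chain until a version written by a visible transaction is found."""
    version = row
    while version is not None:
        if version.trx_id in visible_trx_ids:
            return version.value
        version = version.prev  # follow the roll pointer to the older image
    return None  # the row did not exist in this snapshot

v1 = Version(value=500, trx_id=10)
v2 = Version(value=400, trx_id=20, prev=v1)  # current row in the clustered index

old_snapshot = {10}       # taken before transaction 20 committed
new_snapshot = {10, 20}   # taken after

seen_by_old = read(v2, old_snapshot)
seen_by_new = read(v2, new_snapshot)
```

The older snapshot keeps reading the pre-update balance without blocking the writer, at the cost of the undo record having to stay around for as long as that snapshot lives.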

4.4 Buffer pool: the LRU you actually need

The buffer pool is the database's in-memory page cache, sized to 60–80% of RAM. A naive LRU (Least Recently Used) has a known failure: one large scan (SELECT * FROM events WHERE ts > ...) reads millions of pages, each touched once, evicting the hot working set.

InnoDB defends with a mid-point insertion LRU:

buffer pool LRU list:
  ┌──────── young (5/8) ────────┬──── old (3/8) ────┐
  HEAD                          midpoint            TAIL (evict)
  ↑                             ↑
  promoted here on              new pages enter here.
  second access after           Must survive innodb_old_blocks_time
  the dwell time                ms (default 1000ms) to promote.

A newly read page enters at the midpoint. If touched again after a dwell time, it's promoted to the young head. A one-shot scan reads pages into the old region; they get evicted before they pollute the young set. PostgreSQL's buffer manager uses a clock-sweep variant of the same idea.

Capacity-planning consequence: you don't need to fit the whole dataset in RAM. You need to fit the hot working set. A 17 TB shard with a 180 GB buffer pool is fine if 1% of accounts drive 99% of traffic.
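
A toy model of midpoint insertion shows why a one-shot scan leaves the hot set alone. It is a sketch under assumptions: two ordered dictionaries stand in for the young and old sublists, time is passed in explicitly, and a page demoted from the young sublist is allowed to re-promote on its next touch. `MidpointLRU` is a hypothetical class, not InnoDB's implementation.

```python
from collections import OrderedDict

class MidpointLRU:
    """Toy midpoint-insertion LRU with a young and an old sublist."""

    def __init__(self, capacity, old_fraction=3 / 8, dwell=1.0):
        self.old_capacity = max(1, int(capacity * old_fraction))
        self.young_capacity = capacity - self.old_capacity
        self.dwell = dwell
        self.young = OrderedDict()  # page -> None, most recently used last
        self.old = OrderedDict()    # page -> time it entered, newest last

    def access(self, page, now):
        if page in self.young:
            self.young.move_to_end(page)
        elif page in self.old:
            # Promote only if the page survived the dwell time in the old sublist.
            if now - self.old[page] >= self.dwell:
                del self.old[page]
                self.young[page] = None
                if len(self.young) > self.young_capacity:
                    demoted, _ = self.young.popitem(last=False)
                    self.old[demoted] = float("-inf")  # simplification: may re-promote at once
        else:
            self.old[page] = now  # new pages enter at the midpoint
        while len(self.old) > self.old_capacity:
            self.old.popitem(last=False)  # evict from the tail

cache = MidpointLRU(capacity=8)
for page in ("h1", "h2"):
    cache.access(page, now=0.0)          # first touch: enters the old sublist
for page in ("h1", "h2"):
    cache.access(page, now=2.0)          # touched again after the dwell time: promoted
for i in range(100):
    cache.access(f"scan-{i}", now=3.0)   # one-shot scan churns only the old sublist
```

After the scan, the young sublist still holds only the two hot pages; every scanned page was touched once and aged out of the old sublist.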

4.5 WAL / redo log: the durability primitive

The Write-Ahead Log (WAL, or "redo log" in InnoDB) makes the buffer pool safe. Invariant: every page modification is described in the WAL, and the WAL is on disk before the modification's commit is acked. Page writes themselves can lag commit by minutes; the WAL is the recovery oracle.

The WAL is a sequential append-only stream of physical-logical records: "at LSN (Log Sequence Number) L, page P, offset O, write bytes B". Sequentiality is the trick — a single SSD doing random page writes might do 10k IOPS, but sequential WAL appends sustain hundreds of MB/sec.

Commit protocol (MySQL default, innodb_flush_log_at_trx_commit=1):

  1. Transaction modifies pages in the buffer pool; pages marked dirty.
  2. Each modification appends a redo record into the in-memory log buffer.
  3. On COMMIT, flush log buffer via write() + fsync().
  4. Only after fsync returns is the commit acked.
  5. Dirty pages stay in buffer pool for minutes before being written to actual locations.
  6. Periodic checkpoints advance the "redo from here" pointer, allowing WAL ring reuse.

Group commit is essential. A single NVMe fsync is ~50–200µs, so naive per-commit fsync tops out somewhere between ~5k and ~20k commits/sec; at 18k commits/sec it saturates the SSD. InnoDB batches concurrent commits into one fsync: 100 transactions ride one fsync, pushing effective rates well past 20k/sec. PostgreSQL implements the same idea (tunable via commit_delay and commit_siblings).
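
The group-commit arithmetic is a one-line bound. The sketch below assumes fsyncs are fully serialized on one log device and ignores every other cost, so it gives a ceiling rather than a prediction; `max_commits_per_sec` is a hypothetical helper.

```python
def max_commits_per_sec(fsync_latency_us, commits_per_fsync=1):
    """Upper bound on commit rate when every fsync is serialized on one log device."""
    fsyncs_per_sec = 1_000_000 / fsync_latency_us
    return fsyncs_per_sec * commits_per_fsync

naive_fast = max_commits_per_sec(50)     # one commit per 50µs fsync
naive_slow = max_commits_per_sec(200)    # one commit per 200µs fsync
grouped = max_commits_per_sec(200, commits_per_fsync=100)

print(naive_fast, naive_slow, grouped)
```

Batching moves the bottleneck off the fsync: even at the slow end of the latency range, 100 commits per fsync leaves the log device far from being the limit.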

4.6 One concrete operation, byte by byte

Walk a single UPDATE end to end. We'll use a debit on a payments ledger as the example — UPDATE accounts SET balance = balance - 100 WHERE account_id = 42 — but the same machinery runs an inventory decrement, a profile-bio update, or a feature-flag rule change. Let the LSN before the operation be 1000.

  1. Client → DB shard owning row 42 receives BEGIN; SELECT balance FOR UPDATE; UPDATE; COMMIT;.
  2. Lock acquisition: row lock on PK=42 in the clustered index. In-memory lock manager. No disk I/O.
  3. Read existing row: B+ tree traversal. Root → level-3 → level-2 → level-1 → leaf. If leaf is hot, 0 disk I/O. If cold, 1 SSD read ~100µs. Current row: balance=500, DB_TRX_ID=T_prior, DB_ROLL_PTR=undo_ptr_5.
  4. Compute new value: balance = 400.
  5. Write to undo log first: new undo record at undo_ptr_6 with the old row image. Undo pages also in the buffer pool. A redo log record is appended in memory describing this undo write.
  6. Mutate the leaf page in buffer pool: write balance=400, DB_TRX_ID=T_current, DB_ROLL_PTR=undo_ptr_6. Leaf page now dirty. Append redo record: LSN=1042: physical write to page P_leaf, offset O, bytes <new row>.
  7. Secondary indexes: if there's a (account_id, ts DESC) index and ts is unchanged on a balance update, the index is untouched. If the indexed columns change, more pages dirty, more redo records.
  8. Auxiliary inserts in the same transaction (journal entry, audit row): each is another B+ tree insertion, possibly triggering a page split (target leaf full → allocate new page, redistribute; InnoDB's right-split heuristic keeps left pages full for monotonic inserts). Page split logged with another redo record.
  9. COMMIT: write a "transaction commit" redo record at LSN 1080. Flush log buffer to OS. fsync(redo_log_fd). ~150µs on NVMe.
  10. Semi-sync wait (if configured): binlog event sent to replica over TCP. Replica acks receipt. ~500µs–1ms RTT cross-AZ.
  11. Acknowledge client: HTTP 200 with new value.
  12. Asynchronously, seconds to minutes later: page cleaner thread flushes the dirty leaf page to disk. A checkpoint eventually advances the LSN floor past 1080. Until that checkpoint, the redo log entries from this commit are required for recovery.

Crash scenarios:

  • Crash between step 9 (fsync done) and step 11 (ack sent): client retries with the same idempotency_key. The idempotency row was inserted in the same transaction; the lookup finds it; we return the prior result. Durability point: the fsync at step 9.
  • Crash between step 6 (page dirtied) and step 9 (commit logged): redo log has no COMMIT record. Restart scans redo log from last checkpoint, finds partial transaction, rolls back via undo. Lock released. Client times out, retries — fresh attempt. Correct: the partial state never escaped.
  • Crash between step 11 (ack) and step 12 (page flushed): redo log has the commit. Restart scans forward from last checkpoint, finds COMMIT at LSN 1080, redoes the page mutation. Dirty page reconstructed in buffer pool.
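
The second and third crash scenarios can be replayed with a toy recovery routine. This is a simplified redo-then-undo sketch, assuming a single log that carries both before and after images (InnoDB keeps before images in the undo log instead) and a dictionary standing in for on-disk pages. `recover` and the record layout are hypothetical.

```python
def recover(disk_pages, log):
    """Toy recovery: redo every logged write, then undo transactions with no COMMIT.

    Records are (lsn, txid, "WRITE", key, before, after) or (lsn, txid, "COMMIT").
    """
    pages = dict(disk_pages)
    committed = {rec[1] for rec in log if rec[2] == "COMMIT"}
    ordered = sorted(log, key=lambda rec: rec[0])
    # Redo pass: replay history in LSN order, committed or not.
    for rec in ordered:
        if rec[2] == "WRITE":
            pages[rec[3]] = rec[5]
    # Undo pass: roll back uncommitted writes, newest first.
    for rec in reversed(ordered):
        if rec[2] == "WRITE" and rec[1] not in committed:
            pages[rec[3]] = rec[4]
    return pages

disk = {"acct42": 500}  # the dirty leaf page was never flushed
committed_log = [
    (1042, "T1", "WRITE", "acct42", 500, 400),
    (1080, "T1", "COMMIT"),
]
uncommitted_log = [(1042, "T1", "WRITE", "acct42", 500, 400)]

after_commit_crash = recover(disk, committed_log)     # commit logged, page not flushed
before_commit_crash = recover(disk, uncommitted_log)  # crashed before the COMMIT record
```

With the COMMIT record present, recovery reconstructs the new balance from the log alone; without it, the same write is rolled back and the old balance stands.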

4.7 B+ tree vs LSM, explicit comparison

Property                 │ B+ tree (InnoDB)                      │ LSM tree (RocksDB)
─────────────────────────┼───────────────────────────────────────┼─────────────────────────────────────────────
Write path               │ random page write + redo (sequential) │ sequential WAL + memtable, periodic flush
Write amplification      │ ~3–5x (page + index + redo + binlog)  │ ~10–30x (compaction rewrites)
Read amplification       │ 1–5 page reads (tree height)          │ 1 memtable + N SSTables (bloom-filtered)
Space amplification      │ ~1.5x (fill factor ~70%)              │ ~1.1–2.2x
Sustained writes/sec/SSD │ ~20–30k small commits                 │ ~100–200k
p99 read                 │ low, predictable                      │ higher, less predictable (compaction stalls)
Range scan               │ excellent (linked leaves)             │ good (merging iterators; tombstones add cost)
Row-level locking        │ native                                │ typically optimistic / version-based

Why B+ tree for transactional workloads. Predictable p99 reads matter more than peak write throughput when your read path is "balance check before debit," "stock check before order," "ACL check before serve." Row-level locking with FOR UPDATE is how you serialize concurrent mutations without inventing CAS atop the engine. Accept ~20–30k commits/sec/primary because the workload shards cleanly.

Why LSM for high-write workloads. When the pattern is "append events keyed by partition, occasionally point-lookup by partition+timestamp," LSM is ~5x cheaper at the write path and compaction stalls don't show up because reads are bulk-scan, not user-facing. Cassandra and HBase dominate time-series and IoT; modern NewSQL systems use RocksDB underneath while building B+ tree-like transactional semantics above it.


§5. Capacity Envelope

What this technology can do, illustrated across very different scales.

Single-node OLTP (startup scale). A PostgreSQL on EC2 r6i.4xlarge (16 vCPU, 128 GB RAM, NVMe local SSD) sustains ~10k commits/sec at p99 <10ms, ~50k point reads/sec from a hot buffer pool, ~1–10 TB working data. Plenty for a SaaS with 10k customers averaging a few QPS each. Hacker News ran on one PostgreSQL until ~2016; Basecamp on one MySQL primary for years. Next bottleneck: write throughput on one primary. Adding read replicas scales reads, not writes. At ~30k commits/sec, sharding looks unavoidable.

Sharded relational — mid scale. Application-level sharding (Vitess at YouTube and Slack; ProxySQL; or hand-rolled). Notion sharded PostgreSQL ~2021 into ~32 logical partitions — ~5k writes/sec/shard, ~50k total. Stripe used MongoDB historically, migrating to sharded PostgreSQL — estimated 5–10k payments/sec peak (2024). Pinterest stores pins in 4096 logical MySQL shards across hundreds of physical instances; 600k pin-writes/sec during traffic peaks. Bottleneck: cross-shard transactions. Once you need "transfer money," "move a document between workspaces," "join two tenants' data," the application builds 2PC or sagas.

Vitess at YouTube — high scale. Sharding layer over MySQL for video metadata. Millions of QPS aggregate across thousands of shards; PB-class total data; p99 read on a hot shard <10ms. Vitess solved the shard-router problem operationally while leaving the storage engine as MySQL InnoDB.

OceanBase / Alipay — payments at firehose rate. MySQL-compatible distributed SQL. 583,000 payments/sec peak on Singles' Day 2020; ~150 billion txns/day; ~50 PB total ledger; p99 commit <100ms including cross-shard 2PC; RPO=0; RTO <30s for single-AZ failure. Upper bound currently shipping in production. Architecture: sharded MySQL-compatible engine + Paxos per shard + custom 2PC coordinator.

LinkedIn Espresso — profile data at PB scale. Distributed store for profiles, posts, comments, connections. PB-scale; ~10M QPS aggregate across thousands of shards. Per-shard MySQL with custom routing (Helix) and CDC (Change Data Capture) via Brooklin → Kafka → derived stores.

The range — startup 10k QPS, mid-scale 50–500k, OceanBase hundreds of thousands with cross-shard transactions — is two orders of magnitude wide for the same technology class. What scales is the partitioning and replication topology above the storage engine; the storage engine itself (B+ tree, MVCC, WAL) is the same on a Notion shard as on an OceanBase shard.


§6. Architecture in Context

The canonical pattern for a transactional database in a production system. Not "the X system" — the shape that recurs across payments, profiles, inventory, leaderboards, SaaS documents, feature flags.

                              ┌──────────────────────────────┐
                              │   Application servers        │
                              │   (stateless, horizontal)    │
                              └──────────┬───────────────────┘
                                         │  idempotency_key on writes
                                         │  partition_key chosen by app
                                         ▼
                              ┌──────────────────────────────┐
                              │   Shard router / proxy       │
                              │   (Vitess, ProxySQL, custom) │
                              │   hash(partition_key) → shard│
                              └──────────┬───────────────────┘
                                         │
            ┌──────────┬──────────┬──────┴───────┬──────────────┐
            ▼          ▼          ▼              ▼              ▼
       ┌────────┐ ┌────────┐ ┌────────┐    ┌────────┐    ┌────────┐
       │ Shard  │ │ Shard  │ │ Shard  │ …  │ Shard  │    │ Shard  │
       │   0    │ │   1    │ │   2    │    │  N-2   │    │  N-1   │
       │ Primary│ │ Primary│ │ Primary│    │ Primary│    │ Primary│
       │   │    │ │   │    │ │   │    │    │   │    │    │   │    │
       │ semi-sync replicas (1–2 cross-AZ)                       │
       │   │    │ │   │    │ │   │    │    │   │    │    │   │    │
       │ Replica│ │ Replica│ │ Replica│    │ Replica│    │ Replica│
       └───┬────┘ └───┬────┘ └───┬────┘    └───┬────┘    └───┬────┘
           │ binlog / WAL stream │                           │
           └──────────┬──────────┴──────────┬────────────────┘
                      ▼                     ▼
            ┌────────────────────┐ ┌────────────────────────┐
            │  Cache tier        │ │  CDC (Debezium /       │
            │  (Redis/Memcached) │ │   Brooklin) → Kafka    │
            │  hot reads         │ │   feeds derived stores │
            └────────────────────┘ └─────────┬──────────────┘
                                             │
                            ┌────────────────┼────────────────┬──────────────┐
                            ▼                ▼                ▼              ▼
                       ┌─────────┐      ┌─────────┐      ┌─────────┐    ┌─────────┐
                       │ Search  │      │ OLAP /  │      │ Feature │    │ Object  │
                       │ index   │      │ column  │      │ store   │    │ archive │
                       │ (ES,    │      │ store   │      │ (Redis +│    │ (S3,    │
                       │  Solr)  │      │ (Pinot, │      │  Iceberg│    │  Parquet│
                       │         │      │  Druid) │      │  S3)    │    │  cold)  │
                       └─────────┘      └─────────┘      └─────────┘    └─────────┘

The transactional database (the shard tier in the middle of the diagram) holds the source of truth. Around it cluster:

  • A cache tier (Redis, Memcached) for hot reads — feed serving, profile rendering, ACL checks. Cache invalidation is the application's problem.
  • A CDC pipeline (Debezium reading PostgreSQL logical replication, Brooklin reading MySQL binlog) publishing per-row change events to Kafka.
  • Derived stores: search indexes, OLAP column stores, ML feature stores, object archives. Each rebuildable from WAL/binlog history; each at bounded staleness (~5–10s typical); none is source of truth.

Annotations: sharding at the router by hashing the partition key (account_id, user_id, tenant_id, pin_id). Replication within a shard group — primary fans out to semi-sync replicas in other AZs. CDC at the binlog/WAL layer, downstream of replication. Cache invalidation by row-level key, application-coordinated.

This shape is the same whether the application is payments, social feed metadata, inventory, or SaaS documents. The variant (MySQL vs PostgreSQL vs MongoDB) and partitioning strategy changes by workload; the layout doesn't.
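
The router's hash(partition_key) → shard step can be sketched as follows. This assumes plain modulo placement and a SHA-256 digest; `shard_for` is a hypothetical helper, and production routers typically hash to a fixed set of logical shards and map those to physical instances so that resharding does not move every key.

```python
import hashlib

def shard_for(partition_key, num_shards):
    """Map a partition key to a shard with a stable hash.

    Python's built-in hash() is salted per process, so it is unsuitable here.
    """
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards

# Every app server computes the same placement for the same key.
placements = [shard_for(f"account-{i}", 64) for i in range(1000)]
same_key_twice = (shard_for("account-42", 64), shard_for("account-42", 64))
```

All rows that must be updated in one transaction need the same partition key, since that is what keeps them on one shard.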


§7. Hard Problems Inherent to Transactional Databases

Six fundamental challenges. Each shows up regardless of use case; illustrating examples come from across domains.

7.1 Hot row contention

One line. One row receives so many writes that the row lock serializes them and the queue grows unboundedly.

Where it shows up. A Twitch streamer in a tipping system receives 100k tips/sec. A flash-sale SKU receives 50k decrements/sec at launch. A trending hashtag's counter increments at hundreds of thousands per second.

Why it breaks. A single InnoDB row lock supports ~1–5k updates/sec. At 100k/sec the lock queue grows; p99 explodes from 5ms to seconds; unrelated queries on the same shard suffer because the I/O thread is saturated.

Real fix: write-sharding within a row. Promote the hot row to N=64 sub-rows. Inserts route by event_id % 64. Reads aggregate via SUM. Pattern appears in tip-aggregation at livestreaming platforms, counter aggregation at social feed services, escrow patterns in DDIA. Withdrawals or strong-consistency reads coordinate cross-sub-row, accepting a 2PC tax in exchange for unblocked writes.

Inflection. Auto-promote when a row's write rate exceeds 1k/sec sustained for 60s.
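
Write-sharding a hot row looks roughly like this. The sketch uses Python's built-in sqlite3 module, which has no row-level locks, so it shows the data layout and routing only, not the contention relief itself; the `tip_totals` table and helper names are assumptions for the example.

```python
import sqlite3

N_SUBROWS = 64

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tip_totals ("
    " streamer_id INTEGER NOT NULL,"
    " sub_row INTEGER NOT NULL,"
    " amount INTEGER NOT NULL DEFAULT 0,"
    " PRIMARY KEY (streamer_id, sub_row))"
)

def add_tip(conn, streamer_id, event_id, amount):
    """Route the write to one of N_SUBROWS sub-rows so no single row takes every lock."""
    sub_row = event_id % N_SUBROWS
    with conn:
        conn.execute(
            "INSERT OR IGNORE INTO tip_totals (streamer_id, sub_row, amount) VALUES (?, ?, 0)",
            (streamer_id, sub_row),
        )
        conn.execute(
            "UPDATE tip_totals SET amount = amount + ? WHERE streamer_id = ? AND sub_row = ?",
            (amount, streamer_id, sub_row),
        )

def total(conn, streamer_id):
    """Reads pay for the split by aggregating across sub-rows."""
    (value,) = conn.execute(
        "SELECT COALESCE(SUM(amount), 0) FROM tip_totals WHERE streamer_id = ?",
        (streamer_id,),
    ).fetchone()
    return value

for event_id in range(1000):
    add_tip(conn, streamer_id=7, event_id=event_id, amount=5)

grand_total = total(conn, 7)
sub_rows = conn.execute("SELECT COUNT(*) FROM tip_totals WHERE streamer_id = 7").fetchone()[0]
```

On an engine with row locks, each sub-row sees roughly 1/64 of the write rate, which brings a 100k/sec hot key back under the per-row ceiling.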

7.2 Cross-shard transactions

One line. Once sharded, a transaction touching two shards has no native atomicity primitive.

Where it shows up. A payment transfers money between accounts on different shards. A workspace move in Notion relocates pages from one tenant shard to another. A friend request in a social graph touches both users' adjacency lists.

Why it breaks. Concrete state (payments): t=0 Alice on shard 17 balance 1000, Bob on shard 203 balance 500. t=1 debit Alice → 900, commit shard 17. t=2 network blip, credit Bob on shard 203 times out. t=3 refund Alice, also times out. t=4 $100 gone. Conservation violated. Same pattern in inventory: decrement warehouse A, fail to increment warehouse B, stock lost.

Real fix: 2PC with a durable coordinator on top of per-shard XA transactions, or sagas with compensation if loose ordering is acceptable.

   Coordinator                Shard 17                       Shard 203
   ───────────                ─────────                      ─────────
   1. log: PREPARING(T)
   2. ─── PREPARE ──────►     XA PREPARE: debit Alice
                              (row locked, redo+binlog fsync'd)
   3. ─── PREPARE ────────────────────────────────────►     XA PREPARE: credit Bob
                                                            (row locked, fsync'd)
   4. ◄── YES ──                                            ◄── YES ──
   5. log: COMMITTED(T)   ← the durability point
   6. ─── COMMIT ──────►     XA COMMIT
   7. ─── COMMIT ─────────────────────────────────────►     XA COMMIT

The coordinator's COMMITTED log entry guarantees both participants commit even through restarts. Between PREPARE and COMMIT, both rows are locked (~16ms tail). Participant timeouts only help before the vote: a participant that has not yet answered PREPARE may abort unilaterally if the coordinator goes silent, but once it has voted YES it must hold its locks until it learns the coordinator's decision, which is why the coordinator log has to be durable and recoverable. NewSQL bakes this in via Paxos-replicated transaction records at the cost of consensus-RTT commit latency.
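
The message flow in the diagram reduces to a short control loop. This is an in-memory sketch, assuming a Python list as the coordinator log and no network, timeouts, or crash recovery; `Participant` and `two_phase_commit` are hypothetical names, not an XA client API.

```python
class Participant:
    def __init__(self, name, can_prepare=True):
        self.name = name
        self.can_prepare = can_prepare
        self.state = "IDLE"

    def prepare(self):
        # A real participant would lock rows and fsync before voting YES.
        self.state = "PREPARED" if self.can_prepare else "ABORTED"
        return self.can_prepare

    def commit(self):
        self.state = "COMMITTED"

    def abort(self):
        self.state = "ABORTED"

def two_phase_commit(txn_id, participants, coordinator_log):
    coordinator_log.append(("PREPARING", txn_id))
    votes = [p.prepare() for p in participants]      # phase 1: collect votes
    if all(votes):
        coordinator_log.append(("COMMITTED", txn_id))  # the durability point
        for p in participants:                         # phase 2: deliver the decision
            p.commit()
        return True
    coordinator_log.append(("ABORTED", txn_id))
    for p in participants:
        p.abort()
    return False

log_ok = []
shard_17, shard_203 = Participant("shard-17"), Participant("shard-203")
committed = two_phase_commit("T1", [shard_17, shard_203], log_ok)

log_fail = []
shard_a = Participant("shard-17")
shard_b = Participant("shard-203", can_prepare=False)  # e.g. insufficient balance
aborted = two_phase_commit("T2", [shard_a, shard_b], log_fail)
```

The decision record is written before any participant is told to commit, so a coordinator that restarts can always finish what it decided.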

7.3 Replication lag and read-after-write

One line. A replica trails the primary by 50ms–2s; reading immediately after a write may see the old state.

Where it shows up. A user updates their LinkedIn bio, refreshes their profile, sees the old bio (read hit a replica). A merchant updates pricing, refetches the product list, sees old prices. A user posts a Notion comment, reloads the page, comment missing.

Why it breaks. At 50ms lag, ~1% of read-after-write hits a lagging replica. At 10k QPS that's 100 inconsistent reads/sec — visible in user complaints.

Real fix. Route read-after-write to the primary for ~5s after a user's write, tracked by session cookie. Or use session-bound consistency tokens: after a commit the application records pg_current_wal_lsn() from the primary, and a subsequent read is served only by a replica whose pg_last_wal_replay_lsn() has reached that value (wait briefly, or fall back to the primary). MySQL has similar via GTIDs (Global Transaction Identifiers), e.g. WAIT_FOR_EXECUTED_GTID_SET() on the replica. For invariant reads (balances), always read from primary; trade read scalability for correctness.
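The time-window variant of this fix might look like the sketch below. ReadRouter, its method names, and the "primary"/"replica" return values are illustrative assumptions; a real deployment would keep the last-write time in the session cookie or a shared store rather than in process memory.

```python
import time


class ReadRouter:
    """Send a session's reads to the primary for a short window after it writes."""

    def __init__(self, pin_seconds=5.0, clock=time.monotonic):
        self.pin_seconds = pin_seconds
        self.clock = clock            # injectable so the window can be tested
        self.last_write = {}          # session_id -> time of most recent write

    def record_write(self, session_id):
        self.last_write[session_id] = self.clock()

    def target_for_read(self, session_id, invariant=False):
        # Invariant reads (balances) always go to the primary.
        if invariant:
            return "primary"
        wrote_at = self.last_write.get(session_id)
        if wrote_at is not None and self.clock() - wrote_at < self.pin_seconds:
            return "primary"
        return "replica"
```

The pin window should comfortably exceed the replica lag you normally observe; when lag spikes beyond it, the token-based approach is the safer choice.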

7.4 Schema evolution under load

One line. Adding a column, changing a type, adding an index — naively as table-locking DDL — blocks production traffic.

Where it shows up. A new feature adds last_login_at to a 500M-row users table at LinkedIn. A regulatory requirement adds kyc_status to a 10B-row accounts table. A redesign at Notion changes a column type from INT to BIGINT on a 5B-row blocks table.

Why it breaks. Vanilla ALTER TABLE in MySQL historically rewrote the table holding a metadata lock — hours of downtime on 500M rows.

Real fix: online schema change tools. MySQL: pt-online-schema-change or gh-ost. Both create a shadow table, capture concurrent writes via triggers (pt-osc) or binlog tailing (gh-ost), backfill, atomically swap. PostgreSQL: CREATE INDEX CONCURRENTLY; ADD COLUMN with no default has always been metadata-only, and ADD COLUMN ... DEFAULT <constant> is metadata-only since PG 11; pg_repack for rewrites. Type changes: dual-write to old and new columns, backfill, switch reads, drop old. Expect a multi-week dance.

7.5 Long-running transactions and MVCC bloat

One line. A long BEGIN holds an MVCC snapshot, blocking garbage collection of dead row versions across the whole database.

Where it shows up. An analyst leaves psql open with BEGIN; SELECT ... for an hour. A batch ETL runs a 30-minute scan inside a transaction. A stuck connection sits in idle in transaction after a crash.

Why it breaks. PostgreSQL: T takes its snapshot at transaction ID 1000; 10M rows are updated while T runs; each update leaves a dead tuple version behind (the old tuple is stamped with xmax); autovacuum cannot reclaim those versions because they may still be visible to T's snapshot; table bloats 10x; eventually disk fills. InnoDB variant: long readers hold back undo log purge; undo tablespace grows unboundedly.

Real fix. idle_in_transaction_session_timeout, statement_timeout set to a few minutes; force-terminate longer transactions. Application: explicit query timeouts; share snapshots across short transactions rather than holding one big one. Monitor pg_stat_activity for old transactions and alert. Silent killer of production OLTP.

7.6 Hot partition under uneven traffic

One line. Even with good sharding, traffic is rarely uniform; one shard runs hot while others idle.

Where it shows up. An e-commerce platform sharded by customer_id finds one marketplace seller is 50x median activity. A leaderboard sharded by region finds one region drives 80% of QPS. A multi-tenant SaaS sharded by tenant_id finds the largest tenant uses 20% of total capacity.

Why it breaks. Hash sharding spreads keys evenly but does not spread traffic. If one key gets 1000x the traffic, its shard is 1000x hotter. Uniform capacity expansion leaves the hot shard under-provisioned and others over-provisioned.

Real fix: tiered handling. For very hot keys: write-shard within the key (§7.1). For moderate skew: rebalance by moving partitions, not by hashing alone (Vitess's VReplication moves a key range to a different shard). For seasonal hotness: oversize the suspected hot shard ahead of peak — Singles' Day at Alipay, Black Friday at Amazon. Capacity planning is per-shard, not uniform.


§8. Replication, Failover, and Disaster Recovery

The previous sections treated replication as a knob (async / semi-sync / sync). At production scale, replication is the operational story — how data moves across nodes, how a primary loss becomes a 30-second blip instead of a 6-hour outage, how a "drop table production" is recoverable instead of a résumé-generating event. This section opens the box.

8.1 Binlog / WAL streaming mechanics — how replication actually moves data

Replication is one node tailing another node's durability log. The mechanics differ between MySQL and PostgreSQL in shape but converge on the same idea: ship the log, replay it.

MySQL binlog. The binlog (binary log) is a separate log from the InnoDB redo log. Where redo is physical (page-level), binlog is logical (event-level) and is what replicas consume. Three formats:

  • Statement-based replication (SBR). The SQL statement is shipped (UPDATE accounts SET balance = balance - 100 WHERE account_id = 42). Compact. Breaks on non-deterministic functions (NOW(), RAND(), UUID()), on row-count-dependent statements, on triggers with side effects. Mostly deprecated.
  • Row-based replication (RBR). The before and after row images are shipped. Deterministic, safe under any statement. Larger payload — a single UPDATE ... WHERE status='pending' touching 100k rows ships 100k row events.
  • Mixed. Statement by default; auto-switch to row when the statement is non-deterministic. Note that the server's default binlog_format has been ROW, not MIXED, since MySQL 5.7.7.

GTID (Global Transaction Identifier). Each committed transaction gets a unique ID of the form server_uuid:transaction_id. Replicas track which GTIDs they have applied (gtid_executed). Failover becomes trivial: "give me everything after GTID X:1042." Before GTIDs, replicas tracked (binlog_file, binlog_offset) — which broke catastrophically when the primary changed (the new primary's binlog files are not the old primary's binlog files). GTIDs decouple the identifier from the physical file.
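To make "give me everything after GTID X" concrete, here is a small sketch that parses MySQL-style GTID sets and computes what a replica is missing. The function names are hypothetical, and expanding intervals into Python sets is only reasonable for illustration; a real implementation would do interval arithmetic.

```python
def parse_gtid_set(text):
    """Parse 'uuid:1-5:8,uuid2:1-3' into {uuid: set of transaction ids}."""
    result = {}
    for part in text.replace("\n", "").split(","):
        part = part.strip()
        if not part:
            continue
        uuid, *intervals = part.split(":")
        ids = result.setdefault(uuid, set())
        for interval in intervals:
            start, _, end = interval.partition("-")
            ids.update(range(int(start), int(end or start) + 1))
    return result


def missing_on_replica(primary_set, replica_set):
    """Transactions the primary has executed that the replica has not applied."""
    primary = parse_gtid_set(primary_set)
    replica = parse_gtid_set(replica_set)
    missing = {}
    for uuid, ids in primary.items():
        gap = ids - replica.get(uuid, set())
        if gap:
            missing[uuid] = sorted(gap)
    return missing
```

Because the comparison is on transaction identity rather than (file, offset), the same call works no matter which server the replica is currently attached to.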

The replication threads.

   Primary                            Replica
   ───────                            ───────
   Client commit
       │
       ▼
   InnoDB redo log fsync
       │
       ▼
   Binlog event append + fsync (sync_binlog=1)
       │
       ▼
   Dump thread (per replica)
       │ ── TCP ─────────────────────► IO thread
                                           │
                                           ▼
                                       Relay log (on disk, replica's local)
                                           │
                                           ▼
                                       SQL thread (or parallel workers)
                                           │
                                           ▼
                                       Apply to InnoDB → local commit
                                           │
                                           ▼
                                       Update gtid_executed

The dump thread on the primary is dedicated per replica — one TCP connection, streaming binlog events in commit order. The IO thread on the replica receives events and appends to the relay log (a local on-replica copy of the binlog segment). The SQL thread (legacy) or parallel workers (5.7+, slave_parallel_workers=N) read from the relay log and apply events to InnoDB. The split is deliberate: receiving is decoupled from applying so a slow apply doesn't backpressure the network.

PostgreSQL WAL streaming. PostgreSQL doesn't have a separate binlog — the redo log is the replication log. Streaming replication ships raw WAL records.

   Primary                            Replica
   ───────                            ───────
   Client commit
       │
       ▼
   WAL record append + fsync (synchronous_commit=on)
       │
       ▼
   walsender (per replica connection)
       │ ── TCP ─────────────────────► walreceiver
                                           │
                                           ▼
                                       Local WAL append + fsync
                                           │
                                           ▼
                                       Startup process (recovery mode)
                                       redoes WAL records onto local data files
                                           │
                                           ▼
                                       Update pg_last_wal_replay_lsn

Replication slots track which WAL segments are still needed by each replica. Without a slot, an aggressive checkpoint could recycle WAL segments before a lagging replica consumed them, breaking the replica. With a slot, the primary refuses to recycle WAL the replica still needs, at the cost that a permanently-disconnected replica's slot will fill the primary's disk. This is a recurring outage pattern: monitor pg_replication_slots.restart_lsn for physical slots (confirmed_flush_lsn applies to logical slots), and cap retained WAL with max_slot_wal_keep_size on PG 13+.

LSN (Log Sequence Number). Both MySQL and PostgreSQL use a monotonic position into the log. PostgreSQL's LSN is a 64-bit byte offset (pg_current_wal_lsn() → 0/3A12F8). MySQL's binlog has (file, position) pre-GTID and GTID sets post-GTID. Lag is primary_lsn − replica_lsn in bytes, or seconds derived from "when was this LSN committed on primary."
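As a small illustration of lag in bytes, the sketch below converts PostgreSQL's textual LSN (two hexadecimal halves separated by a slash) into a 64-bit position and subtracts. The function names are hypothetical; on a live server the built-in pg_wal_lsn_diff() does the same arithmetic.

```python
def parse_lsn(lsn):
    """Convert a PostgreSQL LSN such as '0/3A12F8' to a 64-bit byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lag_bytes(primary_lsn, replica_replay_lsn):
    """Bytes of WAL the replica has not yet replayed."""
    return parse_lsn(primary_lsn) - parse_lsn(replica_replay_lsn)
```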

One transaction byte-by-byte, primary → replica. Continuing the §4.6 example (debit Alice -100 at LSN 1042):

  1. Primary writes redo record at LSN 1042 (page mutation) and binlog event at GTID srv-uuid:1042 containing the row before/after image.
  2. After local commit, primary's dump thread (or walsender for PG) reads the new binlog/WAL bytes and writes them to the TCP socket.
  3. The replica's IO thread (or walreceiver) receives the bytes. Replica writes them to its relay log (or local WAL) and fsyncs.
  4. In semi-sync mode, the replica acks at this point. Primary's commit returns to the client.
  5. The replica's SQL thread (or startup process) reads from the relay log, applies the row write to the InnoDB clustered index (or the WAL records to the PG heap pages). Replica commits locally. Replica's gtid_executed (or pg_last_wal_replay_lsn) advances to 1042.
  6. The replica is now caught up. A read from this replica with snapshot ≥ 1042 sees the debit.

The "lag" we obsess about is the time between step 1 and step 5 on a per-replica basis. Step 4 is what semi-sync waits for; step 5 is what synchronous_commit=remote_apply waits for.

8.2 Replication modes deep dive

The four modes and what they actually cost.

Async (default). Primary acks the client as soon as its local redo+binlog fsync returns. Replicas catch up at their own pace. Minimal commit latency (~150–300µs on NVMe). RPO > 0 on primary loss — anything not yet shipped to a replica is lost.

Client    Primary                  Replica
   │        │                        │
   │ COMMIT │                        │
   ├───────►│                        │
   │        │ fsync redo+binlog      │
   │ ◄─ACK──┤                        │
   │        │                        │
   │        │ ──── async send ──────►│   (some time later)
   │        │                        │
            │ (replica may be 50ms,  │
            │  500ms, or 30s behind) │
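Semi-sync (rpl_semi_sync_master_wait_for_slave_count=1 in MySQL; synchronous_commit=on with synchronous_standby_names set in PG). Primary waits for the replica to acknowledge receipt (durable in replica's relay log / replica's WAL) before acking the client. Replica has not necessarily applied, but the change is on durable storage. PG's weaker synchronous_commit=remote_write acks once the standby has written the WAL to its OS cache, before fsync, so it survives a crash of the standby's PostgreSQL process but not of its operating system. Crash of primary doesn't lose the commit. RPO ≈ 0 in the common case.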

Client    Primary                  Replica
   │        │                        │
   │ COMMIT │                        │
   ├───────►│                        │
   │        │ fsync redo+binlog      │
   │        │ ── sync send ─────────►│
   │        │                        │ fsync relay log / WAL
   │        │ ◄────── ACK ───────────┤
   │ ◄─ACK──┤                        │

Latency tax: one cross-AZ RTT, ~0.5–2ms typical. Cross-region: 50–200ms — usually unacceptable for OLTP.

Sync / remote_apply (synchronous_commit=remote_apply in PG). Primary waits for the replica to apply the change before acking. The strongest guarantee: a read on the named synchronous replica immediately after commit is guaranteed to see it. Brutal latency: 2× WAL fsync + apply time + RTT. Cross-AZ commits go from ~1ms to ~3–5ms; tail latencies pile up. Rare in production OLTP because the apply step is unpredictable.

Consensus-based (Raft/Paxos per shard). Spanner, CockroachDB, TiDB, OceanBase. Each commit must persist on a majority of the replica set before acking. With 3 replicas, 2-of-3 must fsync, so the commit survives any single failure including the leader's. Latency tax: one quorum RTT, ~1–3ms across AZs within a region, 50–200ms cross-region.

   Leader                                  Followers
   ──────                                  ─────────
   commit T → Raft AppendEntries(T)
       │ ── send to follower A ──────────► fsync, ack
       │ ── send to follower B ──────────► fsync, ack
       │
       │   wait for majority (2 of 3 incl self) — ackd
       │
       ▼
   Apply to state machine, ack client
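The "wait for majority" step determines commit latency: the leader needs only the fastest followers, not all of them. A rough model, assuming the leader's own fsync overlaps with the follower round trips (a common optimization, but an assumption here) and using a hypothetical function name:

```python
def quorum_commit_latency_ms(local_fsync_ms, follower_ack_ms):
    """Time until a majority of the replica set (leader included) has persisted.

    follower_ack_ms holds, per follower, the time for send + remote fsync + ack.
    """
    replicas = len(follower_ack_ms) + 1
    majority = replicas // 2 + 1
    needed_followers = majority - 1  # the leader's own fsync counts as one vote
    if needed_followers == 0:
        return local_fsync_ms
    kth_fastest = sorted(follower_ack_ms)[needed_followers - 1]
    # Leader fsync and follower round trips are modeled as running in parallel.
    return max(local_fsync_ms, kth_fastest)
```

With one follower in a nearby AZ and one in a remote region, the commit completes at the nearby follower's speed; the remote one only matters once the nearby one is down.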

Concrete RTT numbers.

   Mode                       Same AZ       Cross-AZ (same region)   Cross-region (US-East ↔ US-West)   Cross-continent
   Local NVMe fsync           ~150µs        ~150µs                   ~150µs                             ~150µs
   Async replication          +0 (client)   +0 (client)              +0 (client)                        +0 (client)
   Semi-sync (remote_write)   +0.2ms        +0.5–1.5ms               +50–80ms                           +150–250ms
   Sync (remote_apply)        +0.5–2ms      +1.5–4ms                 +60–100ms                          +180–300ms
   Raft quorum (3 replicas)   +0.3–1ms      +1–3ms                   +50–80ms (worst pair)              +150–250ms

The "wait for slave" knob when no replica is reachable. Semi-sync turns into a CP/AP choice. MySQL's rpl_semi_sync_master_timeout (default 10000ms = 10s): if no replica acks within timeout, the primary falls back to async, silently. AP behavior: writes proceed, durability degrades. To get CP behavior (block writes until a replica acks), set timeout very high or pair with semi-sync orchestration that quiesces the primary. PostgreSQL: if synchronous_standby_names is set and no named standby is available, writes block indefinitely (pure CP). Operators therefore name more standbys than the quorum needs, e.g. ANY 1 (replica_a, replica_b), so that loss of any one replica doesn't quiesce writes.

8.3 Replication topology variants

Primary → N replicas (star). The default. One primary streams to each replica directly. Each replica costs one TCP connection, one dump/walsender thread, replication-bandwidth equal to the write rate. Past 5–8 replicas, the primary's network and CPU start to feel it.

            ┌─► Replica A (same AZ, semi-sync)
   Primary ─┼─► Replica B (other AZ, semi-sync)
            ├─► Replica C (other AZ, async, for reads)
            └─► Replica D (other region, async, for DR)

Primary → relay → replicas (cascade). When fanout exceeds ~8 replicas, introduce an intermediate. The relay node is itself a replica; it re-streams to a downstream cohort. Pinterest historically used 1 primary → 1 intermediate → 6 replicas per shard so the primary served 1 dump thread instead of 6. Adds one hop of lag (~30–100ms).

   Primary ─► Intermediate Relay Replica ─┬─► Replica 1
                                          ├─► Replica 2
                                          ├─► Replica 3
                                          └─► … Replica N

Multi-primary (MySQL Group Replication, BDR for PostgreSQL). Two or more nodes accept writes; conflicts resolved by certification (Group Replication establishes a total order through its Paxos-based group communication layer and certifies each transaction against concurrent writes, similar in spirit to Galera). Rare in OLTP because conflict resolution is treacherous: writing to the same row in two regions and reconciling by timestamp is silent corruption for "subtract 100 from balance" semantics. Multi-primary works when the partition discipline ensures no key is written in two places (geo-partitioning), which is just sharding with a different label.

Read replicas in different AZs/regions. A common 3-AZ setup: primary + 1 semi-sync replica in another AZ (RPO=0), + 2 async replicas split across AZs for read load + 1 cross-region async replica for DR.

Per-replica cost. Each replica costs ~1 commit cycle of CPU on the primary (the dump/walsender thread doing the network send) + network bandwidth equal to the write rate per replica + storage equal to the dataset. Eight replicas = 8x replication bandwidth. A 17 TB shard with 50 MB/s of write traffic = 400 MB/s outbound from the primary across all replicas — that's 3.2 Gbps. AWS r6i.4xlarge has 12.5 Gbps; comfortable. r6i.large at 0.75 Gbps would be saturated.
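The bandwidth arithmetic above is easy to re-run for other fleets. A one-function sketch, using decimal units (1 MB = 8 Mb, 1 Gb = 1000 Mb) and a hypothetical function name:

```python
def replication_egress_gbps(write_mb_per_s, replicas):
    """Outbound replication bandwidth from the primary, in Gbps."""
    return write_mb_per_s * replicas * 8 / 1000
```

For the example in the text, replication_egress_gbps(50, 8) gives 3.2, which fits within 12.5 Gbps and exceeds 0.75 Gbps.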

8.4 Replication lag — causes, monitoring, mitigation

Lag is the operational metric. At 0ms lag, reads from replicas are consistent. At 30s lag, your bug tracker is full.

Causes.

  • (a) Network bandwidth saturated by large transactions. A 100k-row UPDATE products SET price = price * 1.05 generates ~50 MB of binlog row events in row-based format; ship over a 100 Mbps link and you have 4 seconds of replication lag just from the network. Mitigation: rate-limit large batch jobs, run them off-hours, or break into smaller transactions.
  • (b) Single-threaded SQL apply on replica (legacy MySQL pre-5.7). Before 5.7, the SQL thread was a single thread applying events serially (5.6 could parallelize only across schemas). A primary with 8 cores merrily committing in parallel sends events that the replica must apply sequentially. Replicas could not keep up with bursty primaries; this caused the famous 2010s "MySQL replication lag" incidents at Twitter, Facebook, et al.
  • (c) Replica I/O subsystem slower than primary. Smaller-instance replica (cost optimization) with slower SSDs; replica fsyncs at 100µs while primary fsyncs at 50µs. Once the replica is behind, it never catches up because the steady-state apply rate is below the primary's commit rate.
  • (d) Long-running queries on replica holding MVCC snapshot. An analytical query with a 30-minute snapshot conflicts with replayed cleanup (vacuum) records; the replica either pauses WAL replay (up to max_standby_streaming_delay) or cancels the query, so lag grows while the query is allowed to run. PostgreSQL's hot_standby_feedback=on moves the problem rather than solving it: replay no longer stalls, but the replica's snapshot now pins vacuum on the primary and bloat accumulates there. (Off-loading analytics to replicas is a feature; running 4-hour scans on the primary's hot-standby replica is malpractice.)

Monitoring.

  • MySQL: SHOW SLAVE STATUS\G → Seconds_Behind_Master. Famously unreliable. It measures the timestamp delta between the currently applied event and the current wall clock on the replica. If the dump thread stalls (IO thread idle, SQL thread idle), Seconds_Behind_Master returns 0 even though replication is broken. Use Performance Schema replication_applier_status_by_worker.LAST_APPLIED_TRANSACTION_END_APPLY_TIMESTAMP for real numbers.
  • PostgreSQL: pg_stat_replication → sent_lsn, write_lsn, flush_lsn, replay_lsn. Replay lag in time: now() - pg_last_xact_replay_timestamp() on the replica.
  • Lag in bytes (primary_lsn − replica_replay_lsn) is more honest than lag in seconds because it doesn't lie when nothing's coming through.
  • Alert thresholds. 5s is the common "yellow" threshold; 30s is "red". 0.5s lag is normal for cross-AZ async. Anything > 60s is an incident.

Mitigations.

  • MySQL 5.7+ multi-threaded slave: slave_parallel_workers=8, slave_parallel_type=LOGICAL_CLOCK. Replica applies events in parallel based on commit groups from the primary — events that committed in the same group commit on the primary can replay in parallel on the replica.
  • PostgreSQL: physical streaming replication replays WAL in a single startup process. Logical replication (PG 10+) runs one apply worker per subscription, and PG 16 added parallel apply for large in-progress transactions.
  • Batch large transactions into smaller chunks. A 1M-row update split into 100x 10k-row chunks gives replication chances to keep pace.
  • Prioritize replication network. QoS on AWS Direct Connect or VPC; isolate replication traffic from application traffic.
  • Right-size replicas equal to or better than the primary. "Saving money" on a replica that has to keep up at primary's write rate is false economy.
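The chunking mitigation can be sketched as a generator of primary-key ranges, each of which becomes its own short transaction. The function name and the assumption of a dense integer primary key are illustrative; sparse keys would need keyset pagination instead.

```python
def chunk_ranges(min_id, max_id, chunk_size):
    """Yield inclusive (start, end) primary-key ranges covering [min_id, max_id]."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    start = min_id
    while start <= max_id:
        end = min(start + chunk_size - 1, max_id)
        yield start, end
        start = end + 1
```

A batch job would run one UPDATE ... WHERE id BETWEEN start AND end per range, commit, and optionally pause when replica lag crosses the yellow threshold before taking the next range.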

Concrete walkthrough of a 30-minute replication lag incident and rolling recovery.

t=0:    Daily analytics job kicks off. Its rollup statement runs on the primary
        (a hot standby is read-only):
        UPDATE billing_summary SET rollup = (...) WHERE month = '2026-04'
        Touches 50M rows; held in one transaction. The job's long report
        query runs on the reporting replica and holds a snapshot there.
t=2min: Replica's WAL replay stalls: the bulk update generates 8 GB of WAL,
        and replay conflicts with the long-held snapshot on the replica.
        Replication lag begins growing at ~1s per real second.
t=10min: Lag = 8 minutes. Read-after-write breaks for any user whose request
        hit this replica. Customer-facing impact: stale data in dashboards
        showing yesterday's number.
t=12min: Pager fires (replica_lag > 5min for >5min).
        On-call inspects: pg_stat_replication shows replay_lsn frozen,
        pg_stat_activity shows a 12-min-old query on the replica.
t=15min: Cancel the long query on the replica (pg_cancel_backend(pid)).
        Replication resumes. Lag plateaus at ~13min.
t=15min: Reroute read-after-write traffic to the primary (feature flag).
        Customer impact lifted; latency on primary rises 20% but tolerable.
t=15min – t=45min: Replica replays ~1.4s of primary history per real second
        (the WAL is already on the replica's disk, so replay is bound by
        apply speed, not the network; warm caches help).
        Lag drains.
t=45min: Lag = 0. Reroute read-after-write back to replica.
t=46min: Post-incident: move the analytics job to a dedicated reporting replica
        with hot_standby_feedback=off and statement_timeout=10min.
        Add monitoring on long-running transactions on replicas.

The pattern matters: lag is rarely the network. It's almost always a bulk write on the primary, a long query holding a snapshot on the replica, or a single-threaded apply behind a write-bursty primary.

8.5 Failover orchestration

How automatic primary promotion actually happens. Each tool implements the same essential flow with different operational tradeoffs.

Orchestrator (GitHub's MySQL topology manager). Runs as its own HA cluster (its own Raft group or a shared backend database). Polls every MySQL node every few seconds. On detecting primary unreachable for 3+ consecutive failed health checks (~10–30s depending on poll interval), it runs:

   1. Detection: 3 consecutive health-check failures (~10–30s).
   2. Election: query all replicas for their GTID_EXECUTED.
                 Pick the one with the most advanced GTID set (= most data).
                 Tiebreak by lag / topology preferences.
   3. Fencing: set the old primary read-only if it is reachable; otherwise
               isolate it via STONITH or a network fence so its writes are rejected.
   4. Promotion: SET GLOBAL read_only=OFF on chosen replica.
                 Re-point all other replicas: STOP SLAVE / CHANGE MASTER TO 
                 the new primary.
   5. Service discovery: update DNS / VIP / Vitess routing layer.
   6. Re-attach old primary as a replica once recovered.

Wall-clock end-to-end: ~20–60s. The fencing step is the load-bearing one — see §8.6.
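The election step can be sketched as "most advanced first, then topology preference." The Candidate type and choose_new_primary function are hypothetical, and the sketch assumes every replica's GTID set is a single contiguous range from one source UUID, so comparing the upper bound is enough.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Candidate:
    name: str
    executed: int  # highest transaction number applied from the old primary
    zone: str


def choose_new_primary(candidates, preferred_zone=None):
    """Pick the most advanced replica; break ties by preferred zone, then name."""
    if not candidates:
        raise ValueError("no candidates available for promotion")
    most_advanced = max(c.executed for c in candidates)
    tied = [c for c in candidates if c.executed == most_advanced]
    preferred = [c for c in tied if c.zone == preferred_zone]
    pool = preferred or tied
    return min(pool, key=lambda c: c.name)  # deterministic final tiebreak
```

A lagging replica is never chosen over a more advanced one just because it sits in the preferred zone; preference only applies among replicas with equal data.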

Patroni (PostgreSQL). Uses a DCS (Distributed Configuration Store) — etcd, Consul, or ZooKeeper — for leader election via the DCS's own consensus. Each Patroni-managed PG node owns a lease in the DCS; the leader's lease is the source of truth for "who is primary." On lease expiry (typically 30s without renewal), other nodes race to claim the lease; the one with the highest LSN wins.

   1. Old primary's lease in etcd expires (failed to renew for 30s).
   2. All replicas observe lease expiry, query the DCS.
   3. Each replica reports its pg_last_wal_replay_lsn.
   4. Highest-LSN replica claims the leader key in etcd 
       (atomic compare-and-swap).
   5. Patroni on the winner runs pg_ctl promote.
   6. Other replicas re-point: pg_rewind or fresh basebackup,
       then standby.signal + primary_conninfo pointing at new primary.
   7. HAProxy / PgBouncer routes connections to new primary via 
       Patroni's REST API (/primary endpoint; /master in older releases).
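The lease mechanics in steps 1 to 4 can be modeled with an in-memory key that supports a TTL and an atomic claim. This is a simplified sketch, not Patroni's implementation: LeaderKey and run_election are hypothetical names, time is passed in explicitly, and a real DCS enforces the compare-and-swap across machines rather than with a local lock.

```python
import threading


class LeaderKey:
    """In-memory stand-in for a DCS leader key with TTL and compare-and-swap."""

    def __init__(self, ttl=30.0):
        self.ttl = ttl
        self.holder = None
        self.expires_at = 0.0
        self._lock = threading.Lock()

    def acquire(self, node, now):
        """Claim the key only if it is free or expired. Returns True on success."""
        with self._lock:
            if self.holder is None or now >= self.expires_at:
                self.holder, self.expires_at = node, now + self.ttl
                return True
            return False

    def renew(self, node, now):
        """Extend the lease; fails if the caller no longer holds a live lease."""
        with self._lock:
            if self.holder == node and now < self.expires_at:
                self.expires_at = now + self.ttl
                return True
            return False


def run_election(key, replay_lsn_by_node, now):
    """Nodes try the key in descending replay-LSN order; the first success wins."""
    for node in sorted(replay_lsn_by_node, key=replay_lsn_by_node.get, reverse=True):
        if key.acquire(node, now):
            return node
    return None
```

A failed renew() is the old primary's signal to demote itself, which ties the lease to the watchdog behavior discussed in §8.6.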

AWS RDS Multi-AZ. A synchronous standby in another AZ. On primary failure, AWS swaps the DNS CNAME from primary to standby (~30–60s end-to-end). Transparent to applications using the RDS endpoint. Storage is replicated synchronously via EBS-level mirroring, not at the engine layer — both nodes write to two AZs every commit.

Aurora. 6-way replicated storage layer (4-of-6 quorum on writes, 3-of-6 on reads) decoupled from compute. Failover is just spinning up a new compute node on the existing storage volume — no data rebuild, no replication catch-up. <30s typical.

MHA (MySQL Master High Availability) — legacy. A Perl-based tool, deprecated for >5 years, but still seen in older deployments. Replaced largely by Orchestrator.

End-to-end walkthrough of one failover. Primary on db-shard17-primary.az-a (running MySQL 8.0 with semi-sync, GTID enabled) goes silent at 14:32:07 UTC due to NIC failure.

14:32:07 — NIC dies on db-shard17-primary.
14:32:10 — Orchestrator's first health check fails (timeout 3s).
14:32:13 — Second health check fails.
14:32:16 — Third health check fails. → Trigger failover.
14:32:17 — Orchestrator queries 3 replicas:
              replica-az-b: GTID_EXECUTED = srv-uuid:1-104289
              replica-az-c: GTID_EXECUTED = srv-uuid:1-104289
              replica-az-d: GTID_EXECUTED = srv-uuid:1-104287  (lagging)
14:32:18 — Election: replica-az-b chosen (tied for highest GTID;
              az-b matches "preferred failover zone" tag).
14:32:19 — Fence old primary: 
              (a) iptables rule on switching fabric blocks db-shard17-primary's MAC,
              (b) Orchestrator increments epoch in etcd from 17 → 18.
              Any future writes from the old primary are ignored: replicas stop
              accepting its stream once the epoch is ≥ 18. (Stock MySQL has no
              epoch check; this assumes the fence is enforced by the tooling.)
14:32:20 — Promote: SET GLOBAL read_only=OFF on replica-az-b.
              CHANGE MASTER TO MASTER_AUTO_POSITION=1, MASTER_HOST=replica-az-b
              on replica-az-c and replica-az-d.
14:32:24 — Update Vitess routing layer: shard17 primary = replica-az-b.
14:32:25 — Connection draining: existing app connections to old primary 
              receive ECONNRESET; reconnect via Vitess to new primary.
14:32:30 — All app pods reporting healthy writes to new primary.
14:32:30 — Failover complete. Wall-clock: 23 seconds detection-to-traffic.
14:35:00 — Old primary brought up on a spare host with a snapshot of
              replica-az-b at the failover GTID; re-attached as a fresh replica.

The 23-second window is what RTO=30s SLAs are budgeted for.

8.6 Split-brain prevention

Two writable primaries simultaneously = data corruption. Both accept conflicting writes; neither can replicate to the other; on reconciliation, one set silently wins.

Fencing tokens. A monotonic epoch number (or generation, or term, or fencing_token). Every primary write includes its epoch. Replicas reject writes from an epoch lower than their own. On promotion, the new primary's epoch is set to old_epoch + 1. The old primary, if still alive and writing, has an epoch that's now stale; its writes are rejected at the replica layer. This is the load-bearing primitive — without it, fast promotion is unsafe.

   epoch=17, primary=A    epoch=18, primary=B (after failover)

   t=0: A writes "balance=400" with epoch=17 → replicas accept.
   t=1: Network partition. A still thinks it's primary.
   t=2: Orchestrator promotes B with epoch=18. Replicas now expect 18.
   t=3: A writes "balance=300" with epoch=17 → REJECTED.
   t=4: B writes "balance=350" with epoch=18 → ACCEPTED.
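The timeline above maps directly onto a replica-side check. A minimal sketch, with FencedReplica as a hypothetical class and the epoch carried as a plain integer on each write:

```python
class FencedReplica:
    """Accepts writes only from a writer whose epoch is current."""

    def __init__(self, epoch):
        self.epoch = epoch
        self.data = {}

    def observe_promotion(self, new_epoch):
        # Epochs only move forward.
        self.epoch = max(self.epoch, new_epoch)

    def apply(self, writer_epoch, key, value):
        """Return True if the write was applied, False if it was fenced off."""
        if writer_epoch < self.epoch:
            return False
        self.epoch = writer_epoch  # a newer epoch on the wire is adopted
        self.data[key] = value
        return True
```

Replaying the five steps: the epoch-17 write at t=0 is accepted, the epoch-17 write at t=3 is rejected after the promotion to 18, and the epoch-18 write at t=4 lands.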

STONITH (Shoot The Other Node In The Head). Hardware-level fencing. The orchestrator's failover step includes "power-cycle the suspected primary via IPMI/BMC" or "pull its switch port via SNMP." Brutal but unambiguous — a powered-off node cannot write. Used in legacy on-prem deployments. Cloud equivalent: terminate the instance via EC2 API.

Quorum-based decisions (Raft/Paxos). A consensus DB never elects a new leader without a majority. If a partition isolates the old leader, the partition with majority elects a new leader; the partition without majority refuses to elect. The old leader, observing it's lost quorum, demotes itself (step down on loss-of-quorum). No fencing needed at the application layer — the protocol itself is the fencing.

Watchdog timers. The primary itself watches its own connection to the orchestrator/DCS. If it loses heartbeat for N seconds, it self-demotes to read-only. Belt-and-suspenders against an orchestrator that promotes a new primary while the old primary is still alive and unaware.

Concrete split-brain scenario. Network partition between AZ-A (primary) and AZ-B (orchestrator + replicas). The primary in AZ-A is healthy but isolated; the orchestrator in AZ-B can't reach it, declares it dead, promotes a replica.

Without fencing:
   t=0: Primary in AZ-A keeps accepting writes from local apps in AZ-A.
   t=10s: Orchestrator promotes replica in AZ-B. Apps in AZ-B write to AZ-B.
   t=120s: Network heals. Two primaries existed for 110s.
       Conflict: account_id=42 written on AZ-A (balance=400) 
       and on AZ-B (balance=350). 
       On reconciliation, AZ-A's later commits override, losing AZ-B's debit. 
       Data corruption.

With fencing:
   t=0: Primary in AZ-A keeps accepting writes locally, but its writes 
        can't reach replicas (partition).
   t=10s: Orchestrator promotes replica in AZ-B with epoch=18.
        Apps in AZ-A still connected to AZ-A primary attempt writes.
   t=10s+: The application's connection-pool layer in AZ-A has stale 
        routing — but the binlog tail-shipping fence ensures AZ-A's 
        writes since t=0 (with stale epoch=17) cannot be replicated, 
        and the AZ-A primary's writes are stranded on local disk.
   t=120s: Partition heals. AZ-A primary observes the new epoch=18, 
        demotes itself to replica, and discards its post-partition writes 
        (or surfaces them as conflicts requiring manual resolution).
        No divergence: writes accepted by AZ-A during the partition were
        rejected at the replica fence and never became visible outside
        AZ-A. Clients in AZ-A that were acked on those writes still lose
        them unless semi-sync blocked the ack, which is why the watchdog
        self-demotion described above matters.

The lesson: fencing is what makes "we have automatic failover" not a euphemism for "we have automatic data loss."

8.7 Backup strategies

The backup is the database's last-resort time machine.

Logical backups (mysqldump, pg_dump). SQL statements that recreate the schema and data. pg_dump --format=custom produces a portable archive; mysqldump --single-transaction produces a consistent snapshot. Portable across major versions; can be restored to a different engine. Slow for restore: a 1 TB logical dump takes 10+ hours to restore because it replays through the SQL parser, planner, and executor row-by-row. Best for schemas under ~100 GB and for cross-version migrations.

Physical backups (xtrabackup/Percona for MySQL, pg_basebackup for PostgreSQL). Copy of the on-disk data files plus enough log to make them consistent. xtrabackup hot-copies InnoDB tablespace files while capturing concurrent redo log writes; pg_basebackup streams the entire PG data directory over the replication protocol (walsender). Fast restore: it's just disk I/O. A 1 TB physical backup restores in ~3 hours, which corresponds to ~100 MB/s end to end; at a sustained 300 MB/s the same restore would take about an hour. Tied to engine version: restoring an InnoDB 8.0 backup to an 8.1 instance requires special handling.

Incremental / WAL-archiving backups. A weekly or nightly base backup (full physical) + continuous archiving of every WAL segment to S3/GCS as it rolls off. Restore = base backup + replay WAL from base-backup LSN to desired recovery target. Storage cost is dominated by base backups; WAL archive is incremental.
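Choosing the starting point for such a restore is a simple rule: the newest base backup that completed at or before the recovery target. A sketch with a hypothetical function name, treating each backup as its completion timestamp:

```python
from datetime import datetime


def plan_restore(base_backup_times, target):
    """Pick the newest base backup completed at or before the recovery target.

    Returns the chosen backup time and the point up to which WAL must be replayed.
    """
    usable = [t for t in base_backup_times if t <= target]
    if not usable:
        raise ValueError("no base backup precedes the recovery target")
    return {"base_backup": max(usable), "replay_until": target}
```

For nightly 02:00 backups and a target of 10:59:59 on the same day, this selects that morning's backup and leaves roughly nine hours of log to replay. Less frequent base backups shrink storage cost but lengthen the replay, which is the trade-off behind the weekly-versus-nightly choice.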

3-2-1 rule. 3 copies of data, 2 different storage media, 1 offsite. For a 17 TB payments shard: production data (1 copy on EBS) + snapshot in same region (2nd copy, different physical media) + cross-region S3 archive (3rd copy, offsite). Plus the live replicas as bonus redundancy.

Encrypted at rest. For compliance (PCI-DSS, HIPAA, SOC2). Either at the storage layer (EBS encrypted, S3 SSE-KMS) or at the application layer (column encryption with a customer-managed key). Backups are typically encrypted under the same key hierarchy, which adds one risk: losing the KMS key while the only surviving copy is an encrypted offsite backup. Keep the KMS keys themselves backed up with strict access control.

Concrete production setup. A payments database at 17 TB/shard, 256 shards:

   Per-shard backup plan:
   - Nightly full physical backup via xtrabackup → S3 in same region.
     Cost: ~15 TB compressed (xtrabackup compresses InnoDB pages well).
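     Wall-clock: ~3 hours per shard. A backup orchestrator staggers shards
     and caps concurrency per AZ to limit network load. Strictly one at a
     time, 256 shards would take ~768 hours, so a nightly cadence needs
     roughly 32 or more shards backing up concurrently.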
   - Continuous binlog archive: every binlog segment rolls off → S3 within 5min.
     Storage cost: ~3 TB/day across all shards; retention 30 days = 90 TB.
   - Retention: 30 days of nightly + 7 yearly snapshots (anniversary backups)
     for legal/compliance.
   - Multi-region replication of the backup bucket itself via S3 CRR.
   - Quarterly disaster-recovery drill: restore one shard's backup into a 
     fresh cluster, replay binlog to a specific GTID, verify row counts and 
     checksums against production replica. Document RTO.

The drill is the load-bearing piece. Backups that have never been restored are not backups; they are encrypted files of hope.

8.8 Point-in-time recovery (PITR)

The "drop table production" scenario at 11:00 AM on a Friday. An engineer running a maintenance script forgot to set the right database context; DROP TABLE accounts ran in production. Replicas faithfully replicated the drop. The table is gone everywhere.

PITR is the procedure to recover to exactly the moment before the bad event.

Goal: restore to 2026-05-22 10:59:59 UTC (1 second before DROP TABLE at 11:00:00).

Steps:
  1. Identify exact target. Inspect binlog/WAL for the bad event.
     MySQL: mysqlbinlog binlog.0001042 | grep -B5 'DROP TABLE accounts'
     → found at GTID srv-uuid:1042087 at timestamp 2026-05-22 11:00:00.123
     → recover to GTID srv-uuid:1042086 (the previous transaction)

  2. Bring up a recovery cluster (separate from production).

  3. Restore the most recent full physical backup before the event 
     (last night's 02:00 UTC backup).
     → 1 TB restore via xtrabackup (--prepare, then --copy-back) in ~3 hours.

  4. Replay WAL/binlog forward from backup LSN to recovery target.
     MySQL: configure replication from binlog archive in S3, then
            START REPLICA UNTIL SQL_BEFORE_GTIDS = 'srv-uuid:1042087'
            (START SLAVE before 8.0.22) so apply stops after 1042086.
     PostgreSQL: recovery_target_time = '2026-05-22 10:59:59'
            or recovery_target_lsn = '0/3A12F800', set in postgresql.conf
            alongside a recovery.signal file (recovery.conf before PG 12).
     Wall-clock: replay rate ~10–30 MB/s for WAL through SQL apply.
     With 9 hours of binlog (last backup at 02:00 → recovery target 11:00),
     and a 50 MB/s primary write rate, that's 1.6 TB of WAL to replay:
     ~15 hours at 30 MB/s, and up to ~45 hours at 10 MB/s.

  5. Stop precisely. Verify the dropped table exists with row count matching
     prod's state at 10:59:59 (compare to checksums or daily count audit).

  6. Promote the recovery cluster (or just extract the needed table).
     Strategy A: copy the recovered table back into the production cluster 
                 via mysqldump or pg_dump (faster than full failover).
     Strategy B: failover production to the recovery cluster (RTO = the
                 restore plus replay time just spent).

  7. Document the gap: any writes between 11:00:00 (the drop) and "now" 
     that touched the dropped table are lost; reconstruct from upstream 
     sources or accept the gap.

Total time cost. Base restore time (~3 hours per TB in this example) plus WAL replay time (WAL volume since the base backup divided by a replay rate of ~10–30 MB/s). For a 1 TB database with 9 hours of WAL at 50 MB/s (1.6 TB): ~3 hours restore + ~15 hours replay at 30 MB/s = ~18 hours, and considerably longer at 10 MB/s. At PB scale you need shard-parallel restore and WAL replay: restoring 256 shards in parallel, each on its own clock, instead of sequentially.
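The estimate above is simple enough to script. The following is a minimal sketch, assuming the figures used in this section (1 TB base backup, ~3 hours per TB to restore, 50 MB/s primary write rate, 9 hours since the last backup, 10–30 MB/s replay). The function name and parameters are illustrative, not part of any tool.

```python
def pitr_estimate_hours(base_tb, restore_hours_per_tb, write_mb_s,
                        hours_since_backup, replay_mb_s):
    """Return (wal_tb, restore_hours, replay_hours, total_hours)."""
    # WAL volume accumulated since the base backup, in MB.
    wal_mb = write_mb_s * hours_since_backup * 3600
    restore_hours = base_tb * restore_hours_per_tb
    # Replay time is WAL volume divided by replay throughput.
    replay_hours = wal_mb / replay_mb_s / 3600
    return wal_mb / 1e6, restore_hours, replay_hours, restore_hours + replay_hours


# Evaluate both ends of the assumed 10-30 MB/s replay range.
for rate in (10, 30):
    wal_tb, restore_h, replay_h, total_h = pitr_estimate_hours(
        base_tb=1, restore_hours_per_tb=3, write_mb_s=50,
        hours_since_backup=9, replay_mb_s=rate)
    print(f"replay at {rate} MB/s: {wal_tb:.2f} TB WAL, "
          f"{restore_h:.0f} h restore + {replay_h:.0f} h replay = {total_h:.0f} h")
```

Running the numbers this way makes the sensitivity obvious: replay rate, not base restore, dominates recovery time, which is the argument for more frequent base backups on write-heavy shards.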

The real-world examples. GitHub's 2012 outage involved restoring from binlog after a database corruption; recovery took ~6 hours. GitLab's 2017 incident (rm -rf of the production directory) — 6 hours of work lost because backups had silently been broken for months. The pattern: PITR works if and only if backups are tested and binlog archive is intact.

8.9 Cross-region disaster recovery

What survives the loss of a region — not just an AZ.

Active-passive (warm standby in another region). Async replication from the primary region to a passive cluster in another region. ~50–200ms lag (the cross-region RTT). RPO ≈ lag window (anything in flight at the moment of regional loss is lost). RTO = manual failover process: ~10–30 minutes including human decision-making.

   us-east-1 (active)                       us-west-2 (passive)
   ─────────────────                        ──────────────────
   Primary (semi-sync within region)        Standby cluster (async receive)
       │                                            ▲
       │ ─── async binlog/WAL stream ──────────────►│
       │     (cross-region, 50–80ms RTT)            │
       ▼                                            │
   Apps in us-east-1 ──── write ──────►             │
                                                    │
                                          (No app traffic; warm standby)
   On regional loss:
   1. Page on-call. Triage: is this really regional loss or just AZ?
   2. Fence the old primary if it is still reachable, then promote the
      us-west-2 standby to read-write.
   3. Manual: update DNS/route config to direct apps to the new us-west-2 primary.
   4. Reconfigure CDC pipelines to read from us-west-2 binlog/WAL.
   5. Communicate RPO (lag window) to customer; reconstruct lost txns 
      from upstream sources where possible (idempotency keys help).

Active-active (writes accepted in multiple regions). Writes route to the nearest region for latency; conflict resolution required. Three approaches:

  • LWW (Last Write Wins) with vector clocks. Each write tagged with a vector of (region, logical_clock). Conflicts resolved by causality where possible, falling back to timestamp. Silent corruption for monetary operations: two regions both write the same row, the write with the older timestamp is discarded, and a debit is lost. Avoid for OLTP.
  • CRDTs (Conflict-free Replicated Data Types). Counters that add (G-counters, PN-counters); sets that union (G-sets, OR-sets); maps and registers with last-writer-wins on specific fields. Eventual consistency is intrinsic. Works for like-counts, presence indicators, collaborative documents (Yjs, Automerge). Doesn't work for "balance must equal credits minus debits."
  • Partition-per-region (geo-partitioning). Each region owns a disjoint set of partitions. All writes for partitions owned by region X go to X; conflicts are impossible because no two regions ever write the same row. Spanner's geo-partitioning, CockroachDB's locality-aware partitioning. This is the cleanest active-active pattern but requires partitioning by a key that's stable per-tenant (a user's home region).
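To make the CRDT option concrete, here is a minimal PN-counter sketch. It is an illustration of the general technique, not code from any of the libraries named above; the class and method names are assumptions. Each region only ever increases its own tallies, and merging takes the per-region maximum, so replicas converge regardless of merge order.

```python
from collections import defaultdict


class PNCounter:
    """Positive-negative counter: one grow-only tally per region for
    increments and another for decrements."""

    def __init__(self, region):
        self.region = region
        self.incs = defaultdict(int)
        self.decs = defaultdict(int)

    def add(self, n=1):
        # A replica only ever touches its own region's entry.
        self.incs[self.region] += n

    def sub(self, n=1):
        self.decs[self.region] += n

    def value(self):
        return sum(self.incs.values()) - sum(self.decs.values())

    def merge(self, other):
        """Element-wise max: commutative, associative, and idempotent."""
        for region, count in other.incs.items():
            self.incs[region] = max(self.incs[region], count)
        for region, count in other.decs.items():
            self.decs[region] = max(self.decs[region], count)


east = PNCounter("us-east-1")
west = PNCounter("us-west-2")
east.add(3)
west.add(2)
west.sub(1)
east.merge(west)
west.merge(east)
print(east.value(), west.value())
```

The sketch also shows why this does not fit conservation: nothing in merge can reject a decrement, so a rule like "balance never goes negative" cannot be enforced at write time.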

Latency tax of synchronous cross-region. A Raft commit across us-east-1 / us-west-2 / eu-west-1 (a "three-region" deployment for true RPO=0) pays the RTT from the leader to the nearest follower that completes the quorum, not the slowest pairwise RTT. US-East to US-West is ~70ms; US-East to EU-West is ~100ms; a write needs 2-of-3 in this geometry, which is ~70ms in the best case (leader in US-East) and worse when the leader sits elsewhere. Every commit is ~70–100ms. For OLTP at 10k commits/sec this is structurally painful: you need ~700–1,000 transactions in flight at all times just to keep up, and any single contended row is capped at roughly 10–14 commits/sec.
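The quorum arithmetic can be checked with a few lines. This is a sketch under stated assumptions: the 70 ms and 100 ms RTTs come from the paragraph above, while the us-west-2 to eu-west-1 figure of 140 ms is an assumed value added only to complete the table. Function names are illustrative.

```python
REGIONS = ["us-east-1", "us-west-2", "eu-west-1"]
RTT_MS = {
    frozenset(("us-east-1", "us-west-2")): 70,
    frozenset(("us-east-1", "eu-west-1")): 100,
    frozenset(("us-west-2", "eu-west-1")): 140,  # assumed, not from the text
}


def quorum_commit_ms(leader, rtt_ms=RTT_MS, regions=REGIONS):
    """Time until a majority (leader included) has acknowledged a write."""
    others = sorted(rtt_ms[frozenset((leader, r))] for r in regions if r != leader)
    followers_needed = len(regions) // 2  # majority minus the leader itself
    return others[followers_needed - 1]


def in_flight(commits_per_sec, latency_ms):
    """Little's law: concurrency needed to sustain a rate at a given latency."""
    return commits_per_sec * latency_ms / 1000


for leader in REGIONS:
    ms = quorum_commit_ms(leader)
    print(f"leader={leader}: {ms} ms/commit, "
          f"{in_flight(10_000, ms):.0f} txns in flight at 10k/s, "
          f"{1000 / ms:.1f} commits/s on one row")
```

Leader placement matters: with these numbers a leader in either US region commits in about 70 ms, while a leader in eu-west-1 pays about 100 ms.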

Concrete cross-region setups.

  • Stripe: primary in US, async standby in another region. RPO measured in seconds. Customer-facing OLTP runs in one region with low-latency commits.
  • Notion: PostgreSQL cross-region async replicas for DR. Active-passive.
  • OceanBase at Alipay: 3-region Paxos quorum across Beijing, Shanghai, Shenzhen. RPO=0 at ~30–50ms commit latency, which is workable only because inter-DC RTT within China is short.
  • CockroachDB: geo-partitioning by region column where applicable; pure-async cross-region for fallback.

The "do you really need active-active" pushback. Active-passive at 50ms RPO with 30-minute RTO is more than adequate for ~95% of OLTP workloads. The latency cost of synchronous cross-region (50–200ms per commit) and the conflict resolution complexity of active-active (corruption risks, custom conflict handlers, application-level partitioning) are rarely worth it. Pick active-passive unless there is a specific requirement (regulatory data residency requiring writes in-region, hard latency targets for users in two regions) that forces active-active.

8.10 RTO/RPO targets and architecture

RTO (Recovery Time Objective) = how long to restore service after a failure. RPO (Recovery Point Objective) = how much data we can lose. Pair them deliberately; each step down costs an order of magnitude more.

RTO | RPO | Architecture | Cost multiplier
30s | 0 | Semi-sync replicas in another AZ + automated failover (Orchestrator/Patroni). Cross-AZ commit latency. | 5–10x (replicas)
5min | 0 | Same as above but with manual failover after automated detection: a human gates the promotion. | 4–8x
1hr | 15min | Cross-region async replica + PITR via WAL archive. Lag = your worst-case RPO. | 2–4x
24hr | 24hr | Nightly logical backup to S3. No replication. Restore is sequential WAL-less. | 1.2x
30s | 0 (cross-region) | Paxos/Raft quorum across regions. NewSQL or custom consensus. | 10–20x + latency

The 95% pick for production OLTP at meaningful scale: RTO=30s RPO=0 within a region (semi-sync + cross-AZ failover), RTO=15min RPO≈lag-window across regions (async cross-region for DR).

Cost intuition: every "step better" on RTO is a step better on automation; every "step better" on RPO is a step better on replication mode. Going from RPO=15min to RPO=0 means moving from async to semi-sync: the bytes shipped are the same, but every commit now waits for a replica ack, so you pay commit latency plus enough replica capacity that an acking replica is always available. Going from RPO=0 within-region to RPO=0 across-region means consensus across regions: a ~100ms commit latency tax, possibly fatal to throughput.

The synthesis: RTO/RPO is a product decision, not a technology decision. Pick them based on what the business loses per minute of outage and per second of data loss; provision the architecture that achieves them; do not over-engineer.


§9. Failure Mode Walkthrough

Every failure named, with the durability point.

9.1 Primary crashes mid-transaction. T was writing pages; redo records buffered but not flushed. On restart, the database scans redo log from last checkpoint, finds no COMMIT for T, rolls back via undo log. Lock released. T never happened. Client times out, retries with idempotency key. Durability point: none for T; correct outcome.

9.2 Primary crashes after fsync of commit, before binlog ack to replica. T committed locally (redo log fsync'd at LSN 1080). Binlog event generated; semi-sync wait in progress or just completed. On restart, T is redo-replayed and surfaces in the table. Binlog also persisted (sync_binlog=1). Replicas read the event and apply it; if already received, replication is idempotent at event level. If client ack was sent, T is durable. If not, client retries with idempotency key, sees status=committed in idempotency row, returns prior result. Durability point: fsync of redo log + binlog at LSN 1080.
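The retry-with-idempotency-key behaviour that 9.1 and 9.2 rely on can be sketched as follows. This is an illustration using SQLite from the Python standard library so that it runs anywhere; the table and column names (idempotency, key, result) are assumptions, and a production engine would additionally rely on the primary key on the idempotency row to reject concurrent duplicates.

```python
import sqlite3


def make_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
    conn.execute("CREATE TABLE idempotency (key TEXT PRIMARY KEY, result TEXT)")
    conn.execute("INSERT INTO accounts VALUES (42, 500)")
    conn.commit()
    return conn


def apply_debit(conn, key, account_id, amount):
    """Apply a debit at most once per idempotency key."""
    with conn:  # one transaction: commit on success, rollback on exception
        row = conn.execute(
            "SELECT result FROM idempotency WHERE key = ?", (key,)).fetchone()
        if row is not None:
            return row[0]  # a retry of a request that already committed
        cur = conn.execute(
            "UPDATE accounts SET balance = balance - ? "
            "WHERE id = ? AND balance >= ?", (amount, account_id, amount))
        result = "committed" if cur.rowcount == 1 else "insufficient_funds"
        # The outcome is recorded in the same transaction as the mutation.
        conn.execute(
            "INSERT INTO idempotency (key, result) VALUES (?, ?)", (key, result))
        return result


conn = make_db()
print(apply_debit(conn, "req-1", 42, 100))  # first attempt
print(apply_debit(conn, "req-1", 42, 100))  # client retry after a timeout
print(conn.execute("SELECT balance FROM accounts WHERE id = 42").fetchone()[0])
```

Because the idempotency row and the balance change commit together, a crash leaves either both or neither, so the retry is safe in both the 9.1 and 9.2 cases.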

9.3 Network partition isolating primary from quorum of replicas. Primary can write locally but semi-sync is blocking (no replica reachable). With rpl_semi_sync_master_wait_no_slave=ON and a long rpl_semi_sync_master_timeout, new commits stall: the primary effectively refuses writes and clients see timeouts. When partition heals, replication catches up. If long-lived, the orchestrator may demote the isolated primary and promote a reachable replica. To prevent split-brain: a fencing token (monotonic epoch) ensures the demoted primary cannot ack new writes; STONITH (Shoot The Other Node In The Head) at the infra layer is belt-and-suspenders. Durability point: most recent commit that achieved replica ack; anything past that on the demoted primary is rolled back.
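The fencing-token idea is small enough to show directly. The sketch below is a generic illustration, not the mechanism of any particular orchestrator: FencedStore is a hypothetical stand-in for whatever component sits downstream of the primary (storage, a lock service, a proxy) and remembers the highest epoch it has seen.

```python
class FencedStore:
    """Remember the highest epoch seen and reject writes carrying a lower one."""

    def __init__(self):
        self.highest_epoch = 0
        self.data = {}

    def write(self, epoch, key, value):
        if epoch < self.highest_epoch:
            # A demoted primary still believes it holds an older epoch.
            raise PermissionError(
                f"stale epoch {epoch}; current is {self.highest_epoch}")
        self.highest_epoch = epoch
        self.data[key] = value


store = FencedStore()
store.write(1, "balance:42", 500)      # old primary, epoch 1
store.write(2, "balance:42", 400)      # promoted replica, epoch 2
try:
    store.write(1, "balance:42", 999)  # old primary wakes up after the partition
except PermissionError as exc:
    print("rejected:", exc)
```

The key property is that the check lives outside the demoted node, so it holds even if that node never learns it was demoted.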

9.4 Permanent loss of a primary. Disk fails or host catches fire. Replicas survive. Orchestrator promotes the replica with the highest binlog/WAL position (SHOW SLAVE STATUS in MySQL; pg_last_wal_replay_lsn() in PostgreSQL). Other replicas re-point. A fresh node is provisioned, snapshot-restored, added as third replica. Wall-clock to full redundancy: ~30 minutes; service back in <30 seconds with degraded redundancy. Durability point: last semi-sync acked commit. RPO=0 in the common case.

9.5 Permanent loss of an entire AZ. Primary + 1 replica gone in AZ-A. 1 replica surviving in AZ-B. Promote the AZ-B replica. Provision two new replicas in AZ-C and AZ-D from snapshot. Service runs degraded (a lone primary with no replicas) for ~30 minutes during reconstruction. Durability point: surviving replica's last applied event.

9.6 Silent disk corruption (cosmic ray hits a B+ tree page). A page on the primary has a flipped bit; reads return wrong data. InnoDB writes a checksum in each page header/trailer; PostgreSQL writes per-page checksums when initdb'd with --data-checksums. On read, mismatch raises an I/O error; the row read fails; orchestrator routes to a replica. Corrupted page reconstructed by replaying binlog/WAL onto a freshly initialized region, or single-page recovery from a healthy replica. Durability point: any replica with the uncorrupted page. Without checksums, silent corruption silently violates invariants. Run them in production, not just dev.
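A per-page checksum is easy to illustrate. The sketch below uses CRC32 from the standard library and an invented 8 KB page layout purely for demonstration; real engines use their own checksum algorithms and page formats.

```python
import struct
import zlib

PAGE_SIZE = 8192  # illustrative page size


def seal_page(payload: bytes) -> bytes:
    """Pad the payload to a full page and prepend its CRC32."""
    body = payload.ljust(PAGE_SIZE - 4, b"\x00")
    if len(body) != PAGE_SIZE - 4:
        raise ValueError("payload too large for one page")
    return struct.pack("<I", zlib.crc32(body)) + body


def read_page(page: bytes) -> bytes:
    """Return the page body, or raise if the stored checksum does not match."""
    (stored,) = struct.unpack("<I", page[:4])
    body = page[4:]
    if zlib.crc32(body) != stored:
        raise IOError("page checksum mismatch")
    return body


page = seal_page(b"account=42 balance=500")
corrupted = bytearray(page)
corrupted[100] ^= 0x01  # flip a single bit inside the body
try:
    read_page(bytes(corrupted))
except IOError as exc:
    print("detected:", exc)
```

Without the check, the flipped bit would be returned to the caller as valid data; with it, the read fails loudly and can be redirected to a replica.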


§10. Why Not the Obvious Simpler Alternative

"Just use flat files." Replaying every event since genesis to answer "balance of account 42?" costs 100ms per query. To make it fast you'd materialize current state — you've invented a database. To make materialization crash-safe you'd add a write-ahead log — you've invented WAL. To handle concurrent updates you'd add locks — you've invented a transaction manager. The naive alternative becomes the database through three rebuilds.

"Just use Redis as the primary." Memory cost: 17 PB of payments data in RAM is not financially sane (disk is 50–100x cheaper). Persistence: AOF (Append-Only File) defaults to appendfsync everysec, losing up to 1s of acked writes on crash; appendfsync always makes Redis ~10x slower. Transactions: MULTI/EXEC has no rollback on conditional failure; Lua scripts execute atomically, but in Redis Cluster only across keys in the same hash slot. Replication: async by default. Redis is the correct fast-path layer in front of a real database: caching, idempotency lookups, rate limiting. Not the durable primary for conservation-critical state.

"Just use a global distributed transaction across all shards." Global locks serialize the entire workload. At 583k writes/sec, holding a global lock for 100µs caps throughput at 10k/sec — 60x below need. The point of sharding is to break the contended-lock barrier.

"Just use Cassandra, it horizontally scales." Without LWT (Lightweight Transactions) you have no per-key ordering: concurrent writes resolve by timestamp (LWW, Last Write Wins), so clock skew silently loses a debit. With LWT you pay Paxos round-trips (4 per operation, roughly 2–5ms in total in-DC), capping LWT-per-partition at 200–500/sec. To transfer money atomically between two rows you'd build 2PC over LWT, stacking expensive protocols. Real failure: two concurrent debits both CAS balance=500 to balance=400. One wins, one loses, retries. Under contention the retry storm collapses throughput to a few hundred/sec, the same bottleneck as a MySQL row lock at 4x the infra cost. Cassandra is right for time-series, fanout, IoT; wrong for conservation.

"Use object storage as the database." Historically eventual consistency on listing (S3 has been strongly consistent since late 2020; other stores vary), no cross-object transactions, no indexes, per-object overhead that makes per-row workloads economically absurd. Object storage is the right archive tier (Parquet, blobs); not a transactional substrate.


§11. Scaling Axes

Transactional databases scale on two axes that demand different solutions.

Vertical scaling. Bigger machine: more cores, RAM, faster SSDs. A single PostgreSQL on 96-vCPU, 768 GB RAM, NVMe sustains ~30–50k commits/sec. Inflection: 30–50k is the commodity-hardware ceiling. Beyond that, lock manager and WAL fsync become structural bottlenecks. You can ride this for a long time (Notion did until 2021, Hacker News until 2016), but eventually you shard.

Horizontal scaling: read replicas. Add async or semi-sync replicas; route read-heavy queries; primary handles writes and read-after-write. Inflection: replicas help when read:write is high (>10:1). Payments at 1:1 sees little benefit; profile-rendering at 1000:1 scales beautifully — 8 replicas → ~8x read capacity. Replica lag is the operational problem.

Horizontal scaling: sharding (Type 1 — uniform growth). When more entities (users, accounts, documents) require more capacity than one primary handles, partition by entity key. hash(key) % N; mod by power of two for clean resharding (N → 2N). Each shard is its own primary with replicas. A router (Vitess, ProxySQL, application-layer) maps queries to shards. Inflection: when per-shard write rate hits 30–50% of sustainable ceiling, stage the next split. 512 → 1024 with dual-write + backfill + cutover takes weeks but is online throughout. Cost is permanent: cross-shard joins are N×M, cross-shard transactions need 2PC, schema migrations coordinate across shards.

Horizontal scaling: hot keys (Type 2 — same entities, more rate per entity). The harder axis. The same entity (a Twitch streamer, a flash-sale SKU, a viral hashtag) sees increasing write rate. Sharding alone does not help — hot row is still on one shard. Fix: write-shard within the entity (§7.1). Inflection: a single row exceeds 1k writes/sec sustained. Beyond write-sharding, log-based pre-aggregation — append to a Kafka topic partitioned by entity, with a consumer batching in 100ms windows and applying a single SUM update. Lose strict per-event ordering (acceptable for tips, likes, clicks); preserve eventual sum. Useful past 100k writes/sec on one entity.
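Write-sharding within an entity can be sketched as a counter split across several sub-rows. The section the text points to is not reproduced here, so the layout below (a counter_slots table, 16 slots per entity) is an assumption for illustration. SQLite from the standard library stands in for the real engine.

```python
import random
import sqlite3

SLOTS = 16  # assumed number of sub-rows per hot entity


def make_db():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE counter_slots ("
        " entity_id TEXT, slot INTEGER, total INTEGER NOT NULL,"
        " PRIMARY KEY (entity_id, slot))")
    return conn


def add(conn, entity_id, amount, rng=random):
    """Increment one randomly chosen slot, so writers spread over SLOTS rows."""
    slot = rng.randrange(SLOTS)
    with conn:
        conn.execute(
            "INSERT OR IGNORE INTO counter_slots VALUES (?, ?, 0)",
            (entity_id, slot))
        conn.execute(
            "UPDATE counter_slots SET total = total + ? "
            "WHERE entity_id = ? AND slot = ?", (amount, entity_id, slot))


def total(conn, entity_id):
    """Reads pay the cost: sum all slots for the entity."""
    row = conn.execute(
        "SELECT COALESCE(SUM(total), 0) FROM counter_slots WHERE entity_id = ?",
        (entity_id,)).fetchone()
    return row[0]


conn = make_db()
for _ in range(1000):
    add(conn, "streamer:1", 5)
print(total(conn, "streamer:1"))
```

The trade is explicit: each write contends on roughly 1/16 of the original row lock, while each read becomes a small aggregate over up to 16 rows.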

Both knobs needed. Sharding alone (Type 1) doesn't fix hot keys (Type 2). Write-sharding alone (Type 2) doesn't fix capacity growth (Type 1). Every large-scale OLTP deployment ends up with both.

11.1 Sharding strategies in depth

The choice of how to shard outlives the choice of whether to shard. Three families.

Hash sharding. shard_id = hash(partition_key) % N. Spreads keys uniformly across N shards regardless of key distribution. Works for high-cardinality keys (account_id, user_id) where you don't need range queries on the partition key. The price: rebalancing on N change is painful. Going from 512 → 513 shards remaps ~99.8% of keys; nearly every row has to move. The standard mitigation is to keep N a power of two: sharding by hash(key) % 1024 lets you split each shard cleanly in half (mod 1024 → mod 2048; keys with (hash mod 2048) < 1024 stay put, others move to the new sibling). A split moves only about half of each shard's rows, always to that shard's new sibling and never between existing shards. Vitess's KeyspaceID and most modern hand-rolled shard maps use this pattern. Pinterest used a related scheme: 4096 logical MySQL shards were allocated up front and spread over more physical hosts as load grew, so each host split moved about half of that host's data.

hash sharding with power-of-two N:

   N=4                      split shard 2 into 2a + 2b → N=5? NO, → N=8
   ─────                    ────────────────────────────────────
   key → hash → mod 4       key → hash → mod 8
                            keys where (hash mod 4)==2 split:
   shard 0                     (hash mod 8)==2 → stays in old shard 2 (renamed 2a)
   shard 1                     (hash mod 8)==6 → moves to new shard 2b
   shard 2  ← splitting     other shards unchanged.
   shard 3
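Both claims, that N → N+1 remaps nearly everything and that N → 2N moves only half the keys to a sibling shard, can be verified empirically. The sketch below uses SHA-256 as a stable hash; the helper names and the synthetic key set are illustrative.

```python
import hashlib


def stable_hash(key: str) -> int:
    """64-bit hash that is stable across processes (unlike built-in hash())."""
    return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")


def shard_of(key: str, n: int) -> int:
    return stable_hash(key) % n


def moved_fraction(keys, old_n, new_n):
    """Fraction of keys whose shard id changes when N changes."""
    moved = sum(1 for k in keys if shard_of(k, old_n) != shard_of(k, new_n))
    return moved / len(keys)


keys = [f"account-{i}" for i in range(20_000)]

print(f"512 -> 513:  {moved_fraction(keys, 512, 513):.1%} of keys move")
print(f"512 -> 1024: {moved_fraction(keys, 512, 1024):.1%} of keys move")

# After doubling, every key is either on its old shard or on old shard + 512.
siblings_only = all(
    shard_of(k, 1024) in (shard_of(k, 512), shard_of(k, 512) + 512) for k in keys)
print("moves only to the sibling shard:", siblings_only)
```

The sibling property is what makes the split operationally cheap: each old shard streams rows to exactly one new shard, and no data crosses between pre-existing shards.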

Range sharding. Shards own contiguous ranges of the partition key. [0, 1M) → shard 0, [1M, 2M) → shard 1, .... Good for sequential keys that need range queries (e.g., time-series, audit logs). Hotspot risk on time-shaped data: if you shard events by event_id and IDs are monotonic, all new inserts hit the highest-range shard while older shards go cold. This is the "hot last shard" problem. Mitigations: hash a prefix of a composite key, or shard by (tenant_id, event_id) so within-tenant ranges work but tenants spread.
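The hot-last-shard effect shows up immediately in a range lookup. This is a minimal sketch using the 1M-wide ranges from the paragraph above; the boundary list and the choice of an open-ended last shard are assumptions for illustration.

```python
from bisect import bisect_right
from collections import Counter

# Exclusive upper bounds of each shard's key range; the last shard is open-ended.
BOUNDARIES = [1_000_000, 2_000_000, 3_000_000]


def range_shard(key: int) -> int:
    """[0, 1M) -> 0, [1M, 2M) -> 1, [2M, 3M) -> 2, everything above -> 3."""
    return bisect_right(BOUNDARIES, key)


# Monotonic IDs: the next 10,000 inserts continue from the current maximum.
new_ids = range(3_200_000, 3_210_000)
print(Counter(range_shard(i) for i in new_ids))
```

Every new insert lands on shard 3 while shards 0 through 2 receive none, which is exactly the skew that composite keys such as (tenant_id, event_id) are meant to break up.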

Directory-based (lookup table) sharding. A separate metadata table maps partition_key → shard_id. Flexible: any rebalancing is a directory update. Common at startups before they regret it. Costs:

  • Extra hop. Every query consults the directory (cached, but a cache miss is a hop).
  • The directory becomes critical — it's now a single point of failure, often itself sharded later (turtles all the way down).
  • Cache coherence. Stale directory entries route to the wrong shard. Need eviction on shard moves.

Used cleanly when partitions are tenant-shaped (each tenant has a "home shard" in a directory) and the directory is small enough to keep in memory on every app server.

Vitess VReplication for online resharding. The operational pattern for moving from N shards to 2N (or N → M with arbitrary mapping). Three phases:

   Phase 1 — Dual write + backfill
   ───────────────────────────────
   - Old shard (N) keeps accepting writes (and continues to be the source of truth).
   - Set up VReplication: stream binlog from old shard → new shard.
   - Backfill historical data: copy all rows from old shard to new shards 
     (partitioned by the new sharding function).
   - When backfill completes, new shards are caught up to the binlog tail.
   - Continue dual-shipping new writes (old + new).

   Phase 2 — Switch reads
   ──────────────────────
   - Verify new shards are consistent with old (row counts, checksums).
   - Update routing layer to read from new shards.
   - Old shards still receive writes (continuing dual-write for safety).
   - Observe for several days — read traffic on new, can revert if issues.

   Phase 3 — Switch writes + decommission
   ──────────────────────────────────────
   - Atomic cutover: writes now go to new shards only.
   - Old shards become read-only "archive" for a grace period (days).
   - After verification window, decommission old shards.

End-to-end, a 512 → 1024 resharding at Slack-scale runs weeks but the database stays online throughout. The Vitess implementation handles the dual-write, backfill orchestration, cutover, and rollback machinery — building this from scratch is a ~6-month engineering project.

11.2 Auto-increment vs UUID for shard key

The shard-key choice has hidden traps.

Sequential shard key. If the shard key (or its prefix) is auto-incrementing, all new rows pile on the last shard. A posts table sharded by hash(post_id) % 1024 is fine — hash spreads — but a posts table sharded by post_id range with monotonic IDs creates a hot last shard. Common at startups: "we'll shard by customer_id, range-based" — fine for now, devastating later when one tenant's burst goes to one shard.

Hash of UUIDv4 as shard key. UUIDv4 is uniformly random; hash(uuid) % N spreads cleanly. No hot last shard, no distribution skew from key generation. Cost: no clustering. Two consecutive customer signups go to different shards. Cross-customer queries scatter to N shards.

Hash of natural business key. hash(customer_id) % N where customer_id is itself a UUIDv4 or a non-monotonic identifier. All of a customer's data lives on one shard; cross-customer queries are still scatter-gather but per-customer workflows (loading a customer dashboard) stay local.

UUIDv7 / ULID for primary key + customer_id for shard key. The modern pattern. UUIDv7 has a 48-bit timestamp prefix followed by random bits — it's globally unique, time-ordered (preserving B+ tree insert locality), but not predictable. Use it as the primary key for B+ tree friendliness. Use a separate shard key (customer_id, hashed) to control distribution. Best of both worlds.

   Bad:  shard by post_id range (post_id auto-increment)
         → all new posts on last shard, hot.

   OK:   shard by hash(post_id) — UUIDv4 post_id
         → spreads evenly, but per-customer queries scatter.

   Good: PK = post_id UUIDv7 (B+ tree friendly), 
         shard by hash(customer_id) — per-customer locality preserved.
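The "Good" pattern combines two independent functions: a time-ordered primary key and a hash of the customer key for routing. The sketch below builds a UUIDv7 by hand following the bit layout described above (48-bit millisecond timestamp, version and variant bits, random remainder). The function names and the 1024-shard default are assumptions for illustration.

```python
import hashlib
import os
import time
import uuid


def uuid7(now_ms=None):
    """Build a UUIDv7: 48-bit Unix ms timestamp, version/variant bits, random rest."""
    if now_ms is None:
        now_ms = time.time_ns() // 1_000_000
    rand = int.from_bytes(os.urandom(10), "big")   # 80 random bits, 74 used
    rand_a = (rand >> 62) & 0xFFF                  # 12 bits after the version
    rand_b = rand & ((1 << 62) - 1)                # 62 bits after the variant
    value = (now_ms & ((1 << 48) - 1)) << 80       # timestamp in the top 48 bits
    value |= 0x7 << 76                             # version 7
    value |= rand_a << 64
    value |= 0b10 << 62                            # RFC variant
    value |= rand_b
    return uuid.UUID(int=value)


def shard_for(customer_id: str, n_shards: int = 1024) -> int:
    """Route by a stable hash of the customer, not by the primary key."""
    digest = hashlib.sha256(customer_id.encode()).digest()
    return int.from_bytes(digest[:8], "big") % n_shards


post_id = uuid7()
print(post_id, "-> shard", shard_for("customer-123"))
```

IDs generated in later milliseconds sort after earlier ones, which keeps B+ tree inserts near the right edge, while all rows for a given customer_id route to the same shard.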

11.3 Multi-region

The final scaling axis. Covered in detail in §8.9; this is the scaling-relevant summary.

Active-passive (warm standby). Primary in one region, async cross-region replica in another. RPO ≈ replication lag window (typically seconds to minutes). RTO ≈ manual failover time (10–30 minutes). Cost: 1 extra cluster, plus cross-region bandwidth for async stream (~50 MB/s for a busy shard). The 95% pick for OLTP that needs cross-region DR.

Active-active (writes in multiple regions). Three variants — LWW with vector clocks (silent corruption risk, avoid for OLTP), CRDTs (eventual consistency, works for counters/sets/collaborative docs not for conservation), or partition-per-region (geo-partitioning — each region owns disjoint keys, no conflicts possible). Geo-partitioning is the cleanest active-active but requires keys with a natural regional home.

NewSQL with Raft across regions. True cross-region linearizability via consensus. 50–200ms commit latency per transaction. Right for workloads where you need a global serialization point (a global rate limiter, a shared inventory across regions). Wrong for any latency-sensitive OLTP (a 100ms-per-commit cap is a 10/sec-per-key write rate).

The pushback. Most OLTP workloads — payments, profiles, e-commerce, SaaS docs — do not need active-active. The latency tax and conflict resolution complexity are rarely worth the engineering and operational cost. The default is active-passive with manual failover; promote to active-active only when a concrete requirement (regulatory residency, hard latency targets in two regions) forces it.


§12. Operational Concerns

The previous sections covered the storage engine (§4), hard problems (§7), replication (§8), and scaling axes (§11). This section covers the operational details that decide whether a deployment of those technologies stays alive at 3 AM on a Sunday or collapses under predictable load. These are concerns the database engine itself doesn't solve — but every operator of a transactional system must.

12.1 Connection pooling

The 100–300 connection ceiling is real and load-bearing for PostgreSQL design.

Why PostgreSQL hits a wall around 100–300 connections. PostgreSQL uses a process-per-connection model (fork() on accept). Each connection is a separate OS process consuming ~10 MB of RAM (process stack + per-backend buffers + work_mem allocations). 300 connections = 3 GB just for process overhead, plus the per-backend work_mem (default 4 MB, scales with concurrent queries). Past ~300 active connections on a 32 GB instance, you're context-switching constantly, the OS scheduler thrashes, and lock contention on internal data structures (proc array, pg_stat) becomes the limit.

MySQL is lighter but still bounded. MySQL uses thread-per-connection — threads share the process, each consuming ~256 KB stack + per-thread buffers. 1000 threads is more tolerable than 1000 processes. But MySQL is still bounded by the internal lock manager's bookkeeping, the thread pool's scheduler, and the InnoDB buffer pool's latch contention. Default max_connections = 151; real-world deployments raise to 1000–5000.

The math that bites you. An app with 100 servers × 50 conn-per-server in its connection pool = 5000 connections. A PostgreSQL primary with max_connections=500 will refuse new connections at 501. Apps start throwing FATAL: sorry, too many clients already. The classic startup death-spiral.

Solutions.

PgBouncer is the PostgreSQL canonical answer. Three pooling modes:

Mode | One client = one server connection for... | Compatible features | Pool ratio
Session | Full client session (login → logout) | Everything (prepared stmts, session-level SET, temp tables) | ~1:1
Transaction | Single transaction (BEGIN → COMMIT) | Most things, but breaks prepared stmts, session-level SET | 100:1
Statement | Single statement | Only stateless queries (no txns, no prep stmts) | 1000:1

Transaction-mode is the sweet spot for most workloads. App opens 5000 logical connections to PgBouncer; PgBouncer multiplexes them onto a 50-connection pool to PostgreSQL. 100x reduction. Caveat: applications using PostgreSQL features that hold connection-level state (session-level SET search_path, server-side prepared statements with PREPARE/EXECUTE, temp tables, advisory locks across transactions) break in transaction mode; they need session mode or to stop using those features. SET LOCAL is scoped to the transaction and therefore safe.

ProxySQL for MySQL. Same idea — multiplexes client connections onto a smaller backend pool. Also adds query routing (sends reads to replicas, writes to primary), query rewriting, and rate limiting.

RDS Proxy is AWS's managed PgBouncer/ProxySQL equivalent. Lambda-friendly (Lambdas tend to thrash connection pools).

Concrete numbers.

   Before pooling:
     100 app servers × 50 conn each = 5000 connections to PostgreSQL.
     PostgreSQL max_connections=500 → refuses 90% of connection attempts.
     OR if max_connections=5000 → ~50 GB just for process overhead.

   After pooling (transaction mode):
     100 app servers × 50 conn each = 5000 connections to PgBouncer.
     PgBouncer (on the PG host or a sidecar) → 50 connections to PostgreSQL.
     PostgreSQL handles 50 backends comfortably: ~500 MB process overhead.
     PgBouncer overhead: ~50 MB.
     100x reduction in PG-side resource usage.
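The multiplexing idea behind transaction mode can be shown with an in-process analogue. This is not how PgBouncer is implemented; it is a sketch in which a hypothetical TransactionPool lends one of a few server connections to a caller for the duration of a single transaction, using SQLite connections as stand-ins.

```python
import queue
import sqlite3
from contextlib import contextmanager


class TransactionPool:
    """Lend a server connection for one transaction, then take it back."""

    def __init__(self, size, connect):
        self._idle = queue.Queue()
        for _ in range(size):
            self._idle.put(connect())

    @contextmanager
    def transaction(self, timeout=5.0):
        conn = self._idle.get(timeout=timeout)  # block until a connection frees up
        try:
            with conn:  # commit on success, rollback on exception
                yield conn
        finally:
            self._idle.put(conn)  # returned at COMMIT/ROLLBACK, not at logout


pool = TransactionPool(2, lambda: sqlite3.connect(":memory:"))

# 100 logical clients share 2 server connections.
results = []
for client in range(100):
    with pool.transaction() as conn:
        results.append(conn.execute("SELECT 1").fetchone()[0])
print(len(results), "transactions served by", pool._idle.qsize(), "connections")
```

The sketch also makes the caveat visible: a client may get a different underlying connection on its next transaction, so any state left on the connection between transactions is either lost or leaked to another client.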

Why this matters. A common startup outage: the app autoscales from 10 to 100 instances during a traffic spike; each instance opens its connection pool of 30; 3000 connections (2700 of them new) hit PostgreSQL; max_connections exceeded; PG refuses connections; app starts failing; pages fire. Pooler in front prevents this entirely.

12.2 Deadlocks

Two transactions hold locks the other needs. Database detects the cycle, picks a victim, returns a deadlock error.

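Detection. PostgreSQL and MySQL both search a wait-for graph: when transaction T waits for transaction T', add edge T → T' and look for cycles. PostgreSQL runs the check only after a backend has waited deadlock_timeout (1 second by default); InnoDB checks as soon as a lock wait begins (innodb_deadlock_detect=ON). On finding a cycle, the engine picks a victim (InnoDB rolls back the transaction that has modified the fewest rows; PostgreSQL aborts the transaction whose check found the cycle) and aborts it with ERROR: deadlock detected (PG) or Error 1213: Deadlock found (MySQL).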

Application strategies.

  • (a) Consistent lock ordering. Always acquire locks in a canonical order — e.g., always lock the lower account_id first. The classic example:
   Without consistent ordering (DEADLOCK):
     Txn A: BEGIN; UPDATE accounts SET ... WHERE id=100; UPDATE accounts SET ... WHERE id=200;
     Txn B: BEGIN; UPDATE accounts SET ... WHERE id=200; UPDATE accounts SET ... WHERE id=100;
     A holds 100, waits for 200. B holds 200, waits for 100. Cycle. Deadlock.

   With consistent ordering (SERIALIZE):
     Always lock the lower account_id first.
     Txn A wants (100, 200): locks 100 then 200.
     Txn B wants (200, 100): reordered, locks 100 then 200.
     They serialize cleanly. No deadlock.
  • (b) Retry on deadlock. Deadlocks are transient — the victim's transaction is rolled back, so retry usually succeeds (the contending transaction has committed by then). Cap retries at 3–5 with exponential backoff.
  • (c) Reduce transaction scope. Long transactions hold locks longer, increase deadlock probability. Move non-transactional work outside BEGIN ... COMMIT. Use shorter transactions even at the cost of more round trips.
  • (d) SELECT FOR UPDATE NOWAIT / SKIP LOCKED. For queue workers competing for jobs: SELECT ... LIMIT 1 FOR UPDATE SKIP LOCKED lets each worker grab a different row without contending on the same head. PostgreSQL added it in 9.5; MySQL in 8.0. Eliminates deadlocks in the queue-worker pattern.

Concrete example. A payments engine processing 1000 transfers/sec. Each transfer is UPDATE accounts SET balance = balance - $X WHERE id = $sender; UPDATE accounts SET balance = balance + $X WHERE id = $receiver. Without consistent ordering, deadlock rate is ~0.1% (one in a thousand transfers); during peak traffic this is 1/sec. With "always update lower account_id first" — zero deadlocks. Production-grade payments engines all do this.
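Strategies (a) and (b) can be sketched together. The example below uses in-process locks as a stand-in for row locks, so it demonstrates the ordering rule rather than any database driver; Bank, DeadlockError, and retry_on_deadlock are hypothetical names introduced for illustration.

```python
import random
import threading
import time


class Bank:
    def __init__(self, balances):
        self.balances = dict(balances)
        self.locks = {acct: threading.Lock() for acct in balances}

    def transfer(self, sender, receiver, amount):
        """Lock the lower account id first, regardless of transfer direction."""
        if sender == receiver:
            raise ValueError("sender and receiver must differ")
        first, second = sorted((sender, receiver))
        with self.locks[first], self.locks[second]:
            if self.balances[sender] < amount:
                return False
            self.balances[sender] -= amount
            self.balances[receiver] += amount
            return True


class DeadlockError(Exception):
    """Stand-in for a driver's deadlock error (hypothetical name)."""


def retry_on_deadlock(fn, attempts=4, base_delay=0.05, sleep=time.sleep):
    """Retry with exponential backoff and jitter; re-raise after the last attempt."""
    for attempt in range(attempts):
        try:
            return fn()
        except DeadlockError:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt) * random.uniform(0.5, 1.5))


bank = Bank({100: 10_000, 200: 10_000})
workers = [
    threading.Thread(target=lambda: [bank.transfer(100, 200, 1) for _ in range(2000)]),
    threading.Thread(target=lambda: [bank.transfer(200, 100, 1) for _ in range(2000)]),
]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(bank.balances, "total:", sum(bank.balances.values()))
```

Opposite-direction transfers finish because both threads always take lock 100 before lock 200. Retry remains useful as a backstop for deadlocks that ordering cannot prevent, such as those involving secondary indexes or foreign keys.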

12.3 Isolation level anomalies in detail

The standard isolation levels and what each permits.

Dirty read (READ UNCOMMITTED only). Reader sees writes from an uncommitted transaction. If the writer rolls back, the reader has acted on data that "never existed." PostgreSQL refuses to implement this — SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED is treated as READ COMMITTED. MySQL InnoDB implements it but nobody uses it in OLTP.

Non-repeatable read (permitted by READ COMMITTED). Within a transaction, the same SELECT returns a different value for the same row because another transaction committed a change in between.

   T1: BEGIN; SELECT balance FROM accounts WHERE id=42;  -- returns 500
   T2: BEGIN; UPDATE accounts SET balance=400 WHERE id=42; COMMIT;
   T1: SELECT balance FROM accounts WHERE id=42;  -- returns 400 (different!)

REPEATABLE READ prevents this — T1 sees the same value on both reads because MVCC pins T1's snapshot.

Phantom read (permitted by READ COMMITTED, also permitted by standard REPEATABLE READ; PG and InnoDB MVCC happen to prevent it for SELECT). Same range query returns different row sets because another transaction inserted/deleted rows.

   T1: BEGIN; SELECT COUNT(*) FROM orders WHERE customer=42;  -- returns 10
   T2: BEGIN; INSERT INTO orders (customer, ...) VALUES (42, ...); COMMIT;
   T1: SELECT COUNT(*) FROM orders WHERE customer=42;  -- standard SQL: returns 11 (phantom!)
                                                         -- PG / InnoDB MVCC: still 10

PostgreSQL's MVCC and InnoDB's REPEATABLE READ both prevent phantoms for plain SELECT. SELECT FOR UPDATE in InnoDB uses gap locks to prevent phantoms; PG SERIALIZABLE uses SSI.

Lost update. Two concurrent transactions both read 500, both compute 400, both write 400. Two decrements were intended, but only one took effect.

   T1: BEGIN; SELECT balance FROM accounts WHERE id=42;  -- 500
   T2: BEGIN; SELECT balance FROM accounts WHERE id=42;  -- 500
   T1: UPDATE accounts SET balance=400 WHERE id=42; COMMIT;  -- writes 400
   T2: UPDATE accounts SET balance=400 WHERE id=42; COMMIT;  -- writes 400

   Two debits of 100 each, but balance went 500 → 400. One debit lost.

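Real fix: UPDATE accounts SET balance = balance - 100 WHERE id=42 AND balance >= 100 (atomic), or SELECT FOR UPDATE followed by computation (locking blocks T2 until T1 commits), or, in PostgreSQL, REPEATABLE READ with explicit retry on serialization failure (the second writer is aborted instead of overwriting). InnoDB's REPEATABLE READ does not detect lost updates, so use one of the first two options there.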
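The difference between the read-then-write pattern and the atomic UPDATE can be reproduced with Python's built-in sqlite3 module. This is a sketch under simplifying assumptions: SQLite stands in for the production database, the two "transactions" are interleaved by hand on one connection, and the helper names (`fresh_db`, `naive_debits`, `atomic_debit`) are made up for the illustration.

```python
import sqlite3


def fresh_db() -> sqlite3.Connection:
    """In-memory table with one account holding 500."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
    conn.execute("INSERT INTO accounts VALUES (42, 500)")
    conn.commit()
    return conn


def read_balance(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT balance FROM accounts WHERE id = 42").fetchone()[0]


def naive_debits(conn: sqlite3.Connection) -> int:
    """Both debits read first, then each writes a value computed in the app."""
    seen_by_t1 = read_balance(conn)  # 500
    seen_by_t2 = read_balance(conn)  # 500, stale by the time T2 writes
    conn.execute("UPDATE accounts SET balance = ? WHERE id = 42", (seen_by_t1 - 100,))
    conn.execute("UPDATE accounts SET balance = ? WHERE id = 42", (seen_by_t2 - 100,))
    conn.commit()
    return read_balance(conn)


def atomic_debit(conn: sqlite3.Connection, amount: int) -> bool:
    """Let the database compute the new balance; the guard rejects overdrafts."""
    cur = conn.execute(
        "UPDATE accounts SET balance = balance - ? WHERE id = 42 AND balance >= ?",
        (amount, amount),
    )
    conn.commit()
    return cur.rowcount == 1  # False means the guard refused the debit
```

Checking the affected-row count is what turns the guard into a usable signal: zero rows updated means the debit did not happen and the caller must handle it.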

Write skew. The famous anomaly. Two transactions read overlapping data, write disjoint data, but together violate an invariant.

   Invariant: at least one doctor must be on-call.

   T1: BEGIN; SELECT COUNT(*) FROM doctors WHERE on_call=true;  -- 2 doctors on-call
   T2: BEGIN; SELECT COUNT(*) FROM doctors WHERE on_call=true;  -- still 2 (T2's snapshot)
   T1: -- Alice sees "I'm one of two, safe to go off-call"
       UPDATE doctors SET on_call=false WHERE id=Alice;  COMMIT;
   T2: -- Bob sees the same, makes the same decision
       UPDATE doctors SET on_call=false WHERE id=Bob;  COMMIT;

   Now: 0 doctors on-call. Invariant violated. No row overlap.
   REPEATABLE READ doesn't catch it — different rows updated by each txn.

REPEATABLE READ doesn't catch this because the two transactions never write to the same row. Among the standard isolation levels, only SERIALIZABLE prevents write skew, and both transactions must run at that level. PostgreSQL's SSI (Serializable Snapshot Isolation, 9.1+) takes non-blocking predicate locks (SIREAD locks) that record what each transaction read (here, the rows matching on_call=true) and aborts one transaction when the read/write dependencies among concurrent transactions form a pattern that could break serializability. The abort surfaces as ERROR: could not serialize access due to read/write dependencies among transactions (SQLSTATE 40001); the application retries the whole transaction.

When SERIALIZABLE is worth the cost. Multi-row business invariants — "at least one doctor on-call," "total of all balances must equal initial deposits minus withdrawals," "no two reservations overlap on the same room." If your invariant is enforceable with a UNIQUE constraint or a single-row CHECK, you don't need SERIALIZABLE. If it spans multiple rows of data the application reads-then-decides-then-writes, you do. PG's SSI has ~10–20% throughput cost over REPEATABLE READ; usually acceptable for the safety.
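Running at SERIALIZABLE only works if the application retries aborted transactions. A minimal sketch of such a retry wrapper follows. `SerializationFailure` is a hypothetical exception standing in for whatever error a real driver raises for SQLSTATE 40001, and `run_with_retry` and its parameters are illustrative names, not part of any library.

```python
import random
import time


class SerializationFailure(Exception):
    """Hypothetical stand-in for a driver error carrying SQLSTATE 40001."""


def run_with_retry(txn_fn, max_attempts=5, base_delay=0.01, sleep=time.sleep):
    """Run txn_fn, re-running the whole transaction on serialization failure.

    txn_fn must open, execute, and commit one complete transaction, because a
    retry has to re-read the data the aborted attempt based its decision on.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return txn_fn()
        except SerializationFailure:
            if attempt == max_attempts:
                raise  # give up and surface the error to the caller
            # Exponential backoff with jitter so colliding retries spread out.
            delay = base_delay * (2 ** (attempt - 1)) * random.uniform(0.5, 1.5)
            sleep(delay)
```

The important design point is that the retry re-executes the reads as well as the writes; replaying only the final UPDATE would reintroduce the anomaly.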

12.4 Index types beyond B-tree

PostgreSQL exposes a richer index zoo than MySQL because the GIN/GiST/BRIN access methods are part of the engine's extensible index API. The right index can be the difference between a 200ms query and a 5ms one.

B-tree (default). Equality and range queries. Sorted keys, leaf pages linked for sequential scan. The right answer ~80% of the time.

Covering indexes. A B-tree index that includes all columns referenced by a query, allowing an index-only scan (no heap lookup in PostgreSQL, no clustered-index lookup in InnoDB).

   Slow:
   CREATE INDEX idx_user_email ON users (email);
   SELECT name, email FROM users WHERE email = 'foo@bar';
     → index scan finds row, then heap lookup for name (2x I/O).

   Fast:
   CREATE INDEX idx_user_email_inc ON users (email) INCLUDE (name);  -- PG 11+
   SELECT name, email FROM users WHERE email = 'foo@bar';
     → all data on the index page, no heap lookup. 10x faster on hot queries.

InnoDB's secondary indexes naturally cover queries whose columns are all in the index key or the primary key (secondary leaves store the PK). In PostgreSQL, add the extra columns with INCLUDE or as trailing key columns.
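The covering effect is easy to observe with Python's built-in sqlite3 module. This is only an analogy, with assumptions stated up front: SQLite has no INCLUDE clause, so the sketch adds name as a trailing key column instead, and the `query_plan` helper is a hypothetical convenience wrapper around SQLite's EXPLAIN QUERY PLAN.

```python
import sqlite3


def query_plan(conn: sqlite3.Connection, sql: str) -> str:
    """Return the human-readable detail column of EXPLAIN QUERY PLAN."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql).fetchall()
    return " | ".join(row[-1] for row in rows)


QUERY = "SELECT name, email FROM users WHERE email = 'foo@bar'"

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT, name TEXT, bio TEXT)"
)

# Index on email only: the engine must visit the table row to fetch name.
conn.execute("CREATE INDEX idx_user_email ON users (email)")
plan_before = query_plan(conn, QUERY)

# Index carrying both columns: the query is answered from the index alone.
conn.execute("DROP INDEX idx_user_email")
conn.execute("CREATE INDEX idx_user_email_name ON users (email, name)")
plan_after = query_plan(conn, QUERY)

print(plan_before)
print(plan_after)
```

In PostgreSQL the equivalent check is whether EXPLAIN reports an Index Only Scan for the query.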

Partial indexes. Index only rows matching a predicate. Smaller index, faster updates, useful when most queries filter on the predicate.

   CREATE INDEX idx_active_users_email ON users (email) WHERE deleted=false;

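   - Index size tracks the fraction of rows that match the predicate: ~70% of the full index if 70% of users are non-deleted, ~30% if only 30% are.
   - Rows with deleted=true have no entry here, so writes to them skip this index.
   - Queries whose WHERE clause implies deleted=false can use this index.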

GIN (Generalized Inverted Index). For composite values — arrays, JSONB, full-text search. Each "element" of the composite is indexed; the lookup finds matching documents containing that element.

   CREATE INDEX idx_tags ON posts USING GIN (tags);
   SELECT * FROM posts WHERE tags @> ARRAY['postgres'];  -- "contains postgres"

   CREATE INDEX idx_meta ON posts USING GIN (meta jsonb_path_ops);
   SELECT * FROM posts WHERE meta @> '{"author":"alice"}';  -- JSONB containment

   CREATE INDEX idx_search ON posts USING GIN (to_tsvector('english', body));
   SELECT * FROM posts WHERE to_tsvector('english', body) @@ to_tsquery('claude & code');

GIN handles the @>, ?, and @@ operators efficiently (the jsonb_path_ops operator class supports @> but not ?). Slower to update than B-tree, because each indexed value produces multiple index entries.

GiST (Generalized Search Tree). For geometric data (PostGIS), range types, fuzzy match (trigrams). Tree of bounding-box-style structures.

   CREATE INDEX idx_geom ON locations USING GIST (geom);
   SELECT * FROM locations WHERE ST_DWithin(geom, point, 1000);  -- within 1000 SRID units (1 km for geography or a meter-based SRID)

   CREATE INDEX idx_period ON bookings USING GIST (period);  -- tsrange
   SELECT * FROM bookings WHERE period && '[2026-05-01, 2026-05-31)';

BRIN (Block Range INdex). For naturally clustered data, such as time-series with monotonic timestamps. Tiny: stores one min/max summary per block range (by default 128 pages = 1 MB with 8 KB pages).

   CREATE INDEX idx_created ON events USING BRIN (created_at);
   SELECT * FROM events WHERE created_at > '2026-05-01';

   - 1000x smaller than B-tree (a 10 GB B-tree → 10 MB BRIN).
   - Slightly slower point lookups (must scan an entire block range).
   - Perfect for time-series, append-only logs, IoT data.
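The size claim follows from simple arithmetic on block ranges. The sketch below assumes PostgreSQL's default 8 KB page size and the default pages_per_range of 128; the function names are illustrative. It counts how many min/max summaries a BRIN index needs for a given table size.

```python
PAGE_BYTES = 8 * 1024  # default PostgreSQL block size (assumed here)


def block_range_bytes(pages_per_range: int = 128) -> int:
    """Heap bytes summarized by a single BRIN entry."""
    return pages_per_range * PAGE_BYTES


def brin_ranges(table_bytes: int, pages_per_range: int = 128) -> int:
    """Number of min/max summaries needed to cover a table (ceiling division)."""
    pages = -(-table_bytes // PAGE_BYTES)
    return -(-pages // pages_per_range)


# A 100 GiB table needs one summary per 1 MiB of heap.
print(block_range_bytes())           # bytes covered per summary
print(brin_ranges(100 * 1024 ** 3))  # summaries for 100 GiB
```

Each summary is a few dozen bytes for a timestamp column, which is why the index stays in the megabyte range while a B-tree over the same rows holds one entry per row.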

Hash indexes. Equality only. Niche. PG made them WAL-logged and crash-safe only in PostgreSQL 10; before that they were unsafe. MySQL's MEMORY engine uses them; InnoDB's "adaptive hash index" is automatic. Rarely beats B-tree in practice because B-tree handles equality almost as fast and supports range too.

12.5 VACUUM in PostgreSQL

PostgreSQL's MVCC stores dead tuples in the heap; VACUUM reclaims them. The lifecycle of a dead tuple decides whether your table grows monotonically or stays stable.

Why dead tuples exist. An UPDATE in PostgreSQL is "insert new tuple version + mark old tuple's xmax to the updating txn." The old tuple stays in the heap until VACUUM marks its space reclaimable. A DELETE marks xmax; same lifecycle.

Autovacuum. Runs in the background. A table is vacuumed when n_dead_tup exceeds autovacuum_vacuum_threshold + autovacuum_vacuum_scale_factor × reltuples (defaults 50 and 0.2, so roughly 20% dead rows on a large table). Tunable per-table via ALTER TABLE foo SET (autovacuum_vacuum_scale_factor = 0.05) for hot tables.
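The trigger condition is worth working through with numbers, because PostgreSQL's documented rule adds a fixed base threshold to the scaled row count. The sketch below encodes that rule with the default settings (base threshold 50, scale factor 0.2); the function names are illustrative.

```python
def autovacuum_trigger_point(reltuples: float,
                             scale_factor: float = 0.2,
                             base_threshold: int = 50) -> float:
    """Dead-tuple count above which autovacuum will vacuum the table."""
    return base_threshold + scale_factor * reltuples


def needs_autovacuum(n_dead_tup: int, reltuples: float,
                     scale_factor: float = 0.2,
                     base_threshold: int = 50) -> bool:
    return n_dead_tup > autovacuum_trigger_point(
        reltuples, scale_factor, base_threshold)


# A 100M-row table waits for ~20M dead tuples at the defaults,
# but only ~5M with a per-table scale factor of 0.05.
print(autovacuum_trigger_point(100_000_000))
print(autovacuum_trigger_point(100_000_000, scale_factor=0.05))
```

The absolute numbers are the reason large hot tables usually get a lower per-table scale factor: 20% of a very large table is a lot of bloat to accumulate before the first cleanup.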

VACUUM vs VACUUM FULL.

  • VACUUM marks dead tuples reusable — the space is available for new tuples within the same heap pages. Does not return space to the OS. Online — no exclusive lock. Run automatically by autovacuum.
  • VACUUM FULL rewrites the table to a new file with only live tuples. Returns space to OS. Takes an exclusive lock — blocks all reads and writes. Use only during maintenance windows. For online table-rewrite, use pg_repack instead.

pg_repack. An extension that does VACUUM FULL's job while holding an exclusive lock only briefly, at the start and at the final swap: it creates a shadow table, copies live tuples, captures concurrent changes through a trigger-fed log table and replays them, then atomically swaps. Production tool for shrinking a bloated table back to its non-bloated size.

Vacuum freeze and xid wraparound. PostgreSQL transaction IDs are 32-bit and compared circularly: relative to any xid, about 2 billion xids count as the past and about 2 billion as the future. A tuple whose xid falls more than ~2 billion transactions behind the current counter would flip to "from the future" and become invisible. Vacuum freeze marks ancient tuples as "frozen": visible to all transactions, no xid check needed. As long as autovacuum keeps up with freezing, wraparound never matters.
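Why an old tuple would appear to come from the future can be seen from how a 32-bit counter is compared modulo 2^32. The sketch below is a simplified model under stated assumptions: it treats every xid as a normal one and ignores PostgreSQL's reserved special xids, and `xid_precedes` is an illustrative name for the signed-difference comparison.

```python
def xid_precedes(a: int, b: int) -> bool:
    """True if xid a is logically older than xid b on a 32-bit circle.

    The difference is taken modulo 2**32 and interpreted as a signed
    32-bit integer; a negative result means a is in b's past.
    """
    diff = (a - b) & 0xFFFFFFFF
    if diff >= 0x80000000:
        diff -= 0x100000000
    return diff < 0


tuple_xid = 100

# Fewer than 2**31 transactions later: the tuple is still in the past.
print(xid_precedes(tuple_xid, tuple_xid + 2**31 - 1))

# More than 2**31 transactions later: the same unfrozen tuple now
# compares as newer than the current xid, i.e. "from the future".
print(xid_precedes(tuple_xid, tuple_xid + 2**31 + 1))
```

Freezing removes the tuple from this comparison entirely, which is why it has to happen before the distance reaches 2^31.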

When autovacuum falls behind (a heavy-write workload outruns the freezer): the database approaches the xid horizon, then stops accepting writes as a safety measure (ERROR: database is not accepting commands to avoid wraparound data loss). The famous outages:

  • Sentry (2015): xid wraparound in PostgreSQL, database read-only for hours.
  • Mailchimp (2019): similar incident, manual freeze required.
  • Joyent (2014): SmartOS-on-Solaris filesystem corruption + xid wraparound.

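Monitoring: SELECT datname, age(datfrozenxid) FROM pg_database. Autovacuum forces an anti-wraparound vacuum once the age passes autovacuum_freeze_max_age (default 200 million); anything over ~1 billion is concerning, and anything approaching 2 billion is critical, because the server stops accepting writes shortly before the ~2.1 billion limit.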

Monitoring autovacuum.

  • pg_stat_user_tables.n_dead_tup — current dead tuple count per table.
  • pg_stat_user_tables.last_autovacuum — when autovacuum last ran on this table.
  • pg_stat_progress_vacuum — live progress of currently running vacuum.

Tuning. For hot tables (high update rate), lower autovacuum_vacuum_scale_factor (e.g., 0.05 = 5% dead triggers vacuum) and increase autovacuum_max_workers (default 3 — too few for clusters with many hot tables). Increase autovacuum_vacuum_cost_limit to make autovacuum run faster (it self-throttles by default to avoid I/O hogging).

12.6 Auto-increment vs UUID primary key

The primary key choice matters for B+ tree insert locality.

Sequential PK (auto-increment / serial). All new inserts hit the right edge of the B+ tree. The rightmost leaf page is always in the buffer pool (recently touched). Inserts are basically: append to right edge, occasionally split, advance. CPU-friendly, buffer-pool-friendly. But for sharded systems, sequential IDs concentrate inserts on whichever shard owns the high range — the "hot last shard" problem if you're not careful.

UUIDv4 PK. UUIDv4 values are 128 bits, 122 of them random. Each insert lands on an effectively random leaf page across the entire tree. Bad for buffer pool locality: every insert dirties a page that may not be in cache. The B+ tree fills evenly, but with cold I/O on inserts. Concrete impact: often 30% or more slower on insert-heavy workloads, and worse once the index no longer fits in the buffer pool.

UUIDv7 / ULID. Time-ordered prefix (48-bit millisecond timestamp) + random bits. New inserts go to roughly the right edge of the tree because the timestamp prefix advances monotonically. Restores B+ tree locality while keeping global uniqueness; the random tail stays unguessable, but the creation time is exposed in the prefix.
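The bit layout behind the time-ordered prefix can be sketched by assembling a UUIDv7-shaped value by hand. This is an illustration of the RFC 9562 field layout (48-bit millisecond timestamp, 4-bit version, 12 random bits, 2-bit variant, 62 random bits), not a replacement for a library generator; `uuid7_sketch` is a hypothetical name and the sketch makes no attempt at monotonicity within one millisecond.

```python
import os
import time
import uuid


def uuid7_sketch(unix_ms=None) -> uuid.UUID:
    """Build a UUIDv7-shaped value: timestamp prefix plus random tail."""
    if unix_ms is None:
        unix_ms = time.time_ns() // 1_000_000
    rand = int.from_bytes(os.urandom(10), "big")   # 80 random bits
    rand_a = rand >> 68                            # top 12 bits
    rand_b = rand & ((1 << 62) - 1)                # low 62 bits

    value = (unix_ms & ((1 << 48) - 1)) << 80      # bits 127..80: timestamp
    value |= 0x7 << 76                             # bits 79..76: version 7
    value |= rand_a << 64                          # bits 75..64: random
    value |= 0b10 << 62                            # bits 63..62: variant
    value |= rand_b                                # bits 61..0: random
    return uuid.UUID(int=value)


earlier = uuid7_sketch(1_700_000_000_000)
later = uuid7_sketch(1_700_000_000_001)
print(earlier, later, earlier < later)
```

Because the timestamp occupies the most significant bits, byte-wise index ordering matches creation order at millisecond granularity, which is what sends new keys to the right edge of the tree.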

Concrete benchmarks (same hardware, same schema, 100M-row table):

   Workload: 100k inserts/sec sustained, mixed with point lookups.

   PK type        Sustained insert rate    Notes
   ──────────────────────────────────────────────────────────────────
   Auto-increment 100k/sec                 right-edge hot, predictable
   UUIDv4          30k/sec                 buffer pool thrashes; many cold reads
   UUIDv7          80k/sec                 close to auto-increment, less skew

The right pattern in 2026. For high-insert tables, prefer auto-increment for PK and a separate UUID/UUIDv7 column for external identifiers. Or use UUIDv7 as PK directly — close enough to sequential for B+ tree happiness, globally unique for distributed systems. UUIDv4 only when the predictability of UUIDv7's timestamp prefix is a security concern (rare).

12.7 Query planner gotchas

The cost-based optimizer makes assumptions; when those assumptions are wrong, it picks bad plans.

Skewed data fools the planner. "There are 5 customers; customer_id=42 has 90% of rows."

   SELECT * FROM orders WHERE customer_id = $1;

   With a generic (parameter-independent) plan, the planner cannot look up $1
   in the most-common-values list, so it falls back to pg_statistic's n_distinct:
   - n_distinct = 5 (5 customers)
   - Assumes uniform distribution: each customer ≈ 20% of rows.

   One plan is then reused for every value of $1:
   - customer_id=42 (actually 90% of rows) wants a seq scan.
   - Any other customer (each ≈ 2.5% of rows) wants an index scan.
   Whichever plan the 20% estimate selects is wrong for one of the two groups.

PostgreSQL keeps per-column most-common-values lists and histograms in pg_statistic; raise their resolution with ALTER TABLE ... ALTER COLUMN ... SET STATISTICS and refresh via ANALYZE. CREATE STATISTICS (10+) adds extended statistics across columns: n-distinct counts, functional dependencies, and (12+) multivariate most-common-values lists. For very skewed data, plan hints (the pg_hint_plan extension in PostgreSQL, optimizer hints in comments in MySQL) pin the planner's choice.
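The size of the misestimate in the skew example can be computed directly. The sketch below assumes a hypothetical 10M-row orders table with the distribution described above (one customer at 90%, four at 2.5% each) and compares the uniform 1/n_distinct estimate against the actual row counts; the function names are illustrative.

```python
def uniform_estimate(total_rows: int, n_distinct: int) -> float:
    """Row estimate when every distinct value is assumed equally common."""
    return total_rows / n_distinct


def misestimate_factor(estimated_rows: float, actual_rows: float) -> float:
    """How far off the estimate is, as a ratio >= 1 in either direction."""
    big = max(estimated_rows, actual_rows)
    small = min(estimated_rows, actual_rows)
    return big / max(small, 1)


TOTAL_ROWS = 10_000_000                   # hypothetical table size
estimate = uniform_estimate(TOTAL_ROWS, n_distinct=5)

actual_heavy = TOTAL_ROWS * 90 // 100     # customer_id = 42
actual_light = TOTAL_ROWS * 25 // 1000    # each of the other four customers

print(estimate)                                    # rows the planner expects
print(misestimate_factor(estimate, actual_heavy))  # underestimate for 42
print(misestimate_factor(estimate, actual_light))  # overestimate for the rest
```

The same ratio is what to look for in EXPLAIN ANALYZE output when comparing estimated rows against actual rows.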

EXPLAIN ANALYZE reads bottom-up. The deepest node executes first. Cost numbers are estimates — they're not real seconds, they're abstract units. The actual time=… values are real, in milliseconds. Always compare estimated rows vs actual rows; a 100x mismatch is a sign the planner has bad statistics.

Run ANALYZE after bulk data changes. Autovacuum runs ANALYZE periodically but lags large changes. After loading a million rows, ANALYZE the_table immediately — otherwise the next query may plan as if the table is empty.

Index-only scans depend on visibility map freshness. PG's visibility map tracks which heap pages have all-visible tuples (no transaction would need to look at the heap to decide visibility). An index-only scan can skip the heap lookup if the visibility map says all-visible. Lagging autovacuum → stale visibility map → forced heap lookups → slower than expected. A table that recently had heavy writes has cold visibility-map bits; index-only scans degrade to index-then-heap until autovacuum updates the map.

The synthesis: the planner is right 95% of the time; the other 5% is where you spend your time. Statistics drift, skewed data, and stale visibility-map bits cause more "why is this query slow today?" tickets than any other category.


§13. Decision Matrix

Dimension | OLTP SQL (sharded MySQL/PG) | Wide-column NoSQL (Cassandra) | Document store (MongoDB) | NewSQL (Spanner/Cockroach) | Object storage (S3/GCS)
Multi-row ACID transactions | Yes (within shard) | No (LWT only, slow) | Yes (since 4.0, slower) | Yes (cross-shard native) | No
Cross-shard atomicity | Build with 2PC | No (sagas only) | Limited | Native | No
Strong consistency default | Yes | Tunable; default eventual | Tunable | Yes | Strong per object (S3 since 2020); no multi-object guarantees
Schema enforcement | Strong | Per-keyspace, weak | Optional | Strong | None
Peak writes/sec/shard | 20–30k | 100k+ | 10–30k | 5–10k (consensus cost) | High (per-prefix)
p99 read predictability | High | Lower (compaction) | Medium | Medium | Variable
Range scan | Excellent | Good (partition-bounded) | Good within document | Good | Per-prefix only
Geographic distribution | Hard (custom) | Native (DC-aware quorums) | Replica sets (basic) | Native (consensus, TrueTime) | Native (multi-region)
Operational complexity | Medium | High (anti-patterns kill) | Medium | Lower (managed) | Very low
Cost | Low (commodity) | Low | Medium | High | Very low for cold
Typical fit | Payments, profiles, inventory | Time-series, fanout, IoT | Per-tenant SaaS, content | Geo-distributed OLTP | Archive, blobs

When to pick which.

  • Sharded OLTP SQL when you need ACID across multiple rows, write rate is thousands to millions/sec, and you can shard by a clean key. Default for payments, profiles, inventory, structured business data.
  • Wide-column NoSQL when write rate exceeds millions/sec and access is partition-keyed (time-series, event logs, IoT telemetry). Per-key consistency at best.
  • Document store when data is naturally document-shaped and per-tenant variable (CMS, freeform user data, prototype). Single-doc txns are the unit; multi-doc supported but slower.
  • NewSQL when you need cross-shard transactions and don't want to build the 2PC coordinator yourself. Higher commit latency and cost. Right pick when you'd otherwise build a custom transaction coordinator on sharded MySQL.
  • Object storage for blobs, archives, snapshots — anywhere access is "fetch by key." Never as primary OLTP.

The synthesis: every adjacent category exists because OLTP SQL is bad at one specific axis (geographic distribution, write throughput per partition, schema flexibility, archival cost). Picking right means knowing which axis matters.


§14. Applications

Five applications, each stressing a different facet of the design space.

Payments ledger (Alipay, Stripe, PayPal, Square). Stresses conservation invariants (sum of balances = sum of credits − debits), RPO=0, per-account total ordering, idempotency, cross-shard money transfer. Fit: sharded MySQL InnoDB with semi-sync cross-AZ replication; custom 2PC coordinator on XA transactions; idempotency rows colocated in the same transaction. Alipay's OceanBase is the same architecture with consensus replication. Defining choice: per-row strong consistency — every debit reads SELECT balance FOR UPDATE, takes a row lock, applies. No eventual consistency on the synchronous path. Anti-pattern: Cassandra — without LWT you lose conservation; with LWT you've inverted into a worse MySQL.

Social profile data (LinkedIn Espresso, Facebook TAO). Stresses extreme read fanout (one profile read by millions for feed rendering), modest per-user write rate, clean partitioning by user_id, social graph edges as data; consistency on edges matters less than on profile fields. Fit: sharded MySQL primary per user with massive read replicas and aggressive caching (TAO at Facebook is the cache layer in front of MySQL). Defining choice: read-scaling via replicas and caches — small write rate, enormous read rate, cache hit ratio drives everything. Anti-pattern: document store as primary. Profile is small structured data with strict access patterns; document flexibility is overhead.

E-commerce inventory (Amazon, Shopify, eBay). Stresses hot SKUs during flash sales (single-row contention on stock counters), cross-warehouse atomicity, reservation flow (hold stock for cart before commit), reconciliation against physical counts. Fit: sharded relational primary; write-sharding for hot SKUs during sales; TTL'd reservation holds; periodic reconciliation. Amazon DynamoDB handles parts with single-partition transactions; Shopify uses sharded MySQL via Vitess (80k+ checkouts/min Black Friday peak). Anti-pattern: single-primary OLTP at peak sale traffic — Black Friday is the inflection where per-shard capacity planning must precede the sale.

Gaming leaderboards (Riot, Discord, real-time PVP). Stresses high write rate per match (millions of matches/day), low read latency for "top 100 in my region," ranged scans by score, regional partitioning. Fit: a hybrid of a transactional store (PostgreSQL, MySQL) as source of truth (match results, player stats) plus a sorted set (Redis ZSET) for the live leaderboard view; CDC keeps them aligned. Defining choice: separating the durable record from the query view. Anti-pattern: making OLTP serve live ranks at millions of QPS. ORDER BY score DESC LIMIT 100 is cheap with an index on score, but "what is my rank?" means counting every index entry above the player's score, and every score change rewrites an index entry under that read load; Redis ZSETs answer rank lookups in O(log N).

SaaS multi-tenant data (Notion, Slack, Figma). Stresses per-tenant isolation, uneven tenant size (one customer 100x median), schema evolution as the product evolves, version history (Notion's block-level edits, Figma's frame-level edits), real-time collaboration. Fit: sharded PostgreSQL or MySQL with tenant_id as shard key; custom rebalancing for outsized tenants (largest get their own shard). Notion's architecture (2021+) is the canonical example; Slack uses Vitess on MySQL with team_id. Defining choice: tenant_id as primary partition key with affordances for skew. Anti-pattern: one database for all tenants with tenant_id as a column — works to ~100 tenants of similar size; fails when the largest outgrows one shard.


§15. Real-World Implementations with Numbers

Named systems shipping this technology at scale, across varied use cases.

System | Use case | Scale numbers | Variant + pattern
Alipay OceanBase | Payments ledger | 583,000 payments/sec peak; 50 PB total; p99 <100ms | MySQL-compatible distributed SQL with Paxos per shard + custom 2PC
LinkedIn Espresso | Profile, posts, connections | PB-scale; ~10M QPS aggregate; thousands of shards | Sharded MySQL with Helix routing; Brooklin CDC to Kafka
YouTube Vitess (MySQL) | Video metadata | Millions of QPS; PB-scale | Vitess shard router over MySQL; online resharding workflow
Stripe | Payments processing | Estimated 5–10k payments/sec peak; idempotency at scale | Historically MongoDB; migrated portions to sharded PostgreSQL
Pinterest MySQL | Pins and boards | 600k pin-writes/sec peak; 4096 logical shards | Hand-rolled sharded MySQL; PIN_ID partition key
Notion | Pages, blocks, documents | ~50k writes/sec; 32+ logical PG shards (post-2021 resharding) | Sharded PostgreSQL; tenant_id partition; custom rebalancing
Facebook TAO + UDB | Social graph and profile | Billions of reads/sec; PB-scale | Sharded MySQL primary + TAO cache layer; aggressive read fanout
Amazon DynamoDB | E-commerce + shopping cart | Millions of QPS; planet-scale | LSM-based, partition-by-key, single-table transactions added later
Discord (Cassandra → ScyllaDB) | Messaging | 120M messages/day on ScyllaDB; LSM-based | LSM wide-column for time-series-shaped messaging data
Shopify (MySQL/Vitess) | Multi-tenant e-commerce | 80k+ checkouts/min Black Friday peak; sharded by shop_id | Sharded MySQL via Vitess; per-shop podding
GitHub MySQL | Repositories and metadata | ~30k QPS sustained; gh-ost for online migrations | Sharded MySQL; in-house online-schema-change tooling
Slack (Vitess on MySQL) | Workspaces and messages | Millions of QPS; thousands of shards; team_id partition | Sharded MySQL via Vitess; team_id partition key

The pattern across most of these: a transactional storage engine (MySQL or PostgreSQL underneath, occasionally a custom variant like OceanBase) sharded by a clean partition key (account_id, user_id, tenant_id, pin_id, team_id, shop_id) with semi-sync or consensus replication for durability, CDC to Kafka for derived stores. DynamoDB and Discord's ScyllaDB are the non-relational exceptions in the table. Variations are in partition key choice, replication mode (semi-sync vs Paxos), and whether a higher-level distributed transaction layer (Vitess, custom 2PC, Spanner-style consensus) is needed for cross-shard.

This is not coincidence. It's what works at scale when you need ACID on structured data. The fact that LinkedIn, Pinterest, Notion, Shopify, GitHub, and Slack converge on the same shape (sharded MySQL or PostgreSQL with custom routing and CDC) is strong evidence that this shape is a sound default.


§16. Part 1 Summary

A transactional database is a B+ tree (or LSM variant) wearing an ACID contract — its storage engine guarantees per-shard atomicity, isolation, and durability via MVCC and a write-ahead log fsync, but everything beyond a single shard (cross-shard transactions, geographic distribution, idempotent retries, schema evolution under load) is a layer the system designer must build above it. The reason sharded MySQL or PostgreSQL keeps winning at scale — at Alipay's 583k payments/sec, at LinkedIn's PB of profile data, at Notion's millions of documents, at Pinterest's billions of pins — is not that the storage engine is magic; it's that B+ tree + MVCC + WAL is the cheapest known way to give a developer the "this either happened or it didn't" contract on structured mutable data, and once you have that contract, every other concern in the system (caches, search, analytics, archives, derived views) reduces to "keep this thing fed from the WAL and rebuild on demand."


Part 2: Analytical / Columnar Databases

§17. What Analytical / Columnar Databases Are

An analytical database is a stateful storage system whose contract is built around scan throughput — aggregations, filters, group-bys, joins over billions to trillions of rows answered in milliseconds to seconds — applied to immutable or append-mostly structured data. It is the system underneath anything where the question is "how much," "how many," "trending," or "what changed": revenue dashboards, cohort analyses, ad attribution, A/B test readouts, anomaly detection, real-time monitoring. The defining property is "promises a fast answer over a lot of data, even if the data is slightly stale."

Analytical databases span a four-way design space:

  • Real-time OLAP (RTOLAP) — Apache Pinot, Apache Druid, ClickHouse, StarRocks, Apache Doris. Sub-second p99 scan queries, denormalized star schemas, append-mostly streaming ingestion (Kafka-fed), pre-aggregated materialized views. Powers user-facing analytics: LinkedIn's "Who viewed your profile," Uber's surge map, Slack's search analytics, ad-tech's bid-time decisioning.
  • Cloud data warehouses — Snowflake, BigQuery, Redshift, Databricks SQL. Petabyte scale, separation of storage (object store) and compute (elastic clusters), strong SQL semantics, schema-on-write, T+1 to near-real-time freshness via batch or micro-batch ELT. Powers BI dashboards, ML feature engineering, finance reconciliation, regulatory reporting.
  • Lakehouse engines — Apache Iceberg, Delta Lake, Apache Hudi, atop S3/GCS/ADLS with query engines like Trino, Spark SQL, Presto. Object storage is the source of truth; metadata layer adds ACID, time travel, schema evolution. Powers ML pipelines and analytics where the same dataset must be queried by multiple engines.
  • HTAP / hybrid stores — TiDB (TiFlash), SingleStore (MemSQL), CockroachDB columnar replicas, Postgres + Citus columnar, Snowflake Unistore. Single system attempts both OLTP and OLAP via row-store primary + columnar secondary, with consistent views. Powers operational analytics, where the freshness gap to a separate OLAP system would be unacceptable.

Part 2 focuses on real-time OLAP and cloud data warehouses because those represent the two dominant operating points: low-latency user-facing analytics on the one hand, and large-scale batch analytics on the other. Lakehouse and HTAP variants reuse much of the same machinery (columnar layout, vectorized execution, predicate pushdown) with different durability and elasticity tradeoffs; we contrast them in §19.

What analytical databases are NOT good for:

  • Point updates / single-row mutations. A columnar layout means a single-row update touches every column file. Use OLTP for mutable state; let OLAP read a CDC stream from it.
  • Sub-millisecond point lookups. Even a well-indexed column store pays a column-stripe traversal cost. Use Redis, an OLTP primary key lookup, or a key-value cache.
  • Transactional consistency across rows. Real-time OLAP engines offer no ACID across rows, and any analytical store is only "consistent up to ingestion lag" relative to the system of record.
  • Frequent schema-by-row variance. Schemaless documents waste columnar compression. Use a document store, or denormalize/flatten before ingest.
  • Tiny data. Under ~10M rows, the overhead of columnar planners and shuffle pipelines costs more than a plain Postgres index. Pick OLAP when scale forces it, not before.

Mental model: an analytical database is a derived store kept in sync with the system of record (an OLTP database, an event log, or both) via CDC, Kafka, or batch ELT, and shaped to answer scan-heavy questions cheaply. Get the OLTP right first; OLAP can be rebuilt from the WAL or the event log. Get OLAP wrong (skewed partitions, missing materialized views, late-arriving data) and dashboards lie, but the truth survives in the OLTP primary.


§18. Inherent Guarantees

What the technology provides by design, and what must still be layered above it.

Provided by design

  • Scan throughput. Columnar layout + compression + vectorized execution scan billions of rows per second per node. ClickHouse benchmarks 1B+ rows/sec scan on a single 32-core box; Pinot serves 100k+ qps with p99 < 100ms on denormalized star schemas.
  • Compression. 5–100× smaller on disk than equivalent row-store data. Dictionary encoding for low-cardinality strings, delta + bit-packing for monotonically-increasing keys, RLE (run-length encoding) for sorted dimensions, LZ4/ZSTD on top.
  • Predicate pushdown. Query engine pushes WHERE clauses down to the storage layer; storage skips entire blocks via zone maps (per-block min/max) and bloom filters.
  • Append durability. Once data is committed to the warehouse (S3 multipart upload, segment merge, snapshot commit), it survives. Cloud warehouses inherit S3's 11-nines durability.
  • Late-binding schema (varies by system). Lakehouses and BigQuery handle schema evolution by adding columns without rewriting old files; Pinot/Druid need explicit reload.
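The three lightweight encodings named in the compression bullet are small enough to sketch directly. The functions below are simplified illustrations of dictionary, run-length, and delta encoding on Python lists; real engines operate on packed binary buffers and layer a general-purpose codec such as LZ4 or ZSTD on top.

```python
def dictionary_encode(values):
    """Replace each value with a small integer code; return (dictionary, codes)."""
    dictionary = {}
    codes = []
    for value in values:
        # setdefault assigns the next free code the first time a value is seen.
        codes.append(dictionary.setdefault(value, len(dictionary)))
    return list(dictionary), codes


def rle_encode(values):
    """Collapse consecutive repeats into (value, run_length) pairs."""
    runs = []
    for value in values:
        if runs and runs[-1][0] == value:
            runs[-1][1] += 1
        else:
            runs.append([value, 1])
    return [(value, length) for value, length in runs]


def delta_encode(values):
    """Keep the first value, then store only successive differences."""
    if not values:
        return []
    return [values[0]] + [b - a for a, b in zip(values, values[1:])]


print(dictionary_encode(["us", "us", "de", "us"]))
print(rle_encode(["a", "a", "a", "b"]))
print(delta_encode([1000, 1001, 1003, 1006]))
```

Each encoding exploits a different property of the column: few distinct values, long sorted runs, or small steps between neighbors. That is why sort order and cardinality drive compression ratios more than the choice of general-purpose codec.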

Must be layered above

  • Freshness. No system gives instant analytics on the latest OLTP write for free. RTOLAP gets to seconds via Kafka. Warehouses sit at minutes-to-hours via micro-batch. HTAP is the only "tens of milliseconds" option.
  • Consistency with OLTP. A query against the warehouse can see "different truth" from the OLTP primary if the CDC pipeline is lagging or partitioned. The application must know which freshness regime it is reading.
  • Cost control. A misindexed scan in Snowflake or BigQuery can cost hundreds of dollars per query. Cost monitoring, query budgets, and per-tenant quotas are the operator's job.
  • Skew avoidance. A bad partition key (everything hashing to one shard) destroys throughput. The schema and ingest pipeline choose the partition key; the engine cannot save you.
  • Backfill machinery. Re-deriving a year of analytics from the event log when a definition changes is its own pipeline; the database does not version your aggregation logic.
  • Multi-tenant isolation. A single noisy customer's scan can starve everyone else. The warehouse offers resource pools and per-warehouse compute; the operator wires them up.

Synthesis: the analytical database guarantees scan-throughput on what's already been ingested. Freshness, definitions, cost, and isolation are all the system designer's problem.


§19. The Design Space

Variants differ along several orthogonal axes. They interact; you don't pick freely.

Storage layout: pure columnar vs hybrid row-groups

A pure columnar layout stores each column in its own file (or stripe), sorted by some key. Reads of a single column touch only that column's bytes — minimal I/O for narrow aggregations like SUM(amount). ClickHouse, Pinot, BigQuery Capacitor.

A hybrid row-group layout (PAX, Parquet, ORC) chunks rows into row groups of 100k–1M rows, then within each row group stores data column-wise. This lets a single S3 file efficiently support both wide (whole-row) and narrow (single-column) scans, at the cost of slightly worse compression than pure columnar. Snowflake micro-partitions, Iceberg + Parquet, Delta Lake.

Tradeoff: pure columnar wins on narrow analytical scans; hybrid wins on flexibility across query shapes and on object-storage compatibility. Most cloud warehouses pick hybrid.

Storage vs compute coupling

Shared-nothing (ClickHouse, Pinot, Druid historicals, classic Redshift) puts data on local disks of the compute nodes. Low query latency, simple architecture, but scaling requires rebalancing — slow and operationally expensive.

Shared storage / disaggregated compute (Snowflake, BigQuery, Redshift RA3, Databricks) puts data in object storage (S3, GCS, Azure Blob); compute clusters are elastic and stateless. Scaling is instant — spin up another warehouse — but query latency adds object-storage GET cost (50–200ms cold) absorbed by an SSD cache layer.

Tradeoff: shared-nothing wins for sub-second p99 user-facing analytics; shared-storage wins for elastic large-scale batch and multi-tenant cost.

Ingestion: batch vs micro-batch vs streaming

Batch ELT — Airflow / dbt / Fivetran loads new data hourly or daily. Cheapest, simplest, ~hours of freshness. Default for cloud warehouses.

Micro-batch — small batches (1–10 min) loaded continuously. Used by Snowpipe, Databricks Auto Loader. Few-minute freshness at modest cost.

Streaming — Kafka-driven, segment-rolling ingestion (Pinot real-time servers, Druid Kafka indexing, ClickHouse Kafka engine). Seconds-of-freshness; cost is higher and operational complexity rises (segment compaction, schema mismatches, dead-letter handling).

Indexing: zone maps, bloom filters, inverted indexes, materialized views

Beyond the columnar layout itself, analytical engines accelerate queries via:

  • Zone maps — per-block min/max values, used to skip entire blocks for selective predicates. Universal across columnar engines.
  • Bloom filters — per-column probabilistic membership tests, used for high-cardinality equality predicates (WHERE user_id = X). Pinot, Druid, ClickHouse.
  • Inverted indexes — for text search columns inside an OLAP store (Pinot text indexes, Druid spatial/json indexes).
  • Bitmap indexes — for low-cardinality dimensions used in many group-bys.
  • Materialized views / pre-aggregations — precomputed rollups maintained on ingest. Pinot's StarTree index is the canonical implementation; Druid has rollup at ingest; BigQuery has materialized views; ClickHouse has materialized views and projections.
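
As a rough illustration of the bloom-filter bullet above, here is a minimal sketch of per-block membership filters used to decide which blocks an equality predicate must read. The `BloomFilter` class, the block names, and the sizing (1024 bits, 3 hashes) are assumptions made for the example and do not reflect any engine's actual implementation.

```python
import hashlib


class BloomFilter:
    """Fixed-size bloom filter; sizes are illustrative, not tuned."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key):
        # Derive k bit positions by hashing the key with k different salts.
        for salt in range(self.num_hashes):
            digest = hashlib.sha256(f"{salt}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key):
        # False means "definitely absent"; True means "possibly present".
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


# One filter per block of rows (hypothetical data).
blocks = {
    "block_0": [101, 102, 103],
    "block_1": [204, 205, 206],
}
filters = {}
for name, user_ids in blocks.items():
    bf = BloomFilter()
    for uid in user_ids:
        bf.add(uid)
    filters[name] = bf


def blocks_to_scan(user_id):
    """Blocks that cannot be ruled out for WHERE user_id = <user_id>."""
    return [name for name, bf in filters.items() if bf.might_contain(user_id)]


print(blocks_to_scan(205))
```

A bloom filter never produces a false negative, so a block that holds the key is always scanned; the only cost of an undersized filter is extra blocks read because of false positives.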

Schema model: star schema vs flat denormalized vs wide-table

Star schema — fact table + dimension tables joined at query time. Classic warehouse pattern (Kimball). Works because dimensions are small enough to broadcast.

Flat denormalized — every fact carries its dimensions inline. RTOLAP default (Pinot, Druid) because joins are slow under sub-second latency budgets. Burns disk; saves query time.

Wide table — a single table with hundreds or thousands of columns per logical entity (Snowflake VARIANT columns, ClickHouse with many columns). Trades schema discipline for query flexibility.

Concurrency: queries/sec ceiling

Real-time OLAP serves 1k–1M qps (Pinot, Druid). Cloud warehouses serve dozens to hundreds of concurrent queries per cluster — they assume each query is large and elastic compute is the lever. HTAP systems sit in between.

Comparison table

Axis        | Pinot / Druid (RTOLAP)   | ClickHouse                 | Snowflake / BigQuery    | Lakehouse (Iceberg + Trino)
------------+--------------------------+----------------------------+-------------------------+----------------------------
Layout      | Pure columnar segments   | Pure columnar parts        | Hybrid micro-partitions | Parquet/ORC row groups
Storage     | Local SSD + deep store   | Local SSD                  | Object storage          | Object storage
Compute     | Coupled (broker + server)| Coupled (clusters)         | Disaggregated           | Disaggregated (Trino)
Freshness   | seconds                  | seconds–minutes            | minutes–hours           | minutes–hours
Latency p99 | <100ms                   | <1s                        | seconds                 | seconds
QPS         | 100k+                    | 1k+                        | 10s                     | 10s
Best for    | user-facing analytics    | log analytics, OLAP-on-prem| BI, batch               | open lakehouse

§20. Underlying Data Structure: Column-Oriented Storage

The byte-level mechanics worth knowing.

20.1 Column-stripe layout: row-major vs column-major

A row store (InnoDB, PostgreSQL) keeps each row's columns contiguous: row 1's (user_id, event_type, ts, amount) are adjacent bytes, then row 2's, etc. Good for "give me this row"; bad for "sum one column over a billion rows" because the engine reads every byte of every row to touch one column's worth of data.

A column store keeps each column contiguous instead:

Column-major (Parquet row group, simplified):
┌──────────────────────────────────────────────────────────┐
│ user_id:   [val_1, val_2, val_3, …, val_N]   contiguous  │
│ event_type:[val_1, val_2, val_3, …, val_N]   contiguous  │
│ ts:        [val_1, val_2, val_3, …, val_N]   contiguous  │
│ amount:    [val_1, val_2, val_3, …, val_N]   contiguous  │
└──────────────────────────────────────────────────────────┘

Concrete numbers for a 100M-row table:

  • Row store: 100M rows × 24 bytes ≈ 2.4 GB. SELECT SUM(amount) reads every page.
  • Column store (ClickHouse MergeTree): four files. user_id.bin ~800 MB, event_type.bin ~50 MB (dictionary-encoded), ts.bin ~200 MB (delta + bit-packed), amount.bin ~200 MB. SUM(amount) reads only amount.bin: a 12× I/O reduction (2.4 GB → ~200 MB) with only lightweight per-column encoding applied, before any general-purpose compression on top.
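
The arithmetic behind the 12× figure can be written out as a small sketch. The file sizes are the ones quoted in the bullets above; the `bytes_read` helper is a hypothetical name introduced only for this example.

```python
ROWS = 100_000_000
ROW_BYTES = 24  # row width from the example above

# On-disk column file sizes in bytes, taken from the figures above.
column_files = {
    "user_id": 800e6,
    "event_type": 50e6,
    "ts": 200e6,
    "amount": 200e6,
}


def bytes_read(columns, layout):
    """Bytes a scan must read to touch the given columns."""
    if layout == "row":
        # A row store reads every page, whichever columns are needed.
        return ROWS * ROW_BYTES
    return sum(column_files[c] for c in columns)


row_io = bytes_read(["amount"], "row")
col_io = bytes_read(["amount"], "column")
print(f"row store: {row_io / 1e9:.1f} GB, "
      f"column store: {col_io / 1e9:.1f} GB, "
      f"reduction: {row_io / col_io:.0f}x")
# prints: row store: 2.4 GB, column store: 0.2 GB, reduction: 12x
```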

The other consequence: values within a single column are highly self-similar. All country_code values are one of ~200 strings; all ts values monotonically increase. Compression exploits this.

20.2 Compression codec stack

A modern columnar engine doesn't use one compressor: it composes lightweight codecs tailored to the column's data shape, then applies a general-purpose compressor on top.

  • Dictionary encoding. For low-cardinality columns (country_code, event_type, status), map each distinct value to a small integer ID. A country_code column of 1B × 8 bytes = 8 GB becomes 1B × 1 byte + dictionary ≈ 1 GB. 8× reduction.
  • Run-length encoding (RLE). For sorted columns, store (value, count) pairs. After sorting by country_code, 1B rows compress to ~1k runs ≈ 16 KB.
  • Bit-packing. When the dictionary fits in K bits (256 entries → 8 bits, 64 → 6 bits), pack IDs at bit-level. A 5-bit dictionary across 1B rows saves 3 bits/row = ~375 MB per column.
  • Delta encoding. For monotonic numerics (timestamps, sequential IDs), store (start, delta_1, delta_2, …). Second-resolution event timestamps compress to a few bits per delta.
  • Delta-of-delta. Two-level delta. Used in Gorilla (Facebook), generalized in Prometheus / M3DB / InfluxDB IOx. For a regular-cadence metric ("CPU sampled every 10s"), most deltas are exactly 10; delta-of-delta is 0. Compresses a 64-bit timestamp to ~1 bit on average.
  • Gorilla encoding for floats. XOR each float64 with the previous, store leading-zero count + significant bits. For stationary or slow-changing metrics (memory usage, throughput), consecutive samples share most of their bits, so the XOR is mostly zeros → 10–20× compression.
  • Zstandard / LZ4 / Snappy on top. General-purpose compressor over the codec output (of the three, only Zstd includes an entropy-coding stage). Zstd level 3 is the modern default: ~500 MB/sec compress, ~2 GB/sec decompress, 2–4× additional reduction.
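
The lightweight codecs in the list above are simple enough to sketch in a few lines each. This is an illustrative, unoptimized version that works on Python lists; the function names and sample data are assumptions for the example, and real engines operate on packed byte buffers rather than lists.

```python
from itertools import groupby


def dictionary_encode(values):
    """Map each distinct value to a small integer ID."""
    dictionary = sorted(set(values))
    ids = {v: i for i, v in enumerate(dictionary)}
    return dictionary, [ids[v] for v in values]


def rle_encode(values):
    """Collapse consecutive repeats into (value, count) pairs."""
    return [(v, len(list(run))) for v, run in groupby(values)]


def bit_width(cardinality):
    """Bits needed per ID when bit-packing a dictionary of this size."""
    return max(1, (cardinality - 1).bit_length())


def delta_encode(values):
    """Keep the first value, then successive differences."""
    return [values[0]] + [b - a for a, b in zip(values, values[1:])]


def delta_of_delta_encode(values):
    """First value, first delta, then differences between deltas."""
    deltas = [b - a for a, b in zip(values, values[1:])]
    dod = [b - a for a, b in zip(deltas, deltas[1:])]
    return values[0], deltas[0], dod


countries = ["DE", "DE", "DE", "US", "US", "FR", "FR", "FR", "FR"]
dictionary, ids = dictionary_encode(countries)
print(dictionary, ids)               # ['DE', 'FR', 'US'] [0, 0, 0, 2, 2, 1, 1, 1, 1]
print(rle_encode(sorted(ids)))       # [(0, 3), (1, 4), (2, 2)]
print(bit_width(len(dictionary)))    # 2

timestamps = [1000, 1010, 1020, 1030, 1041]
print(delta_encode(timestamps))           # [1000, 10, 10, 10, 11]
print(delta_of_delta_encode(timestamps))  # (1000, 10, [0, 0, 1])
```

Note how the regular 10-second cadence turns into zeros under delta-of-delta; only the one irregular sample leaves a non-zero value behind.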

Cumulative effect on a real production fact table (~50 columns, ~256 B/row uncompressed): per-column codecs give ~5×; Zstd on top gives ~2×; total ~10×. A 10 TB raw fact table on S3 is ~1 TB in Parquet. The country_code path end-to-end:

Raw VARCHAR    8 GB     8-byte strings
Dictionary →   1 GB     8-bit IDs + small dictionary
Bit-pack →     1 GB     ~200 distinct values still need 8 bits per ID, so no gain here
Zstd →         ~15 MB   entropy coding on the ID bytes (needs heavy skew or sort order)
              ────
              ~530× reduction on a well-skewed column
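
For the float path listed in §20.2, the XOR step of a Gorilla-style encoder can be sketched as follows. This shows only the bit analysis (leading zeros, trailing zeros, meaningful bits), not the bit-stream writer; the function names are hypothetical.

```python
import struct


def float_to_bits(x):
    """Reinterpret a float64 as a 64-bit unsigned integer."""
    return struct.unpack(">Q", struct.pack(">d", x))[0]


def xor_stream(values):
    """For each sample after the first, return
    (leading_zeros, trailing_zeros, meaningful_bits) of the XOR
    with the previous sample."""
    out = []
    prev = float_to_bits(values[0])
    for v in values[1:]:
        cur = float_to_bits(v)
        x = prev ^ cur
        if x == 0:
            # Identical sample: a real encoder writes a single control bit.
            out.append((64, 0, 0))
        else:
            leading = 64 - x.bit_length()
            trailing = (x & -x).bit_length() - 1
            out.append((leading, trailing, 64 - leading - trailing))
        prev = cur
    return out


print(xor_stream([12.0, 12.0, 24.0]))
# prints: [(64, 0, 0), (11, 52, 1)]
```

Going from 12.0 to 24.0 changes a single exponent bit, so only one meaningful bit has to be stored alongside the small header describing where it sits.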

20.3 Granules, zone maps, and the sparse primary key

A columnar engine doesn't index every row. It indexes granules — fixed-size chunks of consecutive rows. Typical sizes: 8192 rows in ClickHouse, 1024 in Pinot, ~1M rows per row-group in Parquet.

Each granule carries per-column metadata:

  • Min / max of the column within the granule (zone map). A WHERE event_date BETWEEN … query checks each granule's (min, max); non-intersecting granules are skipped entirely.
  • Null count.
  • Bloom filter (optional). For high-cardinality point-equality predicates (WHERE user_id = 42), a per-granule bloom filter rules out granules with no matching key.
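  • Histogram (optional). Snowflake keeps approximate distributions per micro-partition for the optimizer.

ClickHouse MergeTree, simplified granule layout (in the real engine, mark files hold byte offsets into the column files, and per-granule min/max lives in optional data-skipping indexes):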

┌────────────────────────────────────────────────────────────┐
│ Part (~10–100 GB, one logical chunk on disk)               │
│   ┌──────────────────────────────────────────────────────┐ │
│   │ Granule 0:                                           │ │
│   │   col1.bin bytes [0..L1]   col2.bin bytes [0..L2]    │ │
│   │   Mark file: col1: min=10, max=99, null=0            │ │
│   │              col2: min='2026-05-01', max='2026-05-01'│ │
│   ├──────────────────────────────────────────────────────┤ │
│   │ Granule 1: …                                         │ │
│   └──────────────────────────────────────────────────────┘ │
│                                                            │
│   Primary key index (sparse): one entry per granule        │
└────────────────────────────────────────────────────────────┘

The primary key index in a columnar OLAP engine is fundamentally different from an OLTP B+ tree: it is sparse — one entry per granule, not per row. An 8192-row granule yields a PK index ~8000× smaller. Point lookups find one granule (~8000 rows) in <1 ms and must then scan inside. This is why OLAP is the wrong tool for sub-millisecond row lookups — the index granularity is wrong by design.
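
A minimal sketch of a sparse index lookup, assuming a key column already sorted by primary key. The data, the `lookup` helper, and its return shape are hypothetical; the point is the two-step shape of the work: binary search over one entry per granule, then a scan inside the chosen granule.

```python
from bisect import bisect_right

GRANULE_ROWS = 8192

# Sorted key column standing in for one part (hypothetical data).
keys = list(range(0, 1_000_000, 3))  # 333,334 rows

# Sparse index: the first key of every granule.
sparse_index = keys[::GRANULE_ROWS]


def lookup(key):
    """Return (granule_number, rows_scanned, found)."""
    # Step 1: binary search the small index for the last granule
    # whose first key is <= the search key.
    g = bisect_right(sparse_index, key) - 1
    if g < 0:
        return None, 0, False
    start = g * GRANULE_ROWS
    granule = keys[start:start + GRANULE_ROWS]
    # Step 2: there is no per-row index, so scan inside the granule.
    for scanned, k in enumerate(granule, start=1):
        if k == key:
            return g, scanned, True
        if k > key:
            return g, scanned, False
    return g, len(granule), False


print(len(keys), len(sparse_index))  # 333334 41
print(lookup(30000))                 # (1, 1809, True)
```

The index has 41 entries for 333,334 rows, and the lookup still touches 1,809 rows before it finds the key, which is the granularity cost described above.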

For a selective predicate against zone maps: SELECT SUM(amount) WHERE ts BETWEEN '2026-05-01' AND '2026-05-07' against a 100M-row table with 65k-row granules reads ~1500 granule footers, finds ~7 matching granules, scans ~500k rows. 200× reduction.
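
The skipping logic itself is short. Below is a toy sketch with 4-row granules and day numbers as timestamps; all data and names are made up for the example.

```python
GRANULE_ROWS = 4

# (ts, amount) rows stored sorted by ts; ts is a day number (hypothetical data).
rows = [(day, 10 * day) for day in range(1, 21)]

# Split into granules and record per-granule min/max of ts (the zone map).
granules = [rows[i:i + GRANULE_ROWS] for i in range(0, len(rows), GRANULE_ROWS)]
zone_map = [
    (min(ts for ts, _ in g), max(ts for ts, _ in g)) for g in granules
]


def sum_amount(ts_lo, ts_hi):
    """SUM(amount) WHERE ts BETWEEN ts_lo AND ts_hi; returns (sum, rows_scanned)."""
    total, scanned = 0, 0
    for (lo, hi), granule in zip(zone_map, granules):
        if hi < ts_lo or lo > ts_hi:
            continue  # the zone map proves no row can match: skip unread
        for ts, amount in granule:
            scanned += 1
            if ts_lo <= ts <= ts_hi:
                total += amount
    return total, scanned


print(sum_amount(6, 7))  # (130, 4)
```

Only one of the five granules overlaps the range, so 4 of 20 rows are scanned. The benefit depends on the data being sorted or clustered on the filtered column; on randomly ordered data every granule's min/max range would overlap the predicate.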

20.4 Vectorized execution and SIMD

Tuple-at-a-time (Volcano) execution processes one row at a time through the operator tree. Each row pays virtual-call overhead, branch mispredictions, and cache misses at every operator boundary. Vectorized execution processes a batch (typically 1024 or 4096 values) through each operator at once, turning the inner loop into a tight scalar or SIMD-vector loop:

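// Tuple-at-a-time: one next_row() call and one data-dependent branch per row.
// next_row() is the child operator's iterator; it returns NULL when exhausted.
Row *row;
while ((row = next_row(child)) != NULL) {
  if (row->price > 100) emit(row);
}
// Throughput: ~10–50 M rows/sec/core

// Vectorized: one branch-free pass over a 1024-value batch of a single column.
for (int i = 0; i < 1024; i++) {
  selection[i] = (price[i] > 100);
}
emit(selection);
// AVX-512 compares 16 int32 values per instruction, so the 1024-value batch
// takes ~64 compare instructions: <100 nanoseconds when the batch is in cache.
// Throughput: ~1–5 GB/sec/core on integer columns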

ClickHouse, DuckDB, Databricks Photon, and BigQuery's Dremel engine all execute in vectorized batches, applying SIMD (AVX2 / AVX-512) through hand-written kernels or compiler auto-vectorization. Aggregations (SUM, COUNT, MIN, MAX), filters, arithmetic, and group-by hashing all parallelize. Vectorized columnar execution scans 1–10 GB/sec/core, and the whole "OLAP is fast" story rests on this. Same hardware, 10–100× faster than a tuple-at-a-time interpreter.

20.5 Parquet file structure

Parquet is the dominant on-disk columnar format outside the proprietary warehouses. Layout:

Parquet file (typical 128–1024 MB):

┌─────────────────────────────────────────────────────────────┐
│ Magic: "PAR1"                                               │
├─────────────────────────────────────────────────────────────┤
│ Row Group 0 (~128 MB uncompressed, ~1M rows)                │
│   Column chunk for col1                                     │
│     Page 0: dictionary page                                 │
│     Page 1: data page                                       │
│     …                                                       │
│   Column chunk for col2 → …                                 │
│   …                                                         │
├─────────────────────────────────────────────────────────────┤
│ Row Group 1, Row Group 2, …                                 │
├─────────────────────────────────────────────────────────────┤
│ Footer:                                                     │
│   Schema, row groups[], column metadata[], statistics       │
│   (min, max, null_count, byte_offset, sizes, bloom offset)  │
│ Footer length (4 bytes) + magic "PAR1"                      │
└─────────────────────────────────────────────────────────────┘

Key properties: self-describing (footer holds full schema); row groups are independent (skip via footer stats, read in parallel); the page is the unit of encoding and compression (~1 MB), while the column chunk is the usual unit of I/O; and the footer-at-end convention can cost up to three S3 round trips (object size, footer length, footer) before data fetches start, a real performance gotcha for many-small-file workloads.
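
To make the footer-at-end convention concrete, here is a sketch that locates the footer from the last 8 bytes of a file: a 4-byte little-endian footer length followed by the `PAR1` magic. The file built here has only Parquet's outer framing with an opaque placeholder where the Thrift-encoded metadata would go, and `read_range` is a hypothetical stand-in for an object-store ranged GET.

```python
import struct

MAGIC = b"PAR1"


def build_fake_file(body: bytes, footer: bytes) -> bytes:
    """Assemble bytes with Parquet's outer framing only. The footer is an
    opaque placeholder, not real Thrift-encoded file metadata."""
    return MAGIC + body + footer + struct.pack("<I", len(footer)) + MAGIC


def locate_footer(read_range, file_size):
    """Return (offset, length) of the footer using one small tail read.

    read_range(start, end) returns bytes [start, end) of the object.
    """
    tail = read_range(file_size - 8, file_size)
    if tail[4:] != MAGIC:
        raise ValueError("not a Parquet file: trailing magic missing")
    (footer_len,) = struct.unpack("<I", tail[:4])
    return file_size - 8 - footer_len, footer_len


data = build_fake_file(b"\x00" * 1000, b"schema+stats placeholder")
offset, length = locate_footer(lambda s, e: data[s:e], len(data))
print(offset, length, data[offset:offset + length])
# prints: 1004 24 b'schema+stats placeholder'
```

A common way to save a round trip is to read a larger tail speculatively (for example the last few hundred KB) so that the footer usually arrives in the same request as its length.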

20.6 Iceberg manifest tree — table semantics over Parquet

Parquet alone is "files in S3." To build a real table with INSERT / snapshot isolation / schema evolution / time travel, you need a metadata layer above the files. Iceberg's solution is a multi-level manifest tree in object storage, with atomic pointer-swap commits:

table_root/
├── metadata/
│   ├── v42.metadata.json     ← schema, snapshots[], partition spec
│   ├── snap-12345-mlist.avro ← manifest list (per-manifest partition bounds)
│   └── manifest-001.avro     ← manifest file (per-data-file column stats)
└── data/
    └── dt=2026-05-01/part-0001.parquet

Chain: table metadata → snapshot → manifest list → manifest files → Parquet data files.

This is a four-level zone-map hierarchy: partition → file → row-group → page. Each level prunes ~10–100×. A 1 PB table with a tight predicate often scans <100 MB. Snapshot isolation comes from atomic pointer swaps on version-hint.text (or a catalog row); time travel comes from retaining old metadata.json files; schema evolution is metadata-only (each snapshot carries a schema version).
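
The first two pruning levels can be sketched with plain dictionaries. These structures are hypothetical, heavily simplified stand-ins for Iceberg metadata: real manifest lists and manifests are Avro files with richer statistics, and only min/max bounds on a single `dt` column appear here.

```python
# Manifest list: one entry per manifest, with partition bounds (hypothetical).
manifest_list = [
    {"manifest": "manifest-001", "dt_min": "2026-04-01", "dt_max": "2026-04-30"},
    {"manifest": "manifest-002", "dt_min": "2026-05-01", "dt_max": "2026-05-31"},
]

# Manifests: one entry per data file, with per-file column bounds (hypothetical).
manifests = {
    "manifest-001": [
        {"file": "dt=2026-04-15/part-0001.parquet",
         "dt_min": "2026-04-15", "dt_max": "2026-04-15"},
    ],
    "manifest-002": [
        {"file": "dt=2026-05-01/part-0001.parquet",
         "dt_min": "2026-05-01", "dt_max": "2026-05-01"},
        {"file": "dt=2026-05-20/part-0001.parquet",
         "dt_min": "2026-05-20", "dt_max": "2026-05-20"},
    ],
}


def overlaps(entry, lo, hi):
    # ISO-8601 dates compare correctly as strings.
    return entry["dt_max"] >= lo and entry["dt_min"] <= hi


def plan_scan(lo, hi):
    """Walk manifest list -> manifests -> data files, pruning at each level."""
    files = []
    for m in manifest_list:
        if not overlaps(m, lo, hi):
            continue  # whole manifest skipped without being opened
        for f in manifests[m["manifest"]]:
            if overlaps(f, lo, hi):
                # Row-group and page statistics prune further at read time.
                files.append(f["file"])
    return files


print(plan_scan("2026-05-01", "2026-05-07"))
# prints: ['dt=2026-05-01/part-0001.parquet']
```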

Delta Lake and Hudi solve the same problem with different formats (a _delta_log/ of JSON commit files; a .hoodie/ timeline directory). Iceberg's manifest tree is the dominant choice in 2026 for breadth of engine support and operational simplicity at PB+ scale.

20.7 Materialized views and StarTree

Many analytical queries are rollups: SELECT country, day, SUM(amount) FROM events GROUP BY country, day. If the cardinality of (country, day) is small but the underlying data is huge, pre-aggregating at ingest turns every query into a constant-time lookup.

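  • ClickHouse materialized views — an insert trigger on the source table: each newly inserted block is aggregated and written to a target table, and background merges in the target (e.g. SummingMergeTree or AggregatingMergeTree) combine the partial aggregates.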
  • BigQuery materialized views — maintained automatically; the query planner rewrites queries to use them when applicable.
  • Pinot StarTree index — a multi-level pre-aggregation tree supporting arbitrary subsets of dimensions and metrics. The query plan walks to a leaf cell and reads the pre-aggregated value, never scanning raw data. Powers LinkedIn user-facing analytics at p99 < 100 ms.
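
The common idea behind all three is a rollup maintained at ingest. A minimal in-memory sketch follows; the `RollupView` class and the event fields are hypothetical and stand in for what a real engine persists in a target table or index.

```python
from collections import defaultdict


class RollupView:
    """Running SUM(amount) and COUNT(*) per (country, day), updated on ingest."""

    def __init__(self):
        self.cells = defaultdict(lambda: [0, 0])  # key -> [sum, count]

    def ingest(self, batch):
        # Work is proportional to the batch, not to the table size.
        for event in batch:
            cell = self.cells[(event["country"], event["day"])]
            cell[0] += event["amount"]
            cell[1] += 1

    def query(self, country, day):
        # A dictionary lookup; no raw events are scanned.
        total, count = self.cells.get((country, day), (0, 0))
        return total, count


view = RollupView()
view.ingest([
    {"country": "US", "day": "2026-05-01", "amount": 10},
    {"country": "DE", "day": "2026-05-01", "amount": 7},
])
view.ingest([
    {"country": "US", "day": "2026-05-01", "amount": 20},
])
print(view.query("US", "2026-05-01"))  # (30, 2)
```

The trade-off is that only the pre-chosen dimensions can be queried from the rollup; a query grouping by a column that was not kept falls back to the raw data.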

§21. Hard Problems Inherent to Analytical Databases

21.1 Skew and hot partitions

A bad partition key (say country when 60% of traffic is from one country, or user_id modulo N when one user produces orders of magnitude more rows than the median) pins one node at 100% while others idle. Mitigations: use composite keys (country + user_id_bucket), hash-then-modulo on a salted field, or pick a key with a known uniform distribution (e.g. a hash of a high-resolution event timestamp combined with a sequence number).
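
A small sketch of the composite-key mitigation, comparing partitioning by country alone with partitioning by country plus a user_id bucket. The partition count, bucket count, hash choice, and synthetic traffic mix are all assumptions for illustration.

```python
import hashlib
from collections import Counter

NUM_PARTITIONS = 8
SALT_BUCKETS = 16


def stable_hash(value):
    """Deterministic hash (Python's built-in hash() is randomized per process)."""
    return int.from_bytes(hashlib.md5(str(value).encode()).digest()[:8], "big")


def naive_partition(country):
    return stable_hash(country) % NUM_PARTITIONS


def salted_partition(country, user_id):
    bucket = stable_hash(user_id) % SALT_BUCKETS  # the user_id_bucket
    return stable_hash(f"{country}|{bucket}") % NUM_PARTITIONS


# Synthetic traffic: 60% from one country, the rest spread over four others.
others = ["DE", "FR", "JP", "BR"]
events = [("US" if i % 10 < 6 else others[i % 4], i) for i in range(10_000)]

naive = Counter(naive_partition(c) for c, _ in events)
salted = Counter(salted_partition(c, u) for c, u in events)

print("hottest partition, country only:   ", max(naive.values()))
print("hottest partition, country + bucket:", max(salted.values()))
```

The cost of salting shows up on the read side: a query for one country now has to fan out to every partition that holds one of its buckets.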

21.2 Late-arriving and out-of-order data

A click event timestamped 10:00 may arrive at the warehouse at 10:30 because the user's network was slow. If the 10:00 segment has already been sealed and pre-aggregated, the late row is either dropped or forces a re-aggregation. Pinot, Druid, and lakehouse engines handle this with mutable hot segments, late-arrival buffers, or merge-on-read patterns. Cost: increased complexity and lag between "data present" and "query reflects it."
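
One way to picture the late-arrival buffer plus merge-on-read combination is the toy model below. The `SegmentStore` class, the hourly segments, and the 15-minute allowed lateness are hypothetical; this is not how Pinot or Druid implement it, only the general shape.

```python
class SegmentStore:
    """Hourly segments, sealed once the watermark passes the segment end
    plus an allowed-lateness window (all thresholds are illustrative)."""

    def __init__(self, allowed_lateness=900):
        self.allowed_lateness = allowed_lateness  # seconds
        self.watermark = 0                        # max event time seen so far
        self.open_segments = {}                   # hour_start -> [(ts, amount)]
        self.sealed = {}                          # hour_start -> aggregated sum
        self.late = []                            # rows that missed their segment

    def ingest(self, event_ts, amount):
        hour = event_ts - event_ts % 3600
        if hour in self.sealed:
            # Segment already pre-aggregated: park the row for merge-on-read.
            self.late.append((event_ts, amount))
        else:
            self.open_segments.setdefault(hour, []).append((event_ts, amount))
        self.watermark = max(self.watermark, event_ts)
        self._seal_ready()

    def _seal_ready(self):
        for hour in sorted(self.open_segments):
            if hour + 3600 + self.allowed_lateness <= self.watermark:
                rows = self.open_segments.pop(hour)
                self.sealed[hour] = sum(a for _, a in rows)

    def query(self, hour):
        base = self.sealed.get(hour, 0)
        base += sum(a for _, a in self.open_segments.get(hour, []))
        # Merge-on-read: fold buffered late rows in at query time.
        return base + sum(a for ts, a in self.late if ts - ts % 3600 == hour)


store = SegmentStore()
store.ingest(36000, 5)   # 10:00:00
store.ingest(36100, 7)   # 10:01:40
store.ingest(40500, 1)   # 11:15:00, moves the watermark and seals the 10:00 hour
store.ingest(36050, 3)   # late row for the sealed 10:00 hour
print(store.sealed[36000], store.late, store.query(36000))
# prints: 12 [(36050, 3)] 15
```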

21.3 High cardinality and join blow-up

A user_id column with 1B distinct values resists dictionary encoding (the dictionary becomes as large as the column). A join between two 1B-row tables on user_id either shuffles ~32 GB of keys across the cluster (about 16 bytes per key across 2B rows) or needs one side small enough to broadcast, which neither table is. Mitigations: pre-join at ingest (denormalize), shard both tables on the join key, or use approximate techniques (HyperLogLog for distinct counts).
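
For the approximate-distinct-count mitigation, a compact HyperLogLog sketch shows why it sidesteps the cardinality problem: memory is fixed by the register count, not by the number of distinct keys. This is a textbook-style version with the small-range correction only; the class name and the choice of SHA-1 for hashing are assumptions for the example.

```python
import hashlib
import math


class HyperLogLog:
    """Textbook HyperLogLog with 2**p registers (illustrative, not tuned)."""

    def __init__(self, p=12):
        self.p = p
        self.m = 1 << p
        self.registers = [0] * self.m

    def add(self, item):
        digest = hashlib.sha1(str(item).encode()).digest()
        h = int.from_bytes(digest[:8], "big")        # 64-bit hash
        idx = h >> (64 - self.p)                     # first p bits pick a register
        rest = h & ((1 << (64 - self.p)) - 1)        # remaining 64 - p bits
        # Rank = 1-based position of the leftmost 1-bit in the remaining bits.
        rank = (64 - self.p) - rest.bit_length() + 1
        self.registers[idx] = max(self.registers[idx], rank)

    def count(self):
        alpha = 0.7213 / (1 + 1.079 / self.m)
        estimate = alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if estimate <= 2.5 * self.m and zeros:
            # Small-range correction (linear counting).
            estimate = self.m * math.log(self.m / zeros)
        return estimate


hll = HyperLogLog()
for user_id in range(100_000):
    hll.add(user_id)
    hll.add(user_id)  # duplicates do not change the estimate
estimate = hll.count()
print(f"estimated distinct: {estimate:.0f} (true: 100000)")
```

With 4096 registers the expected relative error is roughly 1.6%, and two sketches can be merged by taking the register-wise maximum, which is what makes the technique usable across shards.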

21.4 Schema evolution

Adding a column to a 10 TB Parquet table is easy if the table format supports column appends (Iceberg, Delta): old files simply return NULL for the new column. Renaming a column is metadata-only when the format tracks columns by ID or through a logical-name mapping (Iceberg's schema-evolution semantics handle this); retyping beyond a safe widening requires rewriting the files. Dropping a column needs a rewrite if you care about storage cost. Compared to OLTP, schema evolution is cheaper when it is metadata-only, but slower when a rewrite is required (gigabytes to terabytes of files).
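
The reason add and rename can be metadata-only is that files store values against stable field IDs rather than names. A toy in-memory model of that idea follows; the schemas, rows, and `read` helper are hypothetical and do not correspond to a real file format.

```python
# Table schemas keyed by stable field ID (hypothetical model).
schema_v1 = {1: "user_id", 2: "amount"}
schema_v2 = {1: "user_id", 2: "amount_usd", 3: "country"}  # rename 2, add 3

# Data files store values by field ID, so old files never need rewriting.
old_file = [{1: 42, 2: 9.5}]            # written under schema_v1
new_file = [{1: 43, 2: 1.0, 3: "DE"}]   # written under schema_v2


def read(file_rows, schema):
    """Project stored rows onto a schema; fields missing from the file read as None."""
    return [
        {name: row.get(field_id) for field_id, name in schema.items()}
        for row in file_rows
    ]


print(read(old_file, schema_v2))
# prints: [{'user_id': 42, 'amount_usd': 9.5, 'country': None}]
print(read(new_file, schema_v2))
# prints: [{'user_id': 43, 'amount_usd': 1.0, 'country': 'DE'}]
```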

21.5 Real-time freshness vs query throughput

A real-time OLAP cluster receiving 100k events/sec from Kafka must compact tiny streaming segments into larger ones in the background. If compaction falls behind, hot-segment count grows, and per-query overhead (one merge step per segment) explodes. Pinot and Druid expose segment age / count metrics; SREs alert on these.

21.6 Cost runaway in the cloud

A single full-scan SELECT * in BigQuery on a 10 TB table costs ~$50 at an on-demand rate of $5/TB. A daily dashboard that scans the whole table costs ~$1500/month. Untuned BI tools that do this for every user dashboard render are a known cost-explosion pattern. Mitigations: clustered tables, materialized views, query-cost monitoring, per-user query budgets, BI tools that respect partition-prune predicates.
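
The cost arithmetic, written out as a sketch. The $5/TB rate is an assumption inferred from the figures above (10 TB for ~$50), and the pruned case assumes a hypothetical table partitioned by day over one year; check current pricing before relying on either number.

```python
PRICE_PER_TB = 5.00  # USD per TB scanned; assumed rate implied by the text


def scan_cost(tb_scanned, runs=1):
    """Cost of scanning tb_scanned terabytes, repeated `runs` times."""
    return tb_scanned * PRICE_PER_TB * runs


full_scan = scan_cost(10)                 # one SELECT * over 10 TB
daily_dashboard = scan_cost(10, runs=30)  # the same scan once a day for a month
pruned = scan_cost(10 / 365, runs=30)     # pruned to one day's partition per run

print(full_scan, daily_dashboard, round(pruned, 2))
# prints: 50.0 1500.0 4.11
```

Under these assumptions, a dashboard that carries a partition predicate costs a few dollars a month instead of $1500, which is why partition-aware BI tooling appears in the mitigation list.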

21.7 Inconsistency with OLTP

A user updates their profile in the OLTP database at 10:00:00. The CDC pipeline lands the change in the warehouse at 10:00:30. A dashboard refreshed at 10:00:15 shows the old value. For BI this is fine; for an operational dashboard that triggers actions (e.g. fraud detection), this is a bug. Mitigations: route operational queries to the OLTP database, route analytical queries to the warehouse, and document the freshness regime for each surface.


§22. Real-World Implementations with Numbers

The systems and their operating points.

Apache Pinot (LinkedIn)

  • Origin: built at LinkedIn for "Who viewed your profile" and ad analytics.
  • Scale: 100k+ qps, sub-100ms p99, petabytes ingested per day.
  • Architecture: brokers route queries; servers hold segments; controllers manage segment assignments; minions handle background tasks. Real-time servers consume Kafka; offline servers serve immutable segments from deep store (HDFS/S3).
  • Standout feature: StarTree index, which pre-aggregates at segment build time so that queries over many dimension combinations read rollups instead of raw rows.
  • Deployed at: LinkedIn, Uber, Walmart, Stripe.

Apache Druid

  • Origin: built at Metamarkets (later acquired by Snap) for ad analytics.
  • Scale: hundreds of thousands of qps, sub-second p99, real-time + batch hybrid.
  • Architecture: similar to Pinot — brokers, historicals (immutable), middle-managers (real-time), coordinators (segment assignment).
  • Standout feature: rollup at ingest (similar to materialized views, applied at write time).
  • Deployed at: Netflix, Airbnb, Wikimedia, eBay.

ClickHouse

  • Origin: built at Yandex for Metrica (web analytics, 30B events/day).
  • Scale: 1B rows/sec scan on a single machine; horizontal scale to PB.
  • Architecture: shared-nothing clusters; MergeTree storage engine; coordinated via ZooKeeper or ClickHouse Keeper.
  • Standout features: extreme single-node performance, materialized views with arbitrary SQL, native Kafka and S3 engines.
  • Deployed at: Cloudflare (HTTP analytics, ~25 PB), Yandex, Tinybird, Plausible.

Snowflake

  • Origin: built greenfield in the cloud, ~2014.
  • Scale: petabytes per customer, hundreds of concurrent users per warehouse, T+1 to near-real-time via Snowpipe.
  • Architecture: storage in S3 (micro-partitions); virtual warehouses are elastic compute clusters; metadata service tracks partitions; result cache eliminates duplicate work.
  • Standout features: zero-copy cloning, time travel (query historic state), separation of storage and compute.
  • Deployed at: most Fortune 500 companies for BI, finance, regulatory.

BigQuery

  • Origin: built at Google atop Dremel (the research paper Snowflake also drew from).
  • Scale: exabyte tables; pay-per-query model; ~10–100 GB/sec scan per query slot.
  • Architecture: storage in Colossus (Google's distributed file system); compute is a serverless slot pool; Dremel-based query engine with vectorized execution.
  • Standout features: BigQuery ML, BI Engine cache, completely serverless.
  • Deployed at: Google internal, Spotify, The New York Times, HSBC.

Lakehouse (Iceberg + Trino + Parquet)

  • Origin: Iceberg at Netflix, Delta Lake at Databricks, Hudi at Uber.
  • Scale: exabyte tables in S3; query latency seconds-to-minutes; multiple engines can query the same dataset.
  • Architecture: Parquet files in object storage; metadata layer tracks files, snapshots, schema versions; Trino / Spark / Flink can all query through the metadata layer.
  • Standout features: open format, multi-engine, ACID on object storage, time travel, schema evolution.
  • Deployed at: Netflix, Apple, Airbnb, Adobe.

LinkedIn architecture in context

The end-to-end shape at LinkedIn (illustrative of the broader pattern):

[Espresso (OLTP, document store)] --[Kafka CDC]-->
   ├──> [Pinot (RTOLAP)]  -- user-facing analytics, sub-100ms
   ├──> [Lakehouse (HDFS + Iceberg)]  -- ML feature engineering, batch BI
   └──> [Search index (Galene)]  -- text search

Each derived store is rebuilt by replaying the Kafka log; the OLTP primary is the source of truth. This is the canonical "primary + derived stores" pattern from §6 made concrete: the OLTP database holds mutable state; OLAP and search are eventually-consistent derived views shaped for different access patterns.


§23. Summary

Databases come in two flavors that complement, not compete. The transactional flavor is a B+ tree (or LSM) wearing an ACID contract — its job is to guarantee that small mutations to mutable, structured state either happened or didn't, and to survive power loss. The analytical flavor is a columnar layout wearing a scan-throughput contract — its job is to answer aggregation queries over billions of rows in milliseconds-to-seconds, even if the data is slightly stale. A real system uses both: the OLTP primary holds the truth, ACID-protected, single-shard or sharded; an analytical store (Pinot for user-facing analytics, Snowflake or BigQuery for BI, a lakehouse for ML feature engineering) is kept in sync via CDC or Kafka and answers the questions the OLTP primary cannot afford to answer. The art is knowing which flavor each question belongs to — "what is this user's current balance" lives in the OLTP; "what is the trending purchase pattern across 200M users last week" lives in OLAP — and engineering the pipeline between them (CDC, schema evolution, freshness SLOs, idempotent backfills) with the same discipline you would apply to the primary itself.