Component class: Coordination services / metadata stores — etcd, Apache ZooKeeper, HashiCorp Consul, Google Chubby. The category of strongly-consistent, highly-available, deliberately low-throughput key-value stores that hold the control-plane state for distributed systems.
Framing: this doc is about the technology class. Coordination services occupy a distinctive niche in the storage taxonomy — they sit between transactional databases and in-memory caches but are neither. They exist because distributed systems need a single source of truth for a small set of critical questions ("who is the leader?", "where do shards live?", "what is the config?", "who holds this lock?"), and that truth must survive failures without ambiguity. Examples throughout pull from Kubernetes, Kafka, Cassandra, CockroachDB, distributed cron, feature flags, and service mesh.
§1. What coordination services ARE
A coordination service is a strongly-consistent, highly-available, distributed key-value store optimized for the storage and dissemination of control-plane metadata. Defining characteristics:
- Tiny dataset, high importance. Kilobytes to a few gigabytes. Every byte matters — a wrong answer corrupts the entire data plane.
- Low throughput, by design. Single-leader consensus means ~10,000 writes/sec ceiling. Acceptable because the consumers of coordination state read it far more often than they write it, and the rate of metadata change is bounded by cluster events (deploys, failovers, config changes), not user traffic.
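- Linearizable reads and writes. A successful write is visible to every subsequent linearizable read, no matter which replica serves it. (etcd reads are linearizable by default, with an opt-in serializable mode that may return stale data; ZooKeeper serves reads locally, so a client that needs the latest value issues a sync first.)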
- Watch primitive. Clients subscribe to keys or prefixes and receive a totally-ordered stream of changes.
- Lease / ephemeral semantics. Clients hold keys conditional on liveness — when the client dies, the key disappears.
What problem class do they solve?
Five problems recur in nearly every distributed system:
- Leader election — among N replicas, exactly one holds a lease at a time.
- Service discovery — endpoints register, consumers watch.
- Distributed locking — with fencing tokens that downstream resources validate.
- Configuration distribution — write once, propagate via watch.
- Cluster membership — nodes register with leases, drop out on failure.
Where they sit in the stack
┌──────────────────────────┐
│       Application        │  (user requests)
└────────────┬─────────────┘
             │
┌────────────▼─────────────┐
│        Data plane        │  (DBs, caches, queues)
│   millions of ops/sec    │  high throughput
└────────────┬─────────────┘
             │  asks: who's the leader,
             │        what's the config,
             │        who owns this shard?
┌────────────▼─────────────┐
│   Coordination service   │  (etcd / ZK / Consul)
│  ~10k writes/s ceiling   │  low throughput,
│     ~GBs of metadata     │  strong consistency
└──────────────────────────┘
Distinguish from adjacent categories
- Transactional database (MySQL, PostgreSQL): higher write throughput, larger data, rich queries — but no native watches, leases, or coordination primitives.
- Wide-column NoSQL (Cassandra, HBase): eventually consistent by default, partitioned for terabytes of user data, no consensus on writes.
- In-memory KV (Redis, Memcached): sub-millisecond reads, but no real consensus. Redis primary-replica replication is async, so a failover can silently drop writes that were already acknowledged to the client. Acceptable as a cache, dangerous as a coordinator.
- Object storage (S3, GCS): huge capacity, read-after-write per object only, no watches, no leases.
NOT good for
- High write throughput (>10k/sec) — quorum fsync gates every commit.
- Large data volume (>8 GB) — snapshot transfer and in-memory index pressure dominate.
- Low-latency global writes — cross-region quorum is 30-100 ms minimum.
- Rich query semantics — flat KV or hierarchical znodes only.
- User data — classic anti-pattern.
§2. Inherent guarantees
Provided by design
Linearizability. Once a write is acked, every subsequent read on any replica reflects it. Single totally-ordered history. The strongest single-object consistency model.
No split-brain under partition. Under any partition, at most one side can commit writes. The other becomes read-only or refuses service. Consensus prevents two simultaneous leaders by construction — a leader needs majority quorum, and a partition cannot produce two majorities.
Durability via majority quorum. A committed write survives any minority of replica failures and disk loss on the minority. Requires fsync before ack on a majority — durability point is the slowest fsync of the quorum.
Watch delivery in order. Subscribers receive every committed change matching their predicate, in commit order, with no gaps (until a configurable compaction horizon).
Lease semantics. Liveness is determined by the coordination service's view (lease heartbeats received), not by gossip — this is what makes locks safely fence-able.
NOT guaranteed (must be layered on)
- High throughput. Hard ceiling ~10k writes/sec per Raft group; if you need more, shard the workload.
- Multi-region availability. Run separate coordination services per region with out-of-band state replication.
- Idempotency of client operations. If the client doesn't know whether its request succeeded (network drop between commit and ack), it must retry. Application makes operations idempotent — typically via CAS (compare-and-swap) on the revision number.
- SQL-style multi-row transactions. Multi-key conditional batches (etcd Txn, ZooKeeper multi) exist but are not full ACID across many keys.
- Fast failover. Leader election takes 150-600 ms on a healthy cluster; applications must tolerate this brief unavailability.
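The idempotency point above can be sketched in a few lines. This is a minimal illustration, assuming a hypothetical in-memory `VersionedKV` class and `claim` helper (neither is a real client API); it only shows why a retry after a lost ack is safe when the write is conditional on the revision the caller observed.

```python
class VersionedKV:
    """Hypothetical in-memory stand-in for a revisioned key-value store."""

    def __init__(self):
        self._data = {}      # key -> (value, mod_revision)
        self._revision = 0   # store-wide monotonic counter

    def get(self, key):
        # Returns (value, mod_revision); (None, 0) means the key does not exist.
        return self._data.get(key, (None, 0))

    def cas(self, key, expected_mod_revision, value):
        # Write only if the key is still at the revision the caller observed.
        _, current = self.get(key)
        if current != expected_mod_revision:
            return False
        self._revision += 1
        self._data[key] = (value, self._revision)
        return True


def claim(kv, key, owner):
    """Idempotent claim: safe to call again after an ambiguous failure."""
    value, mod_rev = kv.get(key)
    if value == owner:
        return True      # an earlier attempt already committed
    if value is not None:
        return False     # someone else holds the key
    return kv.cas(key, mod_rev, owner)


kv = VersionedKV()
first = claim(kv, "/locks/job-runner", "node-7")
retry = claim(kv, "/locks/job-runner", "node-7")   # ack was lost, so the client retries
other = claim(kv, "/locks/job-runner", "node-9")
```

The retry neither fails nor writes a second time: the store's revision advances once, which is the property an application needs from its own operations.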
§3. The design space
| System | Consensus | Data model | Watch model | Bundled extras | Op footprint | Famous users |
|---|---|---|---|---|---|---|
| etcd | Raft | flat KV, MVCC revisions | streamed, replayable from revision | none — pure KV | one Go binary | Kubernetes, CoreOS lineage |
| ZooKeeper | Zab | hierarchical znodes | one-shot triggers | session model, sequential znodes | JVM, separate ensemble | Kafka (legacy), HBase, Hadoop |
| Consul | Raft | KV + service catalog + DNS | long-polling | service discovery, DNS, ACL, mesh | Go binary + agent fleet | HashiCorp stack |
| Chubby | Paxos (Multi-Paxos) | hierarchical files | event channels | name service, ACL | internal Google only | GFS, Bigtable, Borg |
| TiKV PD | Raft (sharded) | KV with regions | streamed | placement, scheduling | embedded in TiKV | TiDB, TiKV |
Raft (etcd, Consul, TiKV PD, CockroachDB)
The modern default. Ongaro and Ousterhout's 2014 protocol, designed for understandability. Pieces are well-prescribed: strong leader, randomized election timeouts, log-completeness vote check, two-phase joint-consensus membership changes. Production implementations across etcd, Consul, CockroachDB, TiKV, and Apache Kudu share the same mental model.
Zab (ZooKeeper Atomic Broadcast)
Predates Raft. Structurally similar — leader plus followers, monotonic epoch numbers (analogous to Raft's term). Distinctive: FIFO client order guarantee, explicit recovery phases (discovery → sync → broadcast). Isomorphic to Raft for the consensus part. Differences are surface-level (epoch vs term, znodes vs flat keys) and operational (JVM tuning vs single Go binary).
Paxos (Chubby, Spanner internals)
Lamport's original. Multi-Paxos skips the prepare phase for the common case of a stable leader. Production Paxos exists, but Paxos doesn't prescribe leader election, log replication structure, or membership change — every implementation has rolled its own and gotten subtly different things. Precisely the problem Raft was designed to fix.
Why etcd's "BBolt B+ tree + Raft" is the modern default
Coordination workloads share four characteristics: (1) small dataset, (2) read-heavy, (3) writes already gated by consensus fsync, (4) range scans over prefixes are common. These point at a B+ tree state machine rather than an LSM (Log-Structured Merge) tree. LSM's advantage — absorbing high write throughput by sequential writes — is wasted when Raft fsync is the ceiling. B+ tree gives predictable read latency, clean range scans, mmap-friendly I/O, and no compaction stalls.
BBolt (fork of Bolt, itself LMDB-inspired) implements a copy-on-write B+ tree. Combined with Raft for log replication and MVCC (Multi-Version Concurrency Control) for revision-based watches, this is the canonical modern stack for coordination metadata. Raft-based stores that hold user data rather than control-plane state, such as CockroachDB and TiKV, pair Raft with an LSM engine (Pebble, RocksDB) instead, because their write volume is driven by user traffic rather than cluster events.
§4. Byte-level mechanics — Raft, BBolt, MVCC, watch
Raft mechanics, BBolt B+ tree page layout, MVCC revision keying, watch implementation, fsync ordering, and a byte-level walkthrough of one write.
4.1 Raft consensus log
Raft gives every voter the same totally ordered log of entries. Once a log entry is committed (acked by majority quorum), it is applied to the state machine on every node in the same order. The state machine is incidental — Raft provides the log; the state machine consumes it.
Each Raft entry:
struct LogEntry {
    uint64 term;     // election term when created
    uint64 index;    // monotonic, 1, 2, 3, ...
    bytes  command;  // serialized state-machine command
}
Every voter holds three monotonically increasing counters:
- commitIndex — highest index known to be committed (acked by majority).
- appliedIndex — highest index applied to the state machine.
- currentTerm — the election term the node believes it is in.
Invariant: appliedIndex <= commitIndex <= lastLogIndex.
Leader election
Followers start a randomized election timeout between 150 and 300 ms (randomization is critical — without it, all followers time out simultaneously and split the vote forever). When the timer fires:
- Transition to candidate; increment currentTerm; vote for self.
- Send RequestVote(term, candidateId, lastLogIndex, lastLogTerm) to peers.
Each peer votes yes only if (a) it has not voted in this term AND (b) the candidate's log is at least as up-to-date as its own. "Up-to-date" means: candidate's lastLogTerm > yours, OR lastLogTerm == yours AND lastLogIndex >= yours. This is the log completeness check — preventing a node with a stale log from becoming leader and overwriting committed entries.
The candidate wins with floor(N/2) + 1 votes (including itself). Transitions to leader, immediately sends heartbeats (empty AppendEntries) to suppress peers' election timers. Split-vote case: both candidates retry at higher term with new randomized timeouts; one usually fires earlier and wins. Election completes in 1-2 cycles, ~300-600 ms.
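The vote rule and the majority threshold described above can be written down directly. A minimal sketch; the function names and argument order are illustrative rather than taken from any Raft library.

```python
def log_is_up_to_date(cand_last_term, cand_last_index, my_last_term, my_last_index):
    """Raft's log-completeness check: compare last terms first, then last indexes."""
    if cand_last_term != my_last_term:
        return cand_last_term > my_last_term
    return cand_last_index >= my_last_index


def grant_vote(voted_for, candidate_id, cand_last_term, cand_last_index,
               my_last_term, my_last_index):
    """Vote yes only if no conflicting vote was cast this term and the log check passes."""
    not_yet_voted = voted_for is None or voted_for == candidate_id
    return not_yet_voted and log_is_up_to_date(
        cand_last_term, cand_last_index, my_last_term, my_last_index)


def votes_needed(cluster_size):
    """Majority quorum: floor(N/2) + 1, counting the candidate's own vote."""
    return cluster_size // 2 + 1
```

Note that a shorter log can still win the comparison if its last entry carries a higher term: `log_is_up_to_date(5, 10, 4, 99)` is true, because term takes precedence over length.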
Log replication
Per follower:
nextIndex[f]  = leader's lastLogIndex + 1   (initial)
matchIndex[f] = 0                           (initial)
loop:
    prevLogIndex = nextIndex[f] - 1
    prevLogTerm  = log[prevLogIndex].term
    entries      = log[nextIndex[f] : nextIndex[f] + batch]
    send AppendEntries(term, leaderId, prevLogIndex, prevLogTerm,
                       entries, leaderCommit)
    if response.success:
        matchIndex[f] = nextIndex[f] + len(entries) - 1
        nextIndex[f]  = matchIndex[f] + 1
    else:
        nextIndex[f] -= 1    // mismatch; step back
The follower, on receiving AppendEntries:
- If term < currentTerm: reject.
- If term > currentTerm: update, step down.
- Consistency check: if log[prevLogIndex].term != prevLogTerm, reject. Leader steps back and retries until matching; follower then truncates and appends.
- Append entries to local log; fsync before responding.
- Update local commitIndex = min(leaderCommit, lastLogIndex).
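The follower steps above can be sketched as a single function. This is a toy model under stated assumptions: the `state` dict layout and the `(term, command)` tuple shape are invented for illustration, there is no real fsync, and the commit index is bounded by the index of the last new entry, which is how the Raft paper states the rule.

```python
def handle_append_entries(state, term, prev_log_index, prev_log_term,
                          entries, leader_commit):
    """state holds 'current_term', 'commit_index', and 'log': a list of
    (term, command) tuples where list position i is Raft index i + 1."""
    if term < state["current_term"]:
        return False                      # stale leader: reject
    state["current_term"] = term          # adopt the newer term, remain a follower

    log = state["log"]
    # Consistency check on the entry immediately before the new ones.
    if prev_log_index > 0:
        if len(log) < prev_log_index or log[prev_log_index - 1][0] != prev_log_term:
            return False                  # leader will step back and retry

    # Truncate only from the first conflicting entry, then append what is missing.
    for offset, entry in enumerate(entries):
        pos = prev_log_index + offset     # 0-based list position of this entry
        if pos < len(log) and log[pos][0] != entry[0]:
            del log[pos:]
        if pos >= len(log):
            log.append(entry)
    # A real node fsyncs its log here, before responding.

    last_new_index = prev_log_index + len(entries)
    if leader_commit > state["commit_index"]:
        state["commit_index"] = min(leader_commit, last_new_index)
    return True


state = {"current_term": 2, "commit_index": 0,
         "log": [(1, "a"), (1, "b"), (2, "stale")]}
stale_leader = handle_append_entries(state, 1, 0, 0, [], 0)
mismatch = handle_append_entries(state, 3, 3, 3, [(3, "x")], 0)
repaired = handle_append_entries(state, 3, 2, 1, [(3, "c"), (3, "d")], 3)
```

In the last call the follower's uncommitted term-2 entry at index 3 conflicts with the leader's term-3 entry, so it is truncated and replaced.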
Commit rule (leader-side): commitIndex advances to the highest index N such that N was created in the current term AND a majority of matchIndex[] are >= N. The "current term" restriction is the famous Raft Figure 8 fix — committing entries from prior terms requires committing a current-term entry on top, otherwise a deposed leader can resurrect and overwrite.
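A short sketch of the leader-side commit rule, including the current-term restriction. The data shapes (`log_terms` as a list, `match_index` as a dict of followers) are assumptions made for the example.

```python
def advance_commit_index(commit_index, current_term, log_terms, match_index):
    """log_terms[i] is the term of the entry at Raft index i + 1 on the leader.
    match_index maps follower id -> highest index known to be replicated there."""
    cluster_size = len(match_index) + 1          # followers plus the leader itself
    majority = cluster_size // 2 + 1
    for n in range(len(log_terms), commit_index, -1):
        if log_terms[n - 1] != current_term:
            # Terms never decrease along the log, so everything below is older too.
            # Older-term entries commit only indirectly, under a current-term entry.
            break
        replicas = 1 + sum(1 for m in match_index.values() if m >= n)
        if replicas >= majority:
            return n
    return commit_index


# Current-term entry replicated on one of two followers: committed.
normal = advance_commit_index(0, 3, [1, 1, 2, 2, 3], {"B": 5, "C": 2})
# Old-term entry on a majority, but no current-term entry replicated yet: not committed.
figure8 = advance_commit_index(0, 4, [1, 2], {"B": 2, "C": 2, "D": 0, "E": 0})
```

The second call is the Figure 8 situation: the term-2 entry sits on three of five nodes, yet the term-4 leader must not count it as committed on replica count alone.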
Term and the "newer term wins" invariant
Term is a Lamport-like logical clock. Every RPC carries the sender's term. Any node receiving a higher term updates currentTerm, becomes a follower, and resets its vote. This single invariant (higher term wins, lower term steps down) prevents two leaders from committing at the same time. An isolated old leader may still believe it leads, but it cannot commit anything new: a new leader exists only because a majority has already moved to the higher term, and those nodes reject the old leader's AppendEntries. The old leader steps down on the first message it sees carrying the higher term.
4.2 BBolt — copy-on-write B+ tree
After a log entry is committed, etcd applies it to BBolt: a single-file embedded KV store using a copy-on-write B+ tree.
Page layout
BBolt file (e.g., default.etcd/member/snap/db)
─────────────────────────────────────────────────
Page 0: meta page A (root ptr, txn id, freelist, checksum)
Page 1: meta page B (alternate copy — meta pages flip)
Page 2: freelist
Page 3+: branch pages and leaf pages
┌────────────────────────────────┐
│ Branch page (interior node) │
│ ┌──┬──┬──┬──┬──┬──┬──┐ │
│ │k1│p1│k2│p2│k3│p3│..│ k+ptr │
│ └──┴──┴──┴──┴──┴──┴──┘ │
└─────┬───────┬──────────────────┘
      │       │
      ▼       ▼
┌──────────┐ ┌──────────┐
│ Leaf │ │ Leaf │
│ k:v k:v │ │ k:v k:v │
└──────────┘ └──────────┘
4 KB pages. Leaf holds ~30-40 KV pairs. Branching factor b ≈ 30-50. At 8 GB / 4 KB pages = 2M pages, tree is 4-5 levels deep. With the working set in the OS page cache (the whole DB usually fits in RAM), a lookup is essentially a memory access. BBolt mmaps the file; reads are page-fault driven.
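The depth estimate can be checked with a few lines of arithmetic. A rough sketch: it treats the tree as uniformly full with a single branching factor, which real B+ trees only approximate.

```python
import math


def btree_levels(db_bytes, page_bytes=4096, branching=40):
    """Estimate B+ tree height: smallest number of levels L with branching**L >= pages."""
    pages = db_bytes // page_bytes
    return pages, math.ceil(math.log(pages, branching))


pages, levels = btree_levels(8 * 2**30)                   # 8 GiB file, 4 KiB pages
_, levels_narrow = btree_levels(8 * 2**30, branching=30)  # lower end of the fanout range
```

With roughly two million pages, a fanout of 40 to 50 gives four levels and a fanout of 30 gives five, which matches the 4-5 level figure above.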
Copy-on-write
When a write transaction mutates a page, BBolt allocates a new page from the freelist and writes the new version there. The parent's pointer is updated to the new child — but the parent must also be COW'd, and so on up to the root. At commit:
- New data pages are written and fsync'd.
- New root is written.
- Meta page is updated to point at the new root and fsync'd.
- Old pages go to the freelist (after no live reader is still referencing them).
The double-fsync — once for data pages, once for the meta page — is the durability point. After the second fsync, a crash recovers to the new state. Crash between fsyncs: next startup reads the old meta page (still pointing at the old root); partially-written new pages are garbage.
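The crash-safety argument can be made concrete with a toy model. The `TinyCowFile` class below is hypothetical and is not BBolt's on-disk format; it only models two meta slots, fresh pages per commit, and the recovery rule of picking the valid meta with the highest transaction id.

```python
class TinyCowFile:
    """Toy model of a copy-on-write file with two alternating meta pages."""

    def __init__(self):
        self.pages = {0: {}}     # page id -> snapshot of the whole tree at that commit
        self.metas = [{"txid": 0, "root": 0}, {"txid": -1, "root": None}]
        self.next_page = 1

    def current_meta(self):
        # Recovery rule: the valid meta page with the highest transaction id wins.
        valid = [m for m in self.metas if m["root"] is not None]
        return max(valid, key=lambda m: m["txid"])

    def read(self):
        return self.pages[self.current_meta()["root"]]

    def commit(self, updates, crash_before_meta=False):
        old = self.current_meta()
        # Step 1: write the new version to fresh pages; old pages stay untouched.
        new_root = self.next_page
        self.next_page += 1
        self.pages[new_root] = {**self.pages[old["root"]], **updates}
        # (first fsync would happen here)
        if crash_before_meta:
            return               # new pages exist but nothing points at them
        # Step 2: flip the other meta slot so it points at the new root.
        txid = old["txid"] + 1
        self.metas[txid % 2] = {"txid": txid, "root": new_root}
        # (second fsync would happen here: the durability point)


db = TinyCowFile()
db.commit({"a": 1})
db.commit({"b": 2}, crash_before_meta=True)   # crash between the two fsyncs
after_crash = dict(db.read())
db.commit({"c": 3})
after_retry = dict(db.read())
```

After the simulated crash the reader still sees the previous root, and the orphaned page is simply never referenced again.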
Why a B+ tree and not LSM
For LSM (RocksDB, LevelDB) the write path is sequential append to a memtable + commit log, then async compaction. This absorbs 50-100k writes/sec. But:
- Raft fsync already caps writes at ~10k/sec — LSM's write capacity is wasted.
- Read latency on LSM is unpredictable (probe multiple sorted runs + bloom filters).
- Compaction stalls are an operational problem.
- Range scans cross multiple sorted runs and require a merging iterator.
B+ tree gives O(log_b N) point lookups with predictable cost, range scans along leaves with no merging, and no background compaction. Trade-off: lower max write throughput. Acceptable because Raft already capped it.
4.3 MVCC layer
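etcd exposes a Multi-Version Concurrency Control keyspace: every key has a history of revisions. Each write transaction (put, delete, or txn) is assigned a globally monotonic revision number when it is applied from the Raft log. The revision is a separate counter from the Raft log index: entries such as leader no-ops and membership changes consume log indexes without creating a revision, so the two advance in the same order but not in lockstep: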
revision 100: PUT /a = "x" (mod=100, ver=1, create=100)
revision 101: PUT /b = "y" (mod=101, ver=1, create=101)
revision 102: PUT /a = "x'" (mod=102, ver=2, create=100)
revision 103: DELETE /a (mod=103, tombstone)
revision 104: PUT /a = "x''" (mod=104, ver=1, create=104)
BBolt stores:
- Key bucket: (/a, 100) -> "x", (/a, 102) -> "x'", (/a, 103) -> tombstone, (/a, 104) -> "x''".
- Index (in-memory B-tree, rebuilt from BBolt on startup): logical key /a → list of revisions with metadata.
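A read at revision r for /a finds the highest revision <= r. A "watch from revision 100" replays every change at revision >= 100 (the start revision is inclusive). Compaction (etcdctl compact) discards revisions below a threshold. etcd does not compact on its own unless auto-compaction is configured; the Kubernetes apiserver requests a compaction every 5 minutes by default.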
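The read-at-revision and compaction rules can be sketched with a small in-memory model. `MVCCStore` is a hypothetical class written for this example; it mirrors the logical view above and says nothing about how etcd lays the data out on disk.

```python
import bisect


class MVCCStore:
    """Toy revisioned keyspace: every write gets the next store-wide revision."""

    def __init__(self, start_revision=0):
        self.revision = start_revision
        self.compacted = 0
        self.history = {}        # key -> list of (mod_revision, value); None = tombstone

    def put(self, key, value):
        self.revision += 1
        self.history.setdefault(key, []).append((self.revision, value))
        return self.revision

    def delete(self, key):
        return self.put(key, None)   # a delete is recorded as a tombstone revision

    def get(self, key, rev=None):
        rev = self.revision if rev is None else rev
        if rev < self.compacted:
            raise LookupError("required revision has been compacted")
        versions = self.history.get(key, [])
        # Newest version whose mod_revision is <= rev.
        i = bisect.bisect_right([r for r, _ in versions], rev)
        return versions[i - 1][1] if i else None

    def compact(self, rev):
        # Per key, keep the newest version at or below rev plus everything above it.
        self.compacted = rev
        for key, versions in self.history.items():
            older = [v for v in versions if v[0] <= rev]
            newer = [v for v in versions if v[0] > rev]
            self.history[key] = older[-1:] + newer


store = MVCCStore(start_revision=99)
store.put("/a", "x")      # revision 100
store.put("/b", "y")      # revision 101
store.put("/a", "x'")     # revision 102
store.delete("/a")        # revision 103
store.put("/a", "x''")    # revision 104
at_101 = store.get("/a", 101)
at_103 = store.get("/a", 103)
latest = store.get("/a")

store.compact(102)
try:
    store.get("/a", 100)
    old_read_failed = False
except LookupError:
    old_read_failed = True
at_102_after = store.get("/a", 102)
```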
4.4 Watch implementation
client → etcd: WatchCreate(key="/pods/", prefix=true, start_rev=12345)
Server: (1) registers the watch in an in-memory watcher table; (2) if start_rev is in the past but not yet compacted, replays the missed events from MVCC history (the catch-up phase); (3) once caught up, the watcher is synced and receives every subsequent matching commit in order.
Fanout: 10k watchers × 10k writes/sec all touching a watched prefix = 100M events/sec — far past what etcd can do. This is why systems put aggregators in front of etcd (the Kubernetes apiserver). Contract: events delivered in order; clients reconnect with start_rev = last_seen + 1. If disconnected so long that compaction discarded the starting point, etcd returns ErrCompacted and the client does a full resync.
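The reconnect contract (resume at last_seen + 1, fall back to a full resync when that point has been compacted) can be sketched as follows. `EventLog`, `ResumableWatcher`, and `Compacted` are hypothetical names for a toy model, not etcd client types.

```python
class Compacted(Exception):
    pass


class EventLog:
    """Toy server side: retained events in commit order plus the latest state."""

    def __init__(self):
        self.revision = 0
        self.events = []         # (revision, key, value) tuples
        self.compact_rev = 0     # events at or below this revision were discarded
        self.state = {}

    def put(self, key, value):
        self.revision += 1
        self.events.append((self.revision, key, value))
        self.state[key] = value

    def compact(self, rev):
        self.compact_rev = rev
        self.events = [e for e in self.events if e[0] > rev]

    def watch(self, start_rev):
        if start_rev <= self.compact_rev:
            raise Compacted(start_rev)
        return [e for e in self.events if e[0] >= start_rev]

    def list(self):
        return dict(self.state), self.revision


class ResumableWatcher:
    def __init__(self, log):
        self.log = log
        self.cache, self.last_seen = log.list()
        self.resyncs = 0

    def poll(self):
        """Simulates one reconnect: resume right after the last event seen."""
        try:
            events = self.log.watch(self.last_seen + 1)
        except Compacted:
            self.cache, self.last_seen = self.log.list()   # full resync
            self.resyncs += 1
            return
        for rev, key, value in events:
            self.cache[key] = value
            self.last_seen = rev


log = EventLog()
log.put("/pods/a", "v1")
watcher = ResumableWatcher(log)
log.put("/pods/b", "v1")
watcher.poll()                       # normal catch-up from revision 2
cache_after_catchup = dict(watcher.cache)
log.put("/pods/a", "v2")
log.put("/pods/b", "v2")
log.compact(3)                       # watcher was disconnected too long
watcher.poll()
```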
4.5 Walk through one write end-to-end
Client issues Put("/locks/job-runner", "node-7", lease=42).
Step 1: Client connects to one of three nodes. If it lands on a follower, the follower forwards the proposal to the leader. Call the leader A.
Step 2: A serializes the request as a Raft proposal and appends to its own log as LogEntry { term=15, index=98234, command=<bytes> }.
Step 3 — leader WAL fsync: A appends the new entry to its wal/ segment file, then fsyncs. This is the leader's durability point. Latency: ~1 ms on NVMe.
Step 4 — leader sends AppendEntries (in parallel with step 3):
AppendEntries {
term: 15, prevLogIndex: 98233, prevLogTerm: 15,
entries: [new entry at 98234], leaderCommit: 98233
}
Step 5 — followers WAL fsync and respond: B and C verify prevLogIndex/prevLogTerm, append, fsync, respond success=true with matchIndex=98234.
In a 3-node cluster, A needs 1 follower ack (A + 1 follower = majority of 3). As soon as either B or C responds, the entry is committed.
Step 6: A sees matchIndex[B]=98234, advances commitIndex to 98234.
Step 7 — apply to BBolt: A's apply loop assigns the next MVCC revision rev, opens a write transaction, inserts the MVCC entry (key_bucket[("/locks/job-runner", rev)] = "node-7"), updates the in-memory index, and associates the key with lease 42. Committing the BBolt transaction runs BBolt's own fsync sequence (data pages, then meta page).
Step 8: A responds to the client; the response header carries revision rev.
Step 9: Followers eventually apply when they see leaderCommit=98234 on the next heartbeat.
Crash recovery at each step:
- Before step 3 fsync: nothing. Client times out. No node has the entry.
- After step 3, before step 4: A has the entry, B and C don't. If A stays leader it replicates the entry; if B or C wins a new election, A's uncommitted entry is truncated when A rejoins.
- After step 5: entry durable on 2 of 3 → committed. Next leader has it.
- Between steps 7 and 8: client doesn't know. Must retry; revision-based CAS handles idempotency.
§5. Distributed lock recipes — the actual API patterns
Distributed locking is the most common, and most error-prone, use of coordination services. This section walks the canonical lock recipe in etcd, ZooKeeper, and Consul, then ends with the Kleppmann critique of Redlock that anchors why these recipes look the way they do.
5.1 The general shape of a correct distributed lock
Any correct distributed lock has five elements:
- Acquisition with a timeout. Block at most T seconds; don't let a slow lock kill the caller forever.
- A lease (TTL). The lock auto-releases if the holder dies — the coordination service decides liveness, not the client.
- A fencing token returned to the holder — the load-bearing safety property (see §5.5 for the Kleppmann argument).
- Watch on the predecessor, not on the whole prefix. Waiters subscribe to the change that releases the lock, serialized into a chain instead of a thundering herd.
- Idempotent release. Releasing twice or releasing after lease expiry must be safe.
5.2 etcd recipe — lease + key with revision, fencing token from revision number
etcd v3's locking is built on three primitives: Lease, Txn (compare-and-swap by revision), and Watch. The official concurrency package (concurrency.NewMutex) implements the pattern below:
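// Sketch of what the etcd Mutex does; error handling elided for brevity.
// "etcd" is an import alias for go.etcd.io/etcd/client/v3.
lease, _ := client.Grant(ctx, ttlSeconds)
leaseID := lease.ID
keepAlives, _ := client.KeepAlive(ctx, leaseID) // client refreshes the lease in the background
go func() { for range keepAlives {} }()         // drain keepalive responses
// Each contender writes its own key under a shared prefix.
candidate := fmt.Sprintf("%s/%x", prefix, leaseID)
resp, _ := client.Txn(ctx).
    If(etcd.Compare(etcd.CreateRevision(candidate), "=", 0)).
    Then(etcd.OpPut(candidate, "", etcd.WithLease(leaseID))).
    Else(etcd.OpGet(candidate)).
    Commit()
myRev := resp.Header.Revision // <— FENCING TOKEN
if !resp.Succeeded { // key already existed: reuse its CreateRevision
    myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// List all candidate keys sorted by CreateRevision; lowest holds the lock.
// If we're not lowest, Watch on the predecessor (the next key below us).
// When predecessor is deleted, re-check ordering.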
Why this works:
- Lease auto-release on client death. If the keepalive stops — crash, partition, TCP failure — the lease expires server-side and every key written WithLease(leaseID) is deleted. Liveness is the coordination service's view, not the client's belief, which is what makes the lock safe across pauses.
- Fencing token from CreateRevision. The revision is a consensus-committed counter assigned in Raft apply order, so it never goes backward, even across leader failover. The contender's CreateRevision is unique to this acquisition and larger than that of every earlier holder; pass it to every downstream resource, which rejects operations with a stale token.
- Watch the predecessor, not the prefix. Watching the prefix means every release wakes every waiter — 1,000 wakes for one release. Watching the predecessor wakes only the next-in-line.
- Idempotent release via Revoke(leaseID); revoking an already-expired lease changes nothing (the resulting lease-not-found error can be ignored).
5.3 ZooKeeper recipe — ephemeral sequential nodes for queue locks
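import threading

def acquire(zk, lock_path):
    # zk: a connected client with a kazoo-style API (an assumption of this sketch).
    # Create an ephemeral sequential znode under the lock path.
    # Ephemeral: deleted when the session ends.
    # Sequential: ZK appends a monotonically-increasing 10-digit counter.
    my_path = zk.create(f"{lock_path}/lock-", ephemeral=True, sequence=True)
    my_name = my_path.rsplit("/", 1)[-1]
    my_seq = int(my_path.split("-")[-1])  # <— FENCING TOKEN
    while True:
        children = sorted(zk.get_children(lock_path))
        position = children.index(my_name)
        if position == 0:  # lowest sequence number holds the lock
            return my_seq, lambda: zk.delete(my_path)
        # Watch predecessor (one slot below); when deleted, re-check.
        predecessor_path = f"{lock_path}/{children[position - 1]}"
        released = threading.Event()
        # exists() returns None if the predecessor is already gone: loop again at once.
        if zk.exists(predecessor_path, watch=lambda event: released.set()):
            released.wait()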
Properties:
- Ephemeral nodes give lease semantics via the session. A ZK session is a TCP connection plus a session ID; if the client misses heartbeats within the session timeout (6-30s), the server expires the session and deletes every ephemeral node owned by it. ZK's analogue of an etcd lease.
- Sequential numbering = fencing token. The 10-digit counter is server-issued, monotonic, tied to the parent znode's cversion. Doesn't regress across ensemble failover because it's part of consensus-committed state.
- One-shot watches. ZK watches fire once; you must re-register. Easy to miss an event between fire and re-register — etcd's revision-based watches replay from a starting revision and are easier to make correct.
5.4 Consul recipe — session-based locks
Consul builds locks on sessions (TTL-driven liveness) plus KV with acquire semantics:
session_id = consul.session.create(ttl="30s", behavior="release", lock_delay="15s")
threading.Thread(target=renew_loop, args=(consul, session_id)).start()
# kv.put with acquire= succeeds iff no other session holds it.
acquired = consul.kv.put(key, "owner", acquire=session_id)
# If not, long-poll on the key for change to Session field, then retry.
token = consul.kv.get(key)["ModifyIndex"] # <— FENCING TOKEN
Distinctive features:
- lock_delay is a safety quarantine: when a session holding a lock dies, the key is held in "no acquire" state for 15s before another session can take it. Protects against the "session expired but the holder doesn't know yet" race.
- ModifyIndex as fencing token. Like the etcd revision and the ZK sequence number, it is issued by the consensus layer and never goes backward.
- Sessions can be tied to health checks, dying when a check fails — more sophisticated than pure TTL liveness.
5.5 The Kleppmann critique of Redlock
Martin Kleppmann's 2016 essay "How to do distributed locking" is the seminal critique of treating distributed locks as a one-line problem. Three failures, each fatal:
Failure 1 — process pauses break wall-clock leases. Java client acquires a 10-second lock at t=0. At t=2, a 15-second GC pause hits. The lock service expires at t=10; another client acquires at t=11; the original wakes at t=17 still believing it holds the lock. Two clients in the critical section simultaneously. No lock service can prevent this if the only safety property is TTL. The fix — fencing tokens — is what every recipe above is constructed around.
Failure 2 — Redlock's clock-drift assumption. Antirez's Redlock relies on bounded clock drift across N independent Redis nodes. Network Time Protocol (NTP) step adjustments, leap seconds, hypervisor wall-clock jumps after Virtual Machine (VM) migration, VM pauses — any of these violate the bound. When violated, Redlock grants two leases without either node noticing.
Failure 3 — Redlock fencing tokens cannot be made monotonic across failures. Each Redis node has its own counter. Failover loses the counter. A real fencing token is monotonic across all failures forever — a property a single Raft group provides for free (the revision only advances through committed log entries), but a federation of independent Redis nodes cannot provide without consensus.
The phrasing: "Redlock is a fast lock; it is not a correct lock under realistic failure conditions. The cost of correctness is exactly one Raft group."
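The fencing fix named under Failure 1 lives on the downstream resource, not in the lock service. A minimal sketch, assuming a hypothetical `FencedResource` class; the token values 33 and 34 are illustrative.

```python
class FencedResource:
    """Downstream resource that rejects writes carrying a stale fencing token."""

    def __init__(self):
        self.max_token_seen = 0
        self.value = None

    def write(self, token, value):
        if token < self.max_token_seen:
            return False          # a newer lock holder has already written
        self.max_token_seen = token
        self.value = value
        return True


storage = FencedResource()
# Client 1 acquired the lock with token 33, then stalled in a long GC pause.
# Client 2 acquired it after the lease expired, with token 34, and wrote first.
accepted_new = storage.write(34, "from client 2")
accepted_stale = storage.write(33, "from client 1")   # client 1 wakes up late
```

The paused client still believes it holds the lock, but its write is refused because the resource has already seen a higher token.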
5.6 A correct distributed lock acquisition, end-to-end
The contract a caller of an etcd-backed lock should expect:
import time

# acquire_lock, LockBusy, LockTimeout, jittered_backoff, and etcd are assumed helpers.
def with_lock(resource_key, work_fn, max_wait=10, lease_ttl=30):
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        try:
            token, release = acquire_lock(etcd, resource_key, ttl=lease_ttl)
            break
        except LockBusy:
            time.sleep(jittered_backoff())
    else:
        # Loop ended without break: the deadline passed before acquisition.
        raise LockTimeout()
    try:
        work_fn(fencing_token=token)  # downstream: token >= max_token_seen
    finally:
        release()  # idempotent
Every safety property from §5.1: timeout, lease-driven liveness, monotonic fencing token, predecessor watch, idempotent release. The mistake behind most "we built our own distributed lock" outages is that one of these five was missing.
§6. Service discovery patterns
Service discovery splits into three architectural styles with very different scaling characteristics. "Use etcd for service discovery" is incomplete until you say which of the three.
6.1 Pattern A — client-side discovery
Every client of a service queries (or watches) the coordination service directly. Netflix's original Eureka model, early Kafka consumers reading partition ownership from ZooKeeper, and "etcd-aware" clients in Go ecosystems are examples.
Coordination-service load: every client opens at least one watch stream; the stream count equals the client fleet count. At 100k microservice instances, that is 100k watch streams — an order of magnitude past etcd's ~10k stream ceiling.
Fits when: small fleets (< 1,000 clients), latency-critical paths where one extra hop is unacceptable, or custom client-side routing (consistent hashing, sticky shard ownership) that a generic LB can't encode.
Breaks at scale: as the fleet grows, the coordination service is the bottleneck — front it with an aggregator (§6.2) or move to gossip (§6.3).
6.2 Pattern B — server-side discovery
The coordination service holds the registry; a load balancer (LB) or service mesh proxy reads it on the clients' behalf; clients talk only to the LB. AWS Application Load Balancer target groups, Kubernetes Services backed by EndpointSlices (apiserver reads from etcd), Envoy sidecars consuming EDS (Endpoint Discovery Service) from a control plane — all variants.
etcd → watched by → apiserver / Envoy control plane → DNS / Virtual IP (VIP) / xDS → 100k clients
Coordination-service load: O(LB fleet) watches, not O(client fleet) — a few hundred apiservers means a few hundred watches. The aggregator absorbs the fanout.
Fits when: large fleets (10k+ clients) — the default at scale; polyglot environments where embedding a coordination-aware client library in every language is impractical; centralized policy (rate limiting, mutual TLS termination, retries, circuit breaking) belongs at the LB anyway.
6.3 Pattern C — gossip-based
Every node spreads its view to a few peers periodically; over O(log N) rounds the cluster converges. Consul's serf layer, Cassandra's gossip, SWIM (Scalable Weakly-consistent Infection-style process group Membership, the protocol Consul implements). No central registry — every node reconstructs it locally from gossip messages.
Coordination-service load: zero — there isn't one for liveness. Consul splits the responsibility: Raft-backed KV/catalog for strongly-consistent state, SWIM gossip for liveness, so liveness pings don't flow through Raft.
Fits when: liveness detection at scale (SWIM scales to tens of thousands of nodes — traffic grows linearly, not quadratically); eventual consistency is acceptable; decentralized environments (geo-distributed, edge fleets) with no good place to put a single coordination cluster.
Wrong for: anything needing linearizability (leader election, lock fencing, schema changes) — gossip is eventually consistent and two nodes can briefly believe they are both the leader. Cassandra and Consul both hybridize — gossip for high-frequency, eventually-OK questions; embedded Paxos / Raft for the few operations that need strong consistency.
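The logarithmic convergence claim can be explored with a small simulation. This is a simplified push-gossip model with assumed parameters (fanout of 3, uniform random peer choice); it is not SWIM or Serf, which add failure detection and suspicion on top.

```python
import random


def gossip_rounds(n_nodes, fanout=3, seed=0, max_rounds=1000):
    """Push gossip: each informed node tells `fanout` random peers per round.
    Returns the number of rounds until every node is informed."""
    rng = random.Random(seed)
    informed = {0}
    rounds = 0
    while len(informed) < n_nodes and rounds < max_rounds:
        newly = set()
        for _ in informed:
            newly.update(rng.randrange(n_nodes) for _ in range(fanout))
        informed |= newly
        rounds += 1
    return rounds


small = gossip_rounds(100)
large = gossip_rounds(10_000)
```

Because the informed set can at most quadruple per round with a fanout of 3, 100 nodes need at least 4 rounds and 10,000 nodes at least 7; in practice the totals stay within a small multiple of log N rather than growing with N.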
6.4 Choosing
| Fleet size | Freshness need | Pattern |
|---|---|---|
| < 1,000 | Strong | Client-side direct |
| 1,000-100,000 | Strong | Server-side via aggregator (default) |
| > 100,000 | Mixed | Hybrid: aggregator + gossip for liveness |
| Any | Eventual OK | Gossip |
The framing: "client-side puts every client on the coordination service's connection table; server-side puts only the LB fleet there; gossip puts no one there. Pick by the size of the connection table you can afford."
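The decision table reduces to a few comparisons. A sketch with a hypothetical function name; the thresholds are the ones from the table, and real choices would also weigh latency budgets and client language mix.

```python
def discovery_pattern(fleet_size, eventual_ok=False):
    """Map fleet size and freshness need to a discovery pattern, per the table above."""
    if eventual_ok:
        return "gossip"
    if fleet_size < 1_000:
        return "client-side direct"
    if fleet_size <= 100_000:
        return "server-side via aggregator"
    return "hybrid: aggregator + gossip for liveness"
```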
§7. Watch mechanics and scale
§4.4 introduced watches at the API level. This section goes deeper into how they're implemented in etcd v3 and names the thundering-herd problem that drives every production architecture.
7.1 etcd v3 watch streams — gRPC multiplexing
etcd v3 replaced v2's HTTP long-polling with gRPC bidirectional streams over HTTP/2. One TCP connection can carry many watches: HTTP/2 multiplexes gRPC streams over the socket, and a single Watch stream in turn carries many logical watches, each tagged with its own watch ID. A client can have 1,000 logical watches on one TCP socket. The server still tracks each watch as an independent subscriber in memory — the TCP is shared, the bookkeeping is not.
The precise statement is: watches are multiplexed by watch ID over a gRPC stream, and gRPC streams are multiplexed over a single TCP connection per client. Until file descriptor limits or kernel epoll cost dominates, this is fine.
7.2 Watch fragmentation
A single event can be large (up to 1.5 MB default); a WatchResponse is capped at gRPC message size (4 MB default). When a watched prefix sees a burst — 1000 Pods updated in the same Raft commit — and the watch was created with fragmentation enabled, etcd splits the response across multiple WatchResponse messages, each marked Fragment=true except the last.
HTTP/2 stream-level flow control applies: a slow client backs up its stream, etcd's per-watcher buffer fills, and after a threshold etcd disconnects the slow watcher with WATCH CANCEL. The client must reconnect with a fresh start_rev. This protects the cluster at the cost of partial views for slow clients.
7.3 Watch revision filtering — the bookmark mechanism
Every watch carries a start_rev (default: current + 1, "future events only"). The server only sends events with revision >= start_rev matching the key/prefix.
If start_rev is behind the compaction watermark (e.g., start_rev=10000 but everything before 50000 has been compacted), the server returns ErrCompacted and the client must do a full resync. This bookmark contract is what makes etcd's watch model easier to reason about than ZooKeeper's one-shot watches: as long as you reconnect before compaction, you get every event in between. ZK fires once and you must re-register; a delete between fire and re-register is missed.
7.4 The thundering-herd problem — 10k watchers on one key
Scenario: 10,000 kubelets each watch /leader-election/scheduler. The leader changes. All 10,000 watchers wake up in the same Raft commit:
- 1 Raft commit (the lock release + reacquisition).
- 10,000 watch deliveries — at 1 KB each, 10 MB egress on a single commit boundary.
- 10,000 watchers respond with secondary operations — 10,000 additional Gets plus 5,000 additional Puts.
One logical event produces a flash crowd etcd cannot handle. p99 spikes, watchers time out, reconnects re-issue lists, the storm self-amplifies for seconds. Mitigations: don't watch the same key from 10,000 places (aggregate); coalesce events at the aggregator; backoff with jitter on secondary reactions.
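The size of the flash crowd, and the jitter mitigation, can be sketched as follows. The per-watcher follow-up rates are the scenario's numbers; `herd_cost` and `jittered_backoff` are hypothetical helpers, and the backoff uses the common full-jitter scheme as one reasonable choice.

```python
import random


def herd_cost(watchers, event_bytes=1024, gets_per_watcher=1, puts_per_watcher=0.5):
    """Load triggered by one commit on a key that `watchers` clients all watch."""
    return {
        "egress_mb": watchers * event_bytes / 1e6,
        "follow_up_gets": int(watchers * gets_per_watcher),
        "follow_up_puts": int(watchers * puts_per_watcher),
    }


def jittered_backoff(attempt, base=0.1, cap=5.0, rng=random):
    """Full jitter: sleep a uniform random time up to an exponentially growing cap."""
    return rng.uniform(0, min(cap, base * 2 ** attempt))


cost = herd_cost(10_000)
delays = [jittered_backoff(attempt) for attempt in range(8)]
```

Spreading the follow-up Gets and Puts over a randomized window turns one spike into a ramp, which is usually enough to keep p99 from tipping into the reconnect storm described above.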
7.5 Why Kubernetes apiserver inserts reflector + indexer + informer
The apiserver implements a three-layer cache between etcd watches and pod controllers:
etcd ── ONE watch per resource type ──► apiserver
                                        ├── Reflector (reads etcd watch, normalizes)
                                        ├── Indexer (in-memory cache by name, labels)
                                        └── Informer (exposes List + Watch to controllers,
                                                      served from cache not etcd)
                                            │
                                            ▼ thousands of controller watches
                                        controllers, kubelets (10k+)
The architectural rule: etcd sees O(resource_types) watches — about 50. The apiserver sees O(clients) — 10,000+. etcd is shielded from fanout entirely; the apiserver scales horizontally behind a load balancer.
The lesson generalizes: whenever you have more than a few hundred watchers, you need an aggregator. Consul does it via per-host agents; apiserver via reflector/informer; service-mesh control planes via xDS aggregation. The coordination service is not a fanout engine.
§8. etcd MVCC compaction — why you must run it or your cluster dies
§4.3 introduced MVCC. This section names the operational hazard: MVCC history grows unboundedly, BBolt expands, and forgotten compaction is the most common way to kill an etcd cluster.
8.1 Every write bumps revision; history accumulates
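Every Put, Delete, or Txn that modifies the keyspace is assigned a monotonically increasing revision (a store-wide counter that is related to, but not the same as, the Raft log index). The key/value bucket stores every version of every key: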
("/config/feature-X", 1001) → "off"
("/config/feature-X", 1547) → "on"
("/config/feature-X", 2104) → "on"
("/pods/payment-7", 1451) → <pod manifest>
At 1,000 writes/sec, that is 86 million revisions/day. At ~1 KB/object, ~85 GB/day if every revision is kept — the 8 GB ceiling is reached in under three hours.
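The growth figures above can be reproduced with a few lines of arithmetic. This sketch assumes decimal units (1 KB = 1,000 bytes, 8 GB = 8 × 10^9 bytes); the constant names are chosen for this example only.

```python
WRITES_PER_SEC = 1_000
BYTES_PER_REVISION = 1_000          # ~1 KB per object, decimal units (assumption)
QUOTA_BYTES = 8 * 10**9             # the 8 GB ceiling

revisions_per_day = WRITES_PER_SEC * 86_400
bytes_per_day = revisions_per_day * BYTES_PER_REVISION
hours_to_quota = QUOTA_BYTES / (WRITES_PER_SEC * BYTES_PER_REVISION) / 3_600

print(f"{revisions_per_day / 1e6:.1f}M revisions/day")
print(f"{bytes_per_day / 1e9:.1f} GB/day")
print(f"{hours_to_quota:.1f} hours to reach the quota")
```

With these assumptions the daily volume lands in the mid-80s of GB and the quota is exhausted a little over two hours in, consistent with the estimate in the text.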
8.2 Compaction drops history; defrag reclaims space
etcdctl compact <revision> discards MVCC entries below the watermark except the latest per still-live key. Two consequences:
- Watches with start_rev below the watermark fail with ErrCompacted. Client must do a full resync. The freshness vs storage trade.
- BBolt space is reclaimed only after etcdctl defrag. Compaction marks B+ tree pages free but doesn't shrink the file. Compact and defrag are two separate operations; running compact without defrag means disk space is not reclaimed even though the in-memory MVCC index has been trimmed.
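The "discard everything below the watermark except the latest version of each live key" rule can be sketched on the sample history from 8.1. This is an illustrative model, not etcd's implementation; the `compact` function and the use of `None` as a tombstone are assumptions made for the example.

```python
def compact(history, watermark):
    """Drop versions below `watermark`, keeping the newest one per live key.

    `history` is a list of (key, revision, value); value None is a tombstone.
    """
    newest_below = {}
    kept = []
    for key, rev, value in sorted(history, key=lambda e: e[1]):
        if rev >= watermark:
            kept.append((key, rev, value))
        else:
            newest_below[key] = (key, rev, value)   # later revisions overwrite
    # A key whose newest pre-watermark version is a tombstone is not live.
    survivors = [e for e in newest_below.values() if e[2] is not None]
    return sorted(survivors + kept, key=lambda e: e[1])


history = [
    ("/config/feature-X", 1001, "off"),
    ("/pods/payment-7", 1451, "<pod manifest>"),
    ("/config/feature-X", 1547, "on"),
    ("/config/feature-X", 2104, "on"),
]
after = compact(history, watermark=2000)
for entry in after:
    print(entry)
```

Revision 1001 is the only entry dropped: 1547 and 1451 survive because they are the newest pre-watermark versions of keys that still exist, which is what lets a read at the watermark still return a value.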
8.3 Forgetting to compact = cluster dies
Deploy etcd with no auto-compaction. Writes flow; BBolt grows. After a week the file hits 8 GB and etcd refuses writes:
ERROR: etcdserver: mvcc: database space exceeded
Cluster is in alarm:NOSPACE. Recovery: compact aggressively, defrag each node one at a time (defrag blocks the node; parallelizing loses quorum), disarm the alarm. One of the most common etcd outages; preventable by having auto-compaction on.
8.4 Kubernetes default — compact every 5 minutes
In Kubernetes the compaction is usually driven by the kube-apiserver, whose --etcd-compaction-interval defaults to 5m; the etcd-side equivalent is --auto-compaction-retention=5m. Either way, history older than about 5 minutes is compacted every 5 minutes, so BBolt stays proportional to live state, not write rate. The 5-minute window is the maximum a disconnected client can fall behind before losing replay; for Kubernetes controllers running on local network, 5 minutes of disconnection is rare.
Auto-defrag is not on by default — run manually or via an operator. The lesson from many outages: ship both auto-compact and a defrag schedule.
§9. Snapshots and backup
The coordination service is the single source of truth for the control plane; lose it without backup and you lose every Kubernetes object, every Kafka topic config, every service registration. Working snapshot/restore drills are non-negotiable.
9.1 etcdctl snapshot save — point-in-time BBolt copy
etcdctl snapshot save /backup/etcd-snapshot-$(date +%s).db
Mechanism: etcd opens a read transaction on BBolt (an MVCC snapshot), copies the BBolt file, closes the transaction. Because BBolt is copy-on-write, the snapshot is a consistent point-in-time view even with concurrent writes — writes go to new pages, the snapshot reads the old root.
Output: v3 backup format — a BBolt file plus etcd metadata footer (cluster ID, latest revision, SHA256). Size equals current BBolt file (100 MB to 8 GB). Standard frequency: every 15-30 minutes; this sets the Recovery Point Objective.
9.2 etcdctl snapshot restore — bootstraps a new cluster
Restore does not apply to an existing cluster; it bootstraps a new cluster from the snapshot. A half-corrupted cluster cannot ingest a snapshot safely, so the procedure is "burn the cluster, start a new one with the snapshot as initial state":
etcdctl snapshot restore /backup/snapshot.db \
  --name=etcd-0 \
  --initial-cluster=etcd-0=...,etcd-1=...,etcd-2=... \
  --data-dir=/var/lib/etcd-new
# Then start each etcd with --initial-cluster-state=new.
The restored cluster has a new cluster ID; clients pinning the old ID for safety must be reconfigured.
9.3 Cross-region snapshot replication
Snapshots are file objects — replicate like any blob. Standard Kubernetes control-plane pattern:
- Local snapshot every 15 minutes, kept 24 hours.
- Push to S3 with cross-region replication, kept 30 days.
- Weekly push to a separate cloud provider or off-site location.
9.4 Recovery Time Objective (RTO) and Recovery Point Objective (RPO) targets
For etcd backing Kubernetes:
- RPO: 15 minutes (snapshot interval — max data loss tolerated).
- RTO: 15 minutes (restore within 15 minutes of disaster).
Achievable because the snapshot file is small (< 8 GB), restore is local IO + bootstrap (2-5 minutes), and the apiserver/controllers/kubelets reconverge quickly. The cost: the whole cluster is offline during the restore window — RTO budgets that look generous for a database are tight for a coordination service because every dependent component is unavailable until coordination is.
The discipline: quarterly restore drills. A drill is how you find out before an emergency, rather than during your first production restore, that the snapshot is corrupt, IAM permissions to S3 are wrong, or etcdctl versions don't match.
§10. Authentication and authorization
Coordination services hold the most sensitive metadata — service credentials, schema mappings, leadership leases. AuthN/AuthZ here is not optional even in private DCs.
10.1 etcd v3 RBAC (Role-Based Access Control)
Three-tier model: users, roles, permissions, plus optional mutual TLS (mTLS) identity binding:
etcdctl role add pod-reader
etcdctl role grant-permission pod-reader read /pods/ --prefix
etcdctl user add kubelet
etcdctl user grant-role kubelet pod-reader
- User = identity (password or TLS client cert auth).
- Role = collection of permissions, each (read|write|readwrite, key|range|prefix).
- Permissions at prefix level, not arbitrary patterns.
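One way to see why prefix permissions are cheap to enforce: a prefix is equivalent to a half-open key range, so a permission check reduces to two byte-string comparisons. The sketch below is an illustration of that idea, not etcd's code; `prefix_range_end` and `covered` are hypothetical helper names, and the `b"\x00"` return value for an all-0xff prefix is an assumed convention for "no upper bound".

```python
def prefix_range_end(prefix: bytes) -> bytes:
    """Smallest key strictly greater than every key that starts with `prefix`."""
    end = bytearray(prefix)
    # Trailing 0xff bytes cannot be incremented, so drop them first.
    while end and end[-1] == 0xFF:
        end.pop()
    if not end:
        return b"\x00"          # assumed convention: no upper bound
    end[-1] += 1
    return bytes(end)


def covered(key: bytes, prefix: bytes) -> bool:
    """True if `key` falls in the half-open range [prefix, range_end)."""
    end = prefix_range_end(prefix)
    return key >= prefix and (end == b"\x00" or key < end)


print(prefix_range_end(b"/pods/"))
print(covered(b"/pods/payment-7", b"/pods/"))
print(covered(b"/podsecrets/x", b"/pods/"))
```

A role granted read on the /pods/ prefix therefore covers /pods/payment-7 but not a sibling such as /podsecrets/x, which is why the trailing slash in the granted prefix matters.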
TLS client cert auth is the production pattern: each component (apiserver, kubelet, etc.) has a cert; the Common Name (CN) maps to an etcd user. No passwords on disk.
Two distinct TLS cert sets in production etcd:
- Client-server TLS (--client-cert-auth, --cert-file): API traffic.
- Peer-peer TLS (--peer-client-cert-auth, --peer-cert-file): Raft traffic.
Different Certificate Authorities (CAs) are recommended to limit blast radius if one CA is compromised.
10.2 ZooKeeper ACLs (Access Control Lists)
ZK assigns ACLs per znode. Each entry is <scheme>:<id>:<perms>:
- Scheme: world, auth, digest, ip, x509, sasl.
- ID: scheme-dependent (for digest: username:base64(SHA1(username:password)); for x509: cert DN; for ip: CIDR).
- Perms: subset of READ, WRITE, CREATE, DELETE, ADMIN.
Notable: no inheritance — every znode has its own ACL list (cumbersome at scale; usually set programmatically). ADMIN permission is separable from WRITE — needed to change a znode's ACL.
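Because ACLs are usually set programmatically, it helps to see how the digest-scheme ID described above is built. This is a small sketch using only the standard library; the function name and the credentials are hypothetical.

```python
import base64
import hashlib


def digest_acl_id(username: str, password: str) -> str:
    """Build the ID of a digest ACL: username:base64(SHA1(username:password))."""
    raw = hashlib.sha1(f"{username}:{password}".encode("utf-8")).digest()
    return f"{username}:{base64.b64encode(raw).decode('ascii')}"


# Hypothetical credentials, for illustration only.
acl_id = digest_acl_id("svc-orders", "s3cret")
print(acl_id)
```

The password never appears in the ACL itself, only its salted-by-username hash, so the ID can be stored in provisioning code alongside the znode path.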
10.3 Consul Access Control List tokens
Token-based. Every API call carries a token (X-Consul-Token header). Tokens are bound to policies that grant permissions on specific resources (KV paths, service catalog entries, sessions).
Features: token chaining (child token can have only a subset of parent's permissions), token TTL for short-lived workloads, auth methods integrating with Kubernetes ServiceAccount tokens, OpenID Connect (OIDC), JSON Web Tokens (JWTs) — a workload pod authenticates with its ServiceAccount and receives a Consul token bound to a policy. The modern preferred pattern.
10.4 The "kubectl as cluster-admin" pattern
A user holds a TLS client cert whose CN is kubernetes-admin. The apiserver authenticates the cert, extracts the CN, maps it to a Kubernetes user. Kubernetes RBAC checks the user's ClusterRoleBindings. The apiserver then talks to etcd using its own etcd cert with apiserver-level permissions.
The chain: user cert → apiserver authN → apiserver authZ → apiserver-to-etcd cert → etcd authN → etcd authZ. Two authentication boundaries, each with its own RBAC. The boundary lets Kubernetes implement object-level authorization (user X can read Pods in namespace Y) that etcd's flat key model cannot natively encode.
The point: coordination services give you crude prefix-level access control; finer-grained authorization belongs in the aggregator layer.
§11. Multi-DC coordination
Stretching a coordination service across data centers (DCs) is one of the most common design mistakes.
11.1 5-node etcd across 3 DCs in a 2-2-1 distribution
The standard topology: 5 voters, distributed 2-2-1 across three DCs — two in DC-A, two in DC-B, one in DC-C. Quorum is 3 of 5. Failure analysis:
- Lose DC-A entirely (A1+A2): 3 remaining (B1, B2, C1) — quorum holds.
- Lose DC-B entirely (B1+B2): 3 remaining (A1, A2, C1) — quorum holds.
- Lose DC-C (C1): 4 remaining; quorum holds easily.
- Lose DC-A and DC-C: only B1+B2 — 2 of 5, no quorum; cluster goes read-only.
- Lose DC-B and DC-C: only A1+A2 — 2 of 5, no quorum.
Survives any single DC loss; tolerates no two-DC simultaneous loss. If the leader was in the failed DC, election picks a new leader from the remaining 3; the 150-300 ms election timeout means sub-second unavailability after failure detection.
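The failure analysis above can be checked mechanically. A minimal sketch, assuming the 2-2-1 layout from the text; the names `VOTERS` and `survives` exist only in this example.

```python
from itertools import combinations

VOTERS = {"DC-A": 2, "DC-B": 2, "DC-C": 1}     # the 2-2-1 layout
TOTAL = sum(VOTERS.values())
QUORUM = TOTAL // 2 + 1                        # 3 of 5


def survives(lost_dcs):
    """True if the voters outside `lost_dcs` still form a quorum."""
    remaining = sum(n for dc, n in VOTERS.items() if dc not in lost_dcs)
    return remaining >= QUORUM


single = {dc: survives({dc}) for dc in VOTERS}
double = {pair: survives(set(pair)) for pair in combinations(VOTERS, 2)}

print(single)
print(double)
```

Changing `VOTERS` to a 3-2 split across two DCs shows why the third DC matters: losing the 3-voter site then loses quorum outright.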
11.2 The latency tax — cross-DC round-trip time (RTT) adds to every commit
Every Raft commit requires fsync on a majority. With 2-2-1 topology and 20 ms inter-DC RTT:
- Leader in DC-A: local fsync ~1 ms.
- AppendEntries to DC-A peer (A2): ~1 ms RTT + ~1 ms fsync = 2 ms.
- AppendEntries to DC-B peers (B1, B2): 20 ms RTT + ~1 ms fsync = 21 ms.
Majority = 3 acks: leader + A2 + first DC-B peer at 21 ms. Commit latency: 21 ms. Without cross-DC: 2-3 ms. Every write pays the 20 ms tax regardless of where the client lives — no local-write optimization, because Raft requires majority and majority crosses DCs.
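The commit latency is simply the time at which the quorum-th acknowledgement arrives. The sketch below encodes that; the function name is hypothetical, and the 21 ms figure for the DC-C voter is an assumption (the text only gives timings for DC-A and DC-B peers).

```python
def commit_latency_ms(follower_ack_ms, leader_fsync_ms=1.0):
    """Time until a majority (leader included) has the entry on disk."""
    cluster_size = len(follower_ack_ms) + 1
    quorum = cluster_size // 2 + 1
    # The leader's own fsync counts as one ack; followers ack in parallel.
    acks = sorted([leader_fsync_ms] + list(follower_ack_ms))
    return acks[quorum - 1]


# Leader in DC-A: A2 acks at 2 ms; B1, B2 and (assumed) C1 ack at 21 ms.
stretched = commit_latency_ms([2.0, 21.0, 21.0, 21.0])
# The same five voters inside one DC.
local = commit_latency_ms([2.0, 2.0, 2.0, 2.0])
print(stretched, local)
```

The third-fastest ack is always remote in the stretched layout, so no amount of local hardware tuning brings the commit below one inter-DC round trip.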
11.3 When NOT to run etcd cross-DC — RTT > 50 ms
Above ~50 ms inter-DC RTT, cross-DC etcd becomes unworkable:
- 50 ms commit latency means throughput collapses (would need ~500 in-flight writes, beyond reasonable Raft pipelining).
- Heartbeat tuning becomes fragile (election timeout must be 500+ ms, so failover takes a full second).
- Cross-region network blips (transient congestion, Border Gateway Protocol (BGP) reconvergence) trigger spurious elections.
The threshold is well-established: don't run a single etcd cluster across regions if RTT > 50 ms. Kubernetes upstream recommends keeping etcd intra-region.
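The "~500 in-flight writes" figure follows from Little's law (concurrency = throughput × latency). A short sketch, assuming the ~10k writes/sec target used elsewhere in this document; the function name is invented for the example.

```python
def in_flight_needed(target_writes_per_sec, commit_latency_ms):
    """Little's law: concurrency = throughput x latency."""
    return target_writes_per_sec * commit_latency_ms / 1_000


for latency_ms in (2, 21, 50):
    print(latency_ms, "ms ->", in_flight_needed(10_000, latency_ms), "in flight")
```

At intra-DC latency a couple of dozen pipelined proposals are enough to sustain the target rate; at 50 ms the pipeline has to hold hundreds.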
11.4 What to do instead
- One coordination service per region. Kubernetes federations run one etcd per cluster.
- Replicate cross-region state out-of-band via Kafka topics, change-data-capture (CDC), or gossip.
- Accept eventual consistency cross-region. A new pod in DC-A appears in DC-B's registry after the replication lag.
Spanner and CockroachDB cheat by running many Raft groups, each placed in a region; cross-region transactions touch multiple groups via 2-phase commit. They do not stretch a single Raft group across regions — a distinction worth knowing.
§12. Chubby vs etcd vs ZooKeeper
The three canonical coordination services share one lineage and diverge along surface-level operational lines.
12.1 Chubby (Burrows, 2006) — the ancestor
Mike Burrows' OSDI 2006 paper "The Chubby lock service for loosely-coupled distributed systems" defined the design space. Chubby is Google-internal; never open-sourced.
Design:
- 5-node Paxos cell per data center.
- Hierarchical filesystem-like API (/ls/cell/path/to/object).
- Coarse-grained locking: lock hold times in hours, not microseconds.
- Event channels for change notifications — the ancestor of watches.
- KeepAlive RPC — long-poll that returns when the server has news.
Burrows' famous observation: Chubby ended up being used more for name service than for locks — every Google service looked up "where is the master of GFS cluster X" via Chubby. "The name service is down" became its own Google-internal failure category.
12.2 etcd and ZooKeeper as derivatives
- ZooKeeper (Yahoo, 2008): open-source Chubby. Hierarchical znodes, session-based liveness, similar watches. Zab consensus instead of Multi-Paxos; JVM.
- etcd (CoreOS, 2013): flat KV (simpler than znodes), Raft consensus, Go binary. Adopted by Kubernetes in 2014; became the de facto standard.
Chubby (2006, Google) ──► ZooKeeper (2008, Yahoo) ──► etcd (2013, CoreOS)
hierarchical              hierarchical znodes         flat KV with MVCC
Multi-Paxos               Zab consensus               Raft consensus
KeepAlive                 sessions                    leases
coarse locking            one-shot watches            replayable watches
12.3 KeepAlive (Chubby) vs Lease (etcd) vs Session (ZooKeeper)
All three solve the same problem — the server's view of client liveness — three ways:
Chubby KeepAlive: client long-polls; server returns when there is news or the lease is about to expire. If no KeepAlive within the lease window, leases are released.
etcd Lease: separate object with a TTL (Grant(ctx, ttl)); keys are attached via WithLease(leaseID); renewal via gRPC stream of KeepAlive calls. Lease expiry deletes all attached keys.
ZooKeeper Session: TCP connection + session ID with negotiated timeout (6-30s); pings every timeout/3. Session expiry deletes all ephemeral nodes owned by the session.
Differences:
- etcd lease objects are separate from keys. One lease can be attached to many keys — renewing keeps a whole set alive together. Cleaner than ZK's "every ephemeral node is tied to the session."
- ZK sessions are tied to the TCP connection. Disconnect/reconnect is a session-recovery operation; session timeout loses all watches. etcd's lease can be renewed over any new connection — more failure-tolerant.
- Chubby piggybacks liveness on event delivery. Single mechanism for both — clever but harder to reason about than splitting them.
The modern preference is etcd's model — Kafka KRaft's choice to drop ZooKeeper reflects partly this design preference (KRaft is closer to etcd than to ZK).
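The "one lease, many keys" model can be sketched with a toy lease table driven by an explicit clock. This is an illustration of the semantics described above, not the etcd client API; `LeaseTable` and its methods are hypothetical names.

```python
class LeaseTable:
    """Toy model of lease-attached keys with an explicit clock (not the etcd API)."""

    def __init__(self):
        self.now = 0.0
        self.expiry = {}        # lease_id -> absolute expiry time
        self.ttl = {}           # lease_id -> TTL in seconds
        self.keys = {}          # key -> (value, lease_id)
        self._next_id = 1

    def grant(self, ttl):
        lease_id, self._next_id = self._next_id, self._next_id + 1
        self.ttl[lease_id] = ttl
        self.expiry[lease_id] = self.now + ttl
        return lease_id

    def put(self, key, value, lease_id):
        if lease_id not in self.expiry:
            raise KeyError("lease not found")
        self.keys[key] = (value, lease_id)

    def keep_alive(self, lease_id):
        # One renewal resets the deadline for every key attached to the lease.
        self.expiry[lease_id] = self.now + self.ttl[lease_id]

    def advance(self, seconds):
        self.now += seconds
        dead = {lid for lid, t in self.expiry.items() if t <= self.now}
        for lease_id in dead:
            del self.expiry[lease_id], self.ttl[lease_id]
        # Expiry deletes all keys attached to a dead lease.
        self.keys = {k: v for k, v in self.keys.items() if v[1] not in dead}


table = LeaseTable()
lease = table.grant(ttl=10)
table.put("/leader/scheduler", "node-a", lease)
table.put("/members/node-a", "alive", lease)
table.advance(8)
table.keep_alive(lease)             # renewed at t=8, now expires at t=18
table.advance(8)                    # t=16: still alive
alive_at_16 = sorted(table.keys)
table.advance(5)                    # t=21: lease expired, both keys gone
print(alive_at_16, sorted(table.keys))
```

Nothing in the renewal path refers to a connection, which is the property that lets a real client renew over a fresh connection after a reconnect.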
12.4 Why Google built Chubby — "coarse-grained locking" as DNS
Chubby was originally built for GFS master election, then BigTable tablet ownership, then Borg. The dominant use stopped being locking and became name service. The framing: "coordination services are equal parts lock service and name service; the API surface looks like the former and the production use is mostly the latter."
§13. Capacity envelope, illustrated across deployments
Back-of-envelope ceilings for a 3-node etcd-class Raft cluster on modern hardware (NVMe SSD, 10 Gbps network, intra-DC).
Write throughput: ~10k/sec sustained, ~30k peak
Per write critical path: leader fsync 0.5-2 ms (NVMe), network RTT 0.2 ms intra-rack to 10 ms intra-region, follower fsync (parallel) 0.5-2 ms, apply fsync ~0.3 ms. End-to-end ~2 ms p50, ~8-10 ms p99. Past ~10k/sec sustained, fsync queues blow up and Raft heartbeats miss deadlines.
Compare a sharded MySQL: a single primary does 20-40k writes/sec, a 64-shard cluster does 1.2M+. The coordination service is 100x slower on writes than a sharded OLTP database — by deliberate design, every write pays for a quorum fsync.
Data size ceiling: ~8 GB
etcd documents 2 GB default, 8 GB hard ceiling. Past that: BBolt becomes hard to compact/snapshot; a new follower joining at 8 GB / 1 Gbps takes 64 s of streaming during which it's not contributing to quorum; the in-memory MVCC index outgrows comfortable RAM.
Watch fanout: ~10k streams
A single etcd cluster sustains ~10k active watch streams over gRPC, several hundred thousand individual key watches (prefix watches count as one stream). Event throughput tracks write throughput — every commit fans out to N matching watchers.
Illustration across deployments
Kubernetes etcd, small (~100 nodes): 100 MB state, 100-500 writes/sec, ~50 watchers. 3-node etcd, ~1 ms write latency.
Kubernetes etcd, large (5,000+ nodes; GKE, OpenAI training fleets): 2-6 GB state, 5-10k writes/sec peak, hundreds of long-lived watches via the apiserver. 3- or 5-node, ~5 ms write p99. Aggressive compaction mandatory; CRDs and events sometimes sharded out.
Kubernetes etcd, very large (Alibaba, ByteDance: 100k+ nodes): approaching 8 GB. Multiple apiserver replicas absorbing watch fanout. Dedicated SRE rotations on etcd health.
Kafka KRaft (3.3+, 2022): pre-KRaft, LinkedIn-scale Kafka with 200k+ partitions hit the ZooKeeper wall — hundreds of thousands of znodes, watch fanout from every broker, controller failover taking minutes because the new controller had to scan the ZK tree. KRaft moved metadata into a Kafka topic (__cluster_metadata) backed by embedded Raft. Controller failover dropped to single-digit seconds; Kafka now scales past 1M+ partitions per cluster.
HashiCorp Consul, service mesh: tens of thousands of services, ~5k writes/sec, watches from every proxy sidecar (potentially 50k+ proxies). Bundled SWIM gossip handles liveness without putting heartbeat traffic on Raft.
ZooKeeper for legacy Hadoop / HBase: a few MB of state, low write rate, hundreds of watchers. Legacy because the JVM operational footprint is heavier than etcd's.
Google Chubby cells (5-node Paxos): lock hold times in hours — coarse-grained locks, not microsecond mutex replacements. Holds GFS master leases, Bigtable tablet ownership, Borg master locks.
§14. Architecture in context
The canonical pattern: a small Raft cluster (3 or 5 nodes), with optional aggregator layers between clients and the service to absorb watch fanout. Sharding, partitioning, and replication of user data happen at the data plane below; the coordination service itself is replicated by consensus only.
┌────────────────────────────────────────────┐
│        Coordination Service Cluster        │
│   3 or 5 voters (odd number for quorum)    │
│                                            │
│  ┌────────┐                                │
│  │ NODE A │ ← elected LEADER               │
│  │  WAL   │ ── AppendEntries ──┐           │
│  │ BBolt  │                    │           │
│  └────┬───┘                    │           │
│       │ heartbeat (50 ms)      ▼           │
│  ┌────▼───┐               ┌────────┐       │
│  │ NODE B │ ←─────────────│ NODE C │       │
│  │  WAL   │               │  WAL   │       │
│  │ BBolt  │               │ BBolt  │       │
│  └────────┘               └────────┘       │
└────────────────┬───────────────────────────┘
                 │ gRPC (HTTP/2 multiplexed)
     ┌───────────▼─────────────┐
     │ Aggregator layer        │  (kube-apiserver,
     │ - multiplexes watches   │   Consul agent fleet,
     │ - caches reads          │   optional but
     │ - rate-limits writes    │   standard at scale)
     └───────────┬─────────────┘
                 │
    ┌────────────┼────────────┐
    │            │            │
┌───────┐    ┌───────┐    ┌───────┐
│ watch │    │ lock+ │    │ config│
│ /pods │    │ fence │    │ read  │
└───────┘    └───────┘    └───────┘
Annotations:
- Sharding key for writes: there isn't one. All writes go through the single leader. Total order is required; horizontal write scaling is explicitly given up.
- Sharding key for reads: clients may read from any voter for serializable reads; reads from the leader (with read-index) are linearizable. etcd defaults to linearizable.
- Replication: synchronous to majority quorum. 3 nodes → 2 acks. 5 nodes → 3 acks. Tolerates (N-1)/2 failures.
- Aggregator layer: standard in production. Without it, every client connects directly and exhausts watch capacity. The Kubernetes apiserver is the canonical example; Consul's agent fleet plays a similar role.
Cluster sizing
- 3 nodes — standard. Tolerates 1 failure. Lower commit latency (one follower ack).
- 5 nodes — for tolerating 2 failures (Chubby defaults to 5). Higher commit latency.
- 7+ nodes — almost never right. Latency grows with slowest-of-quorum; fault tolerance grows sub-linearly.
The asymmetry: adding nodes makes writes slower, not faster. Opposite of a normal database — the control-plane characteristic.
§15. Hard problems inherent to this technology
Problem 1 — Write throughput ceiling
Naïve fix. "Add more nodes to scale writes." Walk through: 3-node etcd at 10k writes/sec, p99 = 8 ms. Add 2 nodes → 5; quorum = 3; each write now needs 2 follower acks instead of 1, so the second-fastest follower gates each commit and the leader replicates every entry to 4 peers instead of 2. Result: higher p99 latency (~12 ms), throughput drops or stays flat. Made it worse.
Illustration — Kubernetes apiserver during mass churn. Node drain on a 5,000-node cluster generates pod status updates faster than etcd's 10k/sec.
Real fix: (1) reduce write rate at the source (batch, coalesce, move ephemeral state out); (2) shard at the workload level (Kubernetes does this for events); (3) for genuinely huge metadata, switch topology to embedded sharded consensus (CockroachDB-style).
The point: a Raft group's write throughput is fundamentally bounded by quorum fsync. You don't scale a Raft group; you make more Raft groups.
Problem 2 — Watch storm / fanout amplification
Naïve fix. Send each event from etcd to every interested client.
Illustration: Kubernetes apiserver, 5,000-node cluster. 200k Pods; every kubelet, controller, and operator watches the Pod resource. 10k writes/sec × 10k watchers = 100M events/sec. At ~1 KB per event, that is 100 GB/sec of egress against ~3.75 GB/sec of aggregate network capacity (3 nodes × 10 Gbps): roughly 27x over budget.
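The fanout arithmetic is easy to reproduce. This sketch assumes three etcd members with one 10 Gbps NIC each, matching the hardware assumed in §13; the constant names are invented for the example.

```python
WRITES_PER_SEC = 10_000
WATCHERS = 10_000
EVENT_BYTES = 1_000                     # ~1 KB per event
NODES, NIC_GBPS = 3, 10                 # assumed: 3 members, 10 Gbps each

events_per_sec = WRITES_PER_SEC * WATCHERS
egress_gb_per_sec = events_per_sec * EVENT_BYTES / 1e9
capacity_gb_per_sec = NODES * NIC_GBPS / 8
print(events_per_sec, egress_gb_per_sec, capacity_gb_per_sec)
print(f"{egress_gb_per_sec / capacity_gb_per_sec:.1f}x over budget")
```

The ratio works out to a little under 27x under these assumptions. Setting `WATCHERS` to about 50 (one watch per resource type) brings egress to roughly 0.5 GB/sec, well inside the budget, which is the aggregation argument in numbers.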
Real fix. The apiserver does watch aggregation: one etcd watch per resource type, then fanout to thousands of apiserver clients. Etcd sees O(resource_types) watches, not O(clients). Same pattern appears in Consul (agent fleet shields the Raft cluster) and Kafka (KRaft controller talks to brokers; brokers talk to clients).
Lesson: the coordination service is not the fan-out engine.
Problem 3 — Split-brain prevention under partition
Naïve fix. "Let both sides keep serving."
Illustration — service mesh control plane during a rack switch failure. Consul cluster across racks; top-of-rack switch fails, isolating the leader. If the isolated leader keeps acking writes, on heal the divergent histories corrupt service registrations.
Walk through: leader A at term 15, commitIndex=1000; partition isolates A; A keeps receiving writes but cannot get a majority. If a buggy implementation "optimistically" acks them, A's log holds entries 1001-1050 at term 15, never replicated. Meanwhile B+C elect a new leader at term 16, and B commits different entries at those same indices (1001 onward). On heal, A sees term 16, steps down, and truncates its own 1001-1050. Clients that were told those writes succeeded now hold acknowledgements for data that no longer exists.
Real fix: minority side refuses writes. After ~3 election timeouts without majority, even lease-based linearizable reads stop. CP under partition (Consistency over Availability). Being wrong is worse than being slow.
Problem 4 — Scale limits past 8 GB
Naïve fix. "Increase the data quota." Every problem gets worse: snapshot transfer (16 GB / 1 Gbps = 128 s), MVCC index outgrows RAM, compaction stalls.
Illustration — Kafka pre-KRaft at LinkedIn. ZooKeeper znode count crossed several hundred thousand; controller failover took minutes. Same shape of problem in a different system.
Real fix — topology change: identify what's growing; move it out (events, large CRDs, secrets that shouldn't be there); if genuinely massive metadata, switch to embedded sharded consensus (CockroachDB / TiKV pattern).
Problem 5 — Cross-region coordination
Naïve fix. "Stretch the etcd cluster across regions — 5 nodes, 3 in A, 2 in B."
Illustration: global service discovery. Leader in A; quorum = 3, so the leader needs acks from both A-followers to commit locally. If either A-follower is momentarily slow, the commit waits for a B-follower at 30+ ms RTT, and p99 gets much worse. Partition between regions: B (2 nodes) cannot form quorum → read-only and stale.
Real fix: don't. Coordination is single-region. Each region has its own coordination service. For cross-region awareness, replicate state out-of-band (Kafka topic, CDC, eventually consistent).
The point: Raft is not a multi-region protocol. Spanner / CockroachDB / YugabyteDB cheat by running many Raft groups (one per shard) placed regionally — they don't run one Raft group across regions.
Problem 6 — Disk corruption / single-node data loss
Naïve fix. Restart and catch up via Raft log.
Walk through: Node B's SSD develops a bit flip in BBolt's meta page. WAL intact, BBolt unreadable. B refuses to start. Cluster is 2 of 3 — one more failure and it's down.
Real fix: etcdctl member remove B, wipe data dir, etcdctl member add B-prime, start with --initial-cluster-state=existing. B-prime receives full snapshot, catches up via log replay, rejoins. Plus backups: etcdctl snapshot save produces point-in-time backups; RPO = backup cadence.
§16. Leader election storms
A subtle failure mode that deserves its own section.
16.1 The storm pattern
A network blip disconnects the leader from followers for ~5 seconds. Followers' election timers fire; term advances:
t=0 : leader A at term 15.
t=5s : partition; A isolated.
t=5.3s: B's election timer fires; → candidate at term 16.
t=5.4s: C's election timer fires; split vote at term 16.
t=5.6s: B and C retry at term 17 (new randomized timeouts).
t=5.7s: B wins at term 17.
t=10s : network heals; A sees term 17, steps down.
In a well-tuned cluster the storm completes in 500-800 ms and is barely noticed.
16.2 When the storm doesn't stop
The pathology: clients all reconnect at once after the partition heals; the flood of reconnect-triggered writes saturates the new leader's WAL fsync; the new leader misses heartbeats; another election fires. The term counter rapidly climbs; every reconnecting client gets ErrNotLeader.
Symptoms: etcd_server_leader_changes_seen_total jumps from 0 to dozens per minute. etcd_server_proposals_failed_total rises. Apiserver logs flood with "etcd member down."
16.3 Mitigations
- Jittered reconnect backoff at the client (the official etcd client library does this; many homemade clients do not).
- Raft PreVote — before incrementing its term, a candidate asks "would you vote for me at the next term?" If a majority say no because they already have a leader, the candidate does not advance its term. Stops the "isolated node returns, term jumps, current leader steps down" pattern.
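- Election timeout tuning. The 150-300 ms range used in this document is the Raft paper's suggestion (etcd itself ships a more conservative 1000 ms --election-timeout with a 100 ms --heartbeat-interval); under high RTT or load, extend to 500-1000 ms or more.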
- Monitor term advance. Healthy clusters advance the term a few times per week; sudden term increase is a red flag. Alert on etcd_server_leader_changes_seen_total > 5/hour, or raft_term advancing faster than 1 per 24h on average.
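For homemade clients, the first mitigation can be as small as the sketch below: exponential backoff with full jitter. The parameter values (100 ms base, 10 s cap) and the function name are assumptions for illustration, not what any particular client library uses.

```python
import random
from collections import Counter


def backoff_delays(attempts, base=0.1, cap=10.0, rng=None):
    """Full jitter: delay n is uniform in [0, min(cap, base * 2**n)] seconds."""
    rng = rng or random.Random()
    return [rng.uniform(0, min(cap, base * 2 ** n)) for n in range(attempts)]


# 10,000 clients reconnecting after the same partition heals.
rng = random.Random(42)   # seeded only so the sketch is repeatable
fourth_retry = [backoff_delays(4, rng=rng)[3] for _ in range(10_000)]

# How many clients land in the busiest 100 ms slice of the 0.8 s window?
buckets = Counter(min(int(d * 10), 7) for d in fourth_retry)
busiest = max(buckets.values())
print(busiest, "of", len(fourth_retry), "clients in the busiest 100 ms")
```

Without jitter all 10,000 clients would retry in the same instant; with it, the same retry is spread roughly evenly across the window, so the new leader sees a ramp instead of a spike.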
16.4 Recovery from an in-progress storm
- Stop the input load (pause controllers, drain the watch firehose).
- Restart the slow node — often one node is the source.
- Emergency-increase election timeout, then revert.
- Keep PreVote on (don't disable it).
The discipline: stop the runaway feedback first, then let the system stabilize.
§17. Failure modes
Leader crashes mid-write
State: A is leader at term 15, has appended entry 5000 to WAL, sent AppendEntries. B fsync'd and acked. A crashes before responding to client.
Recovery: B and C detect missing heartbeat; election timeout; B becomes candidate. B has entry 5000; B's lastLogIndex=5000. B wins election at term 16. B propagates entry 5000 to C (if needed), commits at term 16. When A reboots, sees term 16, steps down.
Durability point: B's fsync of entry 5000. Once a majority (here A and B, 2 of 3) has the entry on disk, it survives leader death: Raft's leader-completeness property guarantees the next leader holds every committed entry.
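The reason B rather than C wins this election is Raft's voting restriction: a voter grants its vote only to a candidate whose log is at least as up to date as its own. A minimal sketch of that comparison, assuming C had only reached index 4999 when A crashed (the text leaves C's position open):

```python
def log_is_up_to_date(candidate_last_term, candidate_last_index,
                      voter_last_term, voter_last_index):
    """Raft's election restriction: vote only for a log at least as complete."""
    if candidate_last_term != voter_last_term:
        return candidate_last_term > voter_last_term
    return candidate_last_index >= voter_last_index


# After A crashes: B holds entry 5000 (term 15); assume C only reached 4999.
b_asks_c = log_is_up_to_date(15, 5000, 15, 4999)   # C votes for B
c_asks_b = log_is_up_to_date(15, 4999, 15, 5000)   # B refuses C
print(b_asks_c, c_asks_b)
```

Because C cannot collect B's vote, the only node that can assemble a majority is one that already holds entry 5000, so the committed entry cannot be lost in the failover.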
Minority partition
State: 5-node cluster, A is leader. Partition isolates A+B from C+D+E.
A's side (minority): only 2 of 5 — A cannot commit. Reads (if served by A) continue with stale data until lease expires (~10 s default), then stop.
C+D+E (majority): election, new leader at term 16, accepts writes.
Heal: A sees term 16, steps down, truncates any uncommitted entries past divergence, catches up.
Durability point: nothing was committed on A's side. Uncommitted entries are safe to discard because they were never acked.
Slow disk on a single node
State: B's disk degrades — fsyncs take 200 ms instead of 1 ms. Leader A sends AppendEntries to B and C; C fsyncs in 1 ms, A has majority, commits. B is permanently lagging; matchIndex[B] falls behind. Cluster works but no longer tolerates further failures.
Detection: etcd_disk_backend_commit_duration_seconds, etcd_network_peer_send_failures_total. Mitigation: replace B's disk; rejoin as fresh member.
Snapshot transfer overload
State: B offline a long time. Leader's log has been compacted past B's nextIndex. Leader must send a snapshot — full BBolt file (~8 GB) over the network. During transfer, B is not contributing to quorum. If B is one of 3, you are operating with 2-of-3, tolerating 0 failures.
Mitigation: keep --snapshot-count low enough that nodes don't fall too far behind. Don't let nodes stay offline for long. For very large state, replace members one at a time, never in parallel.
Permanent participant loss
State: lose 2 of 5 permanently. Quorum now 3 of 3 — one more failure brings the cluster down.
Recovery: replace lost members one at a time. For each: member remove, then member add, then start replacement with --initial-cluster-state=existing. Each replacement triggers snapshot transfer. Never parallelize — that risks data divergence; this is what Raft's joint-consensus membership change protocol was designed to prevent.
Slow follower — lag grows until crash
Node B's Network Interface Card (NIC) drops 20% of packets. Writes still succeed because A and C form a majority of 3 — B is technically a voter but isn't contributing. Over hours B's matchIndex falls thousands of entries behind. Eventually B's nextIndex steps back into the log-compaction range, forcing the leader to send a full snapshot — during transfer B is unavailable.
Detection: etcd_network_peer_round_trip_time_seconds rising for one peer; gap between matchIndex[B] and commitIndex exceeding ~10,000 entries.
Mitigation: replace B's NIC; if cluster is already operating degraded, evict and re-add.
Boot loop after disk fill
--quota-backend-bytes set to 8 GB; cluster hits alarm:NOSPACE. An operator wipes one node's data directory hoping to "start fresh." The node receives a snapshot, but the snapshot is 8 GB and the node's quota is also 8 GB. The snapshot install immediately re-triggers NOSPACE; the node panics, restarts, hits the alarm again. Boot loop.
Recovery procedure for "etcd full" must include compact+defrag before re-adding nodes, or raising the quota first. Treat "etcd full" as planned maintenance, not a quick fix.
The "no quorum agrees on a leader" deadlock under partial partition
A 5-node cluster has a partial partition: A-B see each other, C-D see each other, E sees A and D, but A-B can't see C-D. Every node tries to elect itself, and every "majority" is contingent on E's vote, which is split across overlapping but distinct majorities. Term advances rapidly; no leader stabilizes.
Detection: raft_term advancing rapidly while etcd_server_has_leader flapping between 0 and 1 across nodes.
Mitigation: this is a network-layer problem. Fix the partition; PreVote and election-timeout tuning reduce churn but cannot resolve it. The cluster recovers automatically once any 3 nodes form a connected component.
§18. Observability
18.1 Four golden signals for a coordination service
| Signal | etcd metric | Threshold |
|---|---|---|
| Leader presence | etcd_server_has_leader | Alert if 0 on any node > 30 s |
| Commit latency | etcd_disk_wal_fsync_duration_seconds p99 | Alert if > 100 ms |
| Backend latency | etcd_disk_backend_commit_duration_seconds p99 | Alert if > 250 ms |
| Proposal pressure | etcd_server_proposals_pending | Alert if > 100 sustained |
18.2 The "fsync took 1 second" alert predicts disk death
etcd_disk_wal_fsync_duration_seconds is the most predictive metric for SSD failure. Healthy NVMe gives p99 fsync of 1-5 ms. As an SSD ages and its remapping tables fill, latencies become bimodal — most fsyncs at 1 ms, occasional ones at 1+ second. The bimodality precedes outright failure by hours to days.
- alert: EtcdSlowFsync
  expr: histogram_quantile(0.99, rate(etcd_disk_wal_fsync_duration_seconds_bucket[5m])) > 0.1
  for: 5m
When this fires, replace the disk proactively. Swapping a disk on a healthy cluster is routine maintenance; waiting until a hard failure has dropped the cluster to 2-of-3 turns the same swap into an incident.
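To illustrate why the p99 catches bimodality while the median does not, here is a minimal sketch using made-up latency windows. The sample counts and values are assumptions chosen for illustration, not measurements, and the nearest-rank percentile is a simplification of what histogram_quantile computes from buckets.

```python
def percentile(samples, pct):
    """Nearest-rank percentile: smallest value with at least pct% of samples at or below it."""
    ordered = sorted(samples)
    rank = max(1, -(-pct * len(ordered) // 100))  # integer ceiling of pct * n / 100
    return ordered[rank - 1]


# Hypothetical fsync latencies in seconds, 1,000 samples per window.
healthy = [0.001] * 990 + [0.005] * 10
aging = [0.001] * 980 + [1.2] * 20   # 2% of fsyncs stall for over a second

THRESHOLD_S = 0.1  # the same 100 ms threshold as the alert rule above

for name, window in (("healthy", healthy), ("aging", aging)):
    p50, p99 = percentile(window, 50), percentile(window, 99)
    print(f"{name}: p50={p50 * 1000:.0f} ms p99={p99 * 1000:.0f} ms alert={p99 > THRESHOLD_S}")
```

The median is identical in both windows; only the tail moves, which is why the alert is written against the 0.99 quantile.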
18.3 Other essential metrics
- etcd_server_proposals_failed_total: should be near zero; rise = storm or quota exceeded.
- etcd_server_leader_changes_seen_total: handful per week; rise = storm (§16).
- etcd_mvcc_db_total_size_in_bytes: BBolt size; alert at 6 GB.
- etcd_mvcc_db_total_size_in_use_in_bytes: in-use vs total; large gap = run defrag.
- etcd_network_peer_round_trip_time_seconds: per-peer RTT; identifies slow followers.
- etcd_debugging_mvcc_keys_total: should be < ~1M.
- grpc_server_handled_total{grpc_code=...}: API success/timeout rates.
18.4 Kubernetes-specific
Additional apiserver metrics: apiserver_storage_objects{resource} (per-resource counts; spikes precede etcd capacity issues), apiserver_watch_events_sizes (large events > 1 MB are an anti-pattern), apiserver_request_duration_seconds (slow etcd shows up here first).
Most "Kubernetes is slow" incidents trace to etcd; apiserver metrics are the symptom, etcd metrics are the cause.
§19. Why not Redis SETNX for distributed locks
A junior engineer says: "I'll just use Redis SETNX for distributed locks. Sub-millisecond latency, already deployed." Let's walk through why this is wrong.
Failure 1 — GC pause / scheduling delay breaks lease semantics
# Naïve Redis lock (redis is assumed to be a connected redis-py client)
if redis.set("lock:account:X", worker_id, nx=True, ex=30):
    do_critical_section()  # transfer $500
    redis.delete("lock:account:X")
Worker A acquires the lock at t=0, then stalls for 35 seconds in a Java GC pause (or a kernel scheduler delay, or hypervisor steal). The Redis key expires at t=30. Worker B's SETNX succeeds at t=31 and B starts its critical section. A wakes up at t=35 still believing it holds the lock and proceeds with the transfer. Both A and B have now run the critical section, and account X is debited twice. Worse, A's final delete removes the key that B now owns.
Fix: fencing tokens — every acquisition returns a monotonically increasing token; the downstream resource refuses operations with a token less than the most recent it has seen:
token = redis.incr("lock:counter:account:X")
if redis.set("lock:account:X", token, nx=True, ex=30):
    db.transfer(amount=500, fencing_token=token)
    # DB checks: token >= last token seen for this account
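The check on the resource side can be sketched in a few lines. This is an in-memory stand-in, not a real database: FencedAccount and its fields are hypothetical names, and the token values are chosen to replay the A/B timeline above.

```python
class FencedAccount:
    """Downstream resource that rejects operations carrying a stale fencing token."""

    def __init__(self, balance):
        self.balance = balance
        self.last_token = 0

    def transfer(self, amount, fencing_token):
        # Refuse any token lower than the most recent one already seen.
        if fencing_token < self.last_token:
            return False
        self.last_token = fencing_token
        self.balance -= amount
        return True


account = FencedAccount(balance=1000)
token_a, token_b = 41, 42   # A acquired first; B acquired after A's lock expired

b_ok = account.transfer(500, token_b)   # B holds the live lock: accepted
a_ok = account.transfer(500, token_a)   # A wakes from its pause: rejected
print(b_ok, a_ok, account.balance)
```

The important design point is that the resource, not the lock client, enforces ordering, so a paused client cannot talk its way past an expired lease.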
Failure 2 — Redis failover loses the counter
Redis is typically run with async replication primary→replica.
- Worker A: incr counter → 42. Acquires lock with token 42.
- Primary crashes before replicating the increment.
- Replica promoted. Its counter is still at 41.
- Worker B: incr counter → 42 (again!). Acquires lock with token 42.
- Two workers, both with fencing token 42. The DB's "must be >= last seen" check doesn't help: both have the same token.
The core failure: Redis's replication is not consensus. Async means the post-failover state is not a consistent prefix of the pre-failover state. Counters can go backward. Locks can be granted twice.
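The sequence above can be replayed with a toy model of asynchronous replication. This is a sketch, not Redis: RedisLikeNode and AsyncReplicatedCounter are hypothetical classes that model only a counter and a replication lag.

```python
class RedisLikeNode:
    """Toy stand-in for a Redis node: just an integer counter, no networking."""

    def __init__(self):
        self.counter = 0


class AsyncReplicatedCounter:
    def __init__(self):
        self.primary = RedisLikeNode()
        self.replica = RedisLikeNode()
        self.unreplicated = []   # writes acknowledged to clients but not yet shipped

    def incr(self):
        self.primary.counter += 1            # the client is acknowledged immediately
        self.unreplicated.append(self.primary.counter)
        return self.primary.counter

    def replicate(self):
        if self.unreplicated:
            self.replica.counter = self.unreplicated[-1]
            self.unreplicated.clear()

    def failover(self):
        # The primary dies; whatever was not shipped is lost with it.
        self.primary, self.replica = self.replica, RedisLikeNode()
        self.unreplicated.clear()


store = AsyncReplicatedCounter()
for _ in range(41):
    store.incr()
store.replicate()            # replica has caught up to 41
token_a = store.incr()       # acknowledged to worker A
store.failover()             # crash before the increment is replicated
token_b = store.incr()       # handed to worker B by the promoted replica
print(token_a, token_b)
```

Both workers end up holding the same token, so no downstream comparison can tell them apart.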
Failure 3 — Redlock and the Kleppmann critique
Antirez proposed Redlock — acquire the lock on a majority of N independent Redis nodes, with strict time bounds. Martin Kleppmann's 2016 critique:
- Redlock relies on bounded clock drift. When an NTP step, hypervisor pause, or leap second breaks that bound, safety fails.
- Redlock cannot provide monotonic fencing tokens unique across failures; each Redis node has its own counter and they don't agree.
- A GC pause on the client still breaks Redlock because expiration is wall-clock based.
Why coordination services get this right
- Every operation goes through consensus. Lock acquisition is a Raft log entry, ordered globally, durable on majority.
- Fencing tokens come from the Raft commit index (or etcd's revision, or ZooKeeper's zxid). Monotonically increasing across all failures. Even on leader change, the new leader's commit index continues where the old one left off.
- Lease semantics tied to consensus. Expiry is determined by the coordination service's view of client liveness, not by the client's local clock.
- Linearizability. The lock either holds or it doesn't.
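The first two bullets can be illustrated with a toy quorum log. This is a deliberately reduced sketch and not Raft: there are no terms, no elections by vote, and no uncommitted entries. The only properties modelled are that a write is acknowledged once a majority stores it, and that a new leader is picked from a surviving majority. QuorumLog and its methods are hypothetical names.

```python
class QuorumLog:
    """Toy single-group log: an append is acknowledged only if a majority stores it."""

    def __init__(self, names):
        self.logs = {name: [] for name in names}
        self.leader = names[0]

    def append(self, entry, reachable):
        """Replicate to the leader plus `reachable`; return the new index or None."""
        targets = {self.leader} | set(reachable)
        if len(targets) <= len(self.logs) // 2:
            return None                      # no majority: nothing is written
        new_log = self.logs[self.leader] + [entry]
        for name in targets:
            self.logs[name] = list(new_log)  # followers adopt the leader's log
        return len(new_log)                  # the index doubles as a fencing token

    def elect(self, survivors):
        """Pick the survivor with the longest log; requires a majority of nodes."""
        if len(survivors) <= len(self.logs) // 2:
            raise RuntimeError("no quorum")
        self.leader = max(survivors, key=lambda name: len(self.logs[name]))


group = QuorumLog(["n1", "n2", "n3"])
token_a = group.append("lock:A", reachable=["n2"])   # stored on n1 and n2
group.elect(survivors=["n2", "n3"])                  # n1 crashes
token_b = group.append("lock:B", reachable=["n3"])   # continues after token_a
print(token_a, token_b, group.leader)
```

Because any surviving majority overlaps the majority that stored each acknowledged entry, the new leader's index picks up where the old one stopped, which is the property the async-replicated counter could not provide.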
The answer: "Redis SETNX is a fast single-node primitive. Coordination is a multi-node consensus problem. If 'low-latency lock' is the real requirement, the fix is to reduce contention, not to weaken consistency."
§20. Migration patterns
Coordination services are rarely greenfield; ZooKeeper-to-something migrations are the most common because ZK is the oldest, the JVM footprint is heavy, and one-shot watches have aged poorly.
20.1 ZooKeeper → KRaft (Kafka's transition)
Apache Kafka used ZooKeeper from 2011 to 2022 for cluster metadata. At LinkedIn scale this hit the wall: 200k+ partitions = 200k+ znodes; controller failover required scanning the entire ZK tree, taking minutes; JVM tuning was a major operational burden.
KIP-500 (2019), shipped in Kafka 3.3 (2022), replaced ZK with KRaft — embedded Raft within Kafka itself. Cluster metadata moved into an internal topic __cluster_metadata, backed by a dedicated Raft group on 3 or 5 controller nodes.
Migration phases:
Phase 1 — bridge release. Kafka 2.8 (2021): experimental embedded Raft as an alternate metadata store. The metadata model was rewritten so it could be served from either ZK or KRaft (schema mapping step).
Phase 2 — dual-write. Production clusters in hybrid mode: every write goes to both, ZK remains authoritative. Ran for 6-12 months in mature environments.
Phase 3 — cutover. KRaft becomes authoritative; ZK demoted to shadow. Brief offline operation (minutes).
Phase 4 — ZK removal. Kafka 4.0 (2025) ships with KRaft only.
Lessons:
- Multi-year migration is unavoidable at this scale. Quarter-long flag-flips are not real.
- Dual-write phase is the long pole — where schema impedance, race conditions, and missing semantics are caught.
- Cutover is brief. All the engineering goes into the path, not the event.
20.2 ZooKeeper → etcd (HBase-like systems)
Same shape as Kafka's pattern:
- Schema mapping: hierarchical znodes → flat keys with prefix conventions; sequential znode counters → etcd's revision.
- Dual-write: every coordination event writes to both; reads still from ZK.
- Shadow verification: a verifier reads etcd, compares to ZK, alerts on divergence.
- Cutover: reads switch to etcd; writes dual for a period; eventually ZK writes stop.
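Trickiest: ephemeral nodes vs leases. ZK's session-tied ephemeral semantics are not 1:1 with etcd's lease-tied keys: an ephemeral znode lives exactly as long as the client session that created it (kept alive by heartbeats, expired by the server after the session timeout), while an etcd lease is an explicit object with its own TTL that can be renewed over any connection and shared by many keys. The shadow phase surfaces these subtle TTL mismatches.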
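The schema-mapping and shadow-verification steps can be sketched over plain dictionaries. This assumes both stores have already been dumped to path-to-value maps; the "/migrated" prefix, the function names, and the sample keys are all hypothetical.

```python
def znode_to_key(path, prefix="/migrated"):
    """Map a hierarchical znode path to a flat etcd-style key under a prefix."""
    if not path.startswith("/"):
        raise ValueError(f"znode paths are absolute: {path!r}")
    return prefix + path.rstrip("/")


def diff_stores(zk_data, etcd_data, prefix="/migrated"):
    """Return (key, zk_value, etcd_value) for every divergence, sorted by key."""
    expected = {znode_to_key(path, prefix): value for path, value in zk_data.items()}
    problems = []
    for key in sorted(expected.keys() | etcd_data.keys()):
        zk_value, etcd_value = expected.get(key), etcd_data.get(key)
        if zk_value != etcd_value:
            problems.append((key, zk_value, etcd_value))
    return problems


zk_snapshot = {"/hbase/master": b"host-1", "/hbase/rs/host-2": b"up"}
etcd_snapshot = {"/migrated/hbase/master": b"host-1"}   # one dual-write was missed
print(diff_stores(zk_snapshot, etcd_snapshot))
```

A verifier like this only compares values at a point in time; it does not catch the session-versus-lease timing differences described above, which need their own expiry tests.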
20.3 Legacy ZooKeeper clusters in production
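Most production ZK clusters are not actively migrated; they sit and are inherited. Operational rules: pin the JVM version and tune heap (ZK is JVM-sensitive); monitor outstanding requests and average latency (outstanding_requests, latency_avg); back up both the transaction log and the snapshot directories, since a restore needs a snapshot plus the log entries written after it; plan the migration but don't rush it.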
The framing: "every successful coordination-service migration is multi-year and proceeds in dual-write phases; if anyone proposes a flag-flip cutover, the right response is 'show me the rollback plan.'"
§21. When coordination service is the wrong tool
Most production failures with coordination services are not bugs — they are misapplications. Three canonical anti-patterns.
21.1 "Config that 10k pods read" — wrong because of watch fanout
10,000 pods watching a config key means 10,000 watch streams (at or past etcd's ~10k ceiling), and every config change fans out into 10,000 events from a single Raft commit (the thundering herd from §7).
What to use instead:
- Consul KV with the agent fleet (per-host agent caches the config; pods talk to localhost; agents watch Consul).
- A config server + cache: a dedicated service reads etcd (or a database), serves pods via HTTP long-polling or Server-Sent Events (SSE); etcd sees one watch.
- Pull-based config: ConfigMaps via the apiserver, LaunchDarkly-style flag servers — pods pull every few seconds with conditional GETs.
The pattern: coordination service holds source of truth; distribution to many readers is a separate concern handled by an aggregator.
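A minimal sketch of the aggregator idea, assuming an in-process callback model rather than real network connections. Aggregator, subscribe, and on_upstream_event are hypothetical names; in practice the upstream event would come from a single watch on the coordination service.

```python
class Aggregator:
    """Holds one upstream watch and re-broadcasts each event to local subscribers."""

    def __init__(self):
        self.subscribers = []
        self.latest = None
        self.upstream_events = 0

    def subscribe(self, callback):
        self.subscribers.append(callback)
        if self.latest is not None:
            callback(self.latest)        # late subscribers get the cached value

    def on_upstream_event(self, value):
        # Called once per change by the single watch on the coordination service.
        self.upstream_events += 1
        self.latest = value
        for callback in self.subscribers:
            callback(value)


aggregator = Aggregator()
pods = [[] for _ in range(10_000)]       # each list records what one pod saw
for seen in pods:
    aggregator.subscribe(seen.append)

aggregator.on_upstream_event({"log_level": "debug"})
print(aggregator.upstream_events, sum(len(seen) for seen in pods))
```

The coordination service delivers one event; the 10,000-way fanout happens in a tier that can be scaled horizontally without touching the consensus group.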
21.2 "Transactional state" — wrong because etcd is not a database
ZK's multi and etcd's Txn give atomic batched operations, but not a full SQL transactional model — no rollback on application failure mid-transaction, no foreign-key integrity, no rich query. ZK and etcd cap at ~8 GB; transactional databases handle terabytes.
What to use instead: a real transactional database (PostgreSQL, CockroachDB, Spanner) for the data, with the coordination service holding metadata about the database (which shard, which node is primary, what is the leader). Per-row conditional writes in DynamoDB or Cassandra Lightweight Transactions (LWT) when only single-row atomicity is needed.
The pattern: coordination service holds metadata about the data plane, not the data plane itself.
21.3 "We put user data in etcd because it was there" — the anti-pattern by name
The temptation: "We have an etcd cluster up for Kubernetes; let's piggyback small per-user metadata on it."
Why wrong: etcd is sized for control-plane data. Per-user records times millions of users blow past 8 GB. Every per-user write contends with control-plane writes; one team's write storm starves Kubernetes' control loop. Sharing the etcd cluster couples deploy blast radius across teams.
What to use instead: a separate database for the use case, or — if the use case genuinely needs coordination — a dedicated etcd cluster. Two 3-node etcd clusters are cheaper than one coupled blast radius.
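A back-of-envelope check makes the sizing argument concrete. The user count and record size below are assumptions picked for illustration; the only figure taken from this document is the ~8 GB ceiling.

```python
QUOTA_BYTES = 8 * 1024**3   # the ~8 GB ceiling used throughout this document


def projected_bytes(users, bytes_per_record, revisions_retained=1):
    """Rough footprint: each retained revision of a record costs its full size."""
    return users * bytes_per_record * revisions_retained


# Hypothetical workload: 5 million users, 2 KiB of metadata each.
footprint = projected_bytes(users=5_000_000, bytes_per_record=2048)
print(f"{footprint / 1024**3:.1f} GiB of {QUOTA_BYTES / 1024**3:.0f} GiB quota")
print("fits" if footprint < QUOTA_BYTES else "exceeds quota")
```

Even with a single retained revision per record, this modest workload overshoots the quota before any control-plane data is counted.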
21.4 The framing
A coordination service is the wrong tool when:
- Many readers, infrequent updates → config server or aggregator with cache.
- Many writers, transactional state → database.
- Large data growing without bound → blob storage or sharded database.
- User data with isolation requirements → dedicated database.
The right uses are narrower than they look: leader election, lock fencing, service registration, cluster membership, small control-plane config. Everything else is a misapplication waiting for the next outage.
§22. Scaling axes
Two distinct axes, different inflection points, different fixes.
Type 1 — uniform growth (more clients, more watches)
Baseline (1x): 1k clients, 100 watches, 1k writes/sec, 100 MB state. Single 3-node cluster. Boring happy place.
10x (10k clients, 1k watches, 5k writes/sec): pressure is watch fanout, not throughput. Each commit emits hundreds of events. Fix: aggregator layer (apiserver pattern). One aggregator process (or several, horizontally scaled) maintains the etcd watches and re-broadcasts to clients. etcd sees ~10 watches; aggregator handles 10k connections. Topology unchanged at the etcd layer.
100x (100k clients — every kubelet in a giant cluster): aggregator is now the bottleneck. Scale the aggregator horizontally — multiple apiservers, each handling a client slice. They all watch the same etcd. etcd is still 3 nodes. Do not scale etcd horizontally for client load.
Type 2 — state growth (more keys, larger values)
10x (state from 100 MB to 1 GB): pressure is snapshot transfer time, compaction stalls, MVCC index memory. Fix: aggressive compaction (etcdctl compact), identify and prune noisy resource types (Kubernetes events example).
100x (state past 8 GB): structural inflection. Cannot keep growing one cluster. Options:
- Shard at the workload level: multiple coordination clusters, each owning a subset of the keyspace, with aggregator layer doing routing. Kubernetes events-sharding is the lightweight version.
- Replace backend entirely: k3s/k0s use SQLite/MySQL/Postgres via kine. Loses true consensus semantics; appropriate only for small or edge clusters.
- Embedded sharded consensus: CockroachDB / TiKV pattern. Every key range is its own Raft group; thousands of Raft groups run in parallel. Metadata scales to TB+. Cost is enormous engineering complexity.
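The workload-level sharding option amounts to a routing table in the aggregator layer. The sketch below assumes longest-prefix matching on keys; PrefixRouter and the cluster names "etcd-events" and "etcd-main" are hypothetical, while the "/registry/" key layout follows the Kubernetes convention.

```python
class PrefixRouter:
    """Route each key to the coordination cluster owning its longest matching prefix."""

    def __init__(self, routes, default):
        # Longest prefixes first so the most specific route wins.
        self.routes = sorted(routes.items(), key=lambda item: len(item[0]), reverse=True)
        self.default = default

    def cluster_for(self, key):
        for prefix, cluster in self.routes:
            if key.startswith(prefix):
                return cluster
        return self.default


router = PrefixRouter(
    routes={"/registry/events/": "etcd-events", "/registry/": "etcd-main"},
    default="etcd-main",
)
print(router.cluster_for("/registry/events/default/pod-1.17a"))
print(router.cluster_for("/registry/pods/default/pod-1"))
```

Note what this gives up: a transaction or watch can no longer span keys that live in different clusters, so the split has to follow boundaries the workload never crosses.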
The interesting thing about scaling a coordination service is that most of the moves are anti-patterns at the consensus layer. You don't scale up; you scale away.
§23. Decision matrix vs adjacent categories
| Need | Coordination service (etcd) | Redis | DB row lock | DNS |
|---|---|---|---|---|
| Distributed lock with fencing | Yes: Raft index = token | No: counter can go backward on failover | Possible via single DB SELECT FOR UPDATE | No |
| Service discovery (small fleet) | Yes: watch on service prefix | Possible but stale on failover | DB + polling | Yes if eventually consistent OK |
| Service discovery (large fleet) | Yes if aggregated | No | DB bottleneck | Yes: DNS scales naturally |
| Leader election | Yes: lease + revision | Unsafe (async replication) | Fragile via single-row CAS | No |
| Config distribution | Yes: watch on prefix | No reliable watch | DB + cache + invalidation, complex | No |
| Cluster membership | Yes | Unsafe | Possible | No |
| High-throughput KV | No: 10k/sec ceiling | Yes: 100k/sec single node | Possible but slower | No |
| User data | No: ~8 GB limit | No: RAM limit, async repl | Yes | No |
Thresholds
- < 100 services, eventually consistent OK → DNS is fine.
- 100-10k services, real-time updates → coordination service with aggregator (Consul, etcd).
- Need fencing tokens → coordination service. Redis cannot give monotonic counters across failover.
- Lock without fencing → if you really don't need fencing, Redis SETNX is faster. But pause and ask: do you really not need fencing? Most distributed-lock requirements that look benign actually do.
- Both high throughput and consensus → embedded sharded consensus (CockroachDB style), not "use etcd."
§24. Use case gallery
24.1 Kubernetes API server backing store (etcd)
Every Kubernetes cluster runs a single etcd cluster as the sole persistent store for all cluster state — Pods, Nodes, Services, ConfigMaps, Secrets, CustomResources, leases, controller bookmarks. The kube-apiserver is the only thing that talks to etcd. Controllers reconcile by watching the apiserver, which multiplexes their watches off a small number of etcd watches per resource type. Textbook control-plane design: aggregator (apiserver) in front of a tiny consensus core (etcd).
24.2 Kafka KRaft metadata (embedded Raft)
Kafka 3.3+ embeds its own Raft quorum into controller nodes, replacing the external ZooKeeper ensemble Kafka used from 2011 to 2022. Cluster metadata (broker membership, topic configs, partition assignments) lives in an internal topic (__cluster_metadata) backed by Raft. Controller failover went from minutes to seconds. The lesson: at extreme scale, embedding consensus into the system itself is often better than running an external coordination service — controller state can be loaded by log replay rather than full tree scan.
24.3 HashiCorp Consul + service mesh
Consul bundles a 5-node Raft cluster (KV + service catalog) with a SWIM-based gossip layer for peer liveness. Raft holds authoritative state; gossip handles the high-frequency "is X alive" without burdening Raft. Service mesh proxies (Envoy via Connect) consume the catalog via the local Consul agent, which acts as a per-host aggregator. "Coordination service plus aggregator fleet" in pure form.
24.4 Cassandra cluster topology (gossip + lightweight Paxos)
Cassandra uses gossip for the bulk of cluster awareness (node liveness, range ownership) but relies on embedded Paxos for the few operations that need linearizability — schema changes, lightweight transactions (LWT). LWTs use Paxos directly between Cassandra nodes for compare-and-set semantics. Hybrid: gossip for fast eventual coordination, embedded Paxos for the few strong-consistency operations.
24.5 TiKV / CockroachDB embedded Raft (sharded consensus)
These systems don't use a separate coordination service for user data. Every key range (a "region" in TiKV, a "range" in CockroachDB) is its own Raft group. A cluster runs thousands of Raft groups in parallel, each managing ~64-96 MB and replicated across 3-5 nodes. A separate, smaller coordination service (TiDB's Placement Driver; CockroachDB's internal liveness range) handles cluster-level metadata. The design that scales coordination semantics to petabytes by partitioning the consensus problem itself.
24.6 Distributed cron / scheduled job leader election
A canonical small use: 10 worker nodes agree on which runs the nightly batch. Each tries to acquire a lease on /jobs/nightly-rollup/leader. The winner runs the job; the others poll. If the leader dies mid-job, its lease expires; another node grabs it and re-runs. 20 lines in etcd's client library. Variants everywhere: Airflow scheduler election, Kafka MirrorMaker, any single-active worker pattern.
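The election flow can be sketched with a toy lease table and a manually advanced clock. This is an in-memory model, not etcd's client API: LeaseStore, try_acquire, and the TTL values are assumptions for illustration. Expiry is decided by the store's clock, never by the worker's.

```python
class LeaseStore:
    """Toy lease table with a manually advanced clock (seconds)."""

    def __init__(self):
        self.now = 0
        self.holders = {}   # key -> (owner, expires_at)

    def try_acquire(self, key, owner, ttl):
        """Grant or renew the lease unless another owner holds a live one."""
        holder = self.holders.get(key)
        if holder is not None and holder[1] > self.now and holder[0] != owner:
            return False
        self.holders[key] = (owner, self.now + ttl)
        return True

    def leader(self, key):
        holder = self.holders.get(key)
        return holder[0] if holder and holder[1] > self.now else None


KEY = "/jobs/nightly-rollup/leader"
store = LeaseStore()
workers = [f"worker-{i}" for i in range(10)]

winners = [w for w in workers if store.try_acquire(KEY, w, ttl=15)]
store.now += 10
store.try_acquire(KEY, "worker-0", ttl=15)   # keep-alive from the current leader
store.now += 20                              # leader dies: no more keep-alives
took_over = store.try_acquire(KEY, "worker-3", ttl=15)
print(winners, took_over, store.leader(KEY))
```

A re-run after takeover is only safe if the job is idempotent or fenced, since the dead leader may have finished part of the work before its lease lapsed.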
24.7 Feature flag / config distribution
The feature-flag service holds current values in a coordination service. Application instances watch the flag prefix. When an operator updates a flag, every instance receives the watch event and updates in-process config within milliseconds. Much better than polling a DB every N seconds because (a) propagation is immediate and (b) the watch contract guarantees no missed updates. LaunchDarkly-class systems and many homegrown flag systems use etcd or Consul for this.
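The "no missed updates" property comes from resuming a watch at a known revision. The sketch below models that with an in-memory change log; RevisionedStore and its methods are hypothetical, and it assumes history is never compacted, whereas a real store can only replay revisions it still retains.

```python
class RevisionedStore:
    """Toy KV store that keeps an ordered change log, one revision per write."""

    def __init__(self):
        self.revision = 0
        self.history = []   # list of (revision, key, value)

    def put(self, key, value):
        self.revision += 1
        self.history.append((self.revision, key, value))
        return self.revision

    def watch(self, prefix, start_revision):
        """Return every change under `prefix` at or after `start_revision`, in order."""
        return [
            event for event in self.history
            if event[0] >= start_revision and event[1].startswith(prefix)
        ]


store = RevisionedStore()
last_seen = store.put("/flags/new-checkout", "off")   # the instance processed this

# The instance is disconnected while more writes land.
store.put("/flags/new-checkout", "on")
store.put("/config/unrelated", "x")
store.put("/flags/dark-mode", "on")

missed = store.watch("/flags/", start_revision=last_seen + 1)
print(missed)
```

On reconnect the instance asks for everything after the last revision it applied, so it sees both flag changes in order and skips the unrelated key.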
§25. Real-world implementations with numbers
Kubernetes (etcd). A standard K8s control plane runs a 3- or 5-node etcd. Public reports from large operators (Alibaba, ByteDance, OpenAI, Google GKE) describe single clusters ranging from several thousand to tens of thousands of nodes. State 2-6 GB, occasionally 8 GB. Peak write rates 5-10k/sec during mass churn. Watch fanout via the apiserver is the architecturally important layer.
Kafka KRaft (replacing ZooKeeper, 2022). Pre-KRaft, LinkedIn-scale Kafka with 200k+ partitions hit the ZooKeeper wall: hundreds of thousands of znodes, watch fanout from every broker, controller failover in minutes. KRaft replaced this with an embedded Raft quorum and the __cluster_metadata topic. Controller failover dropped to single-digit seconds; Kafka now scales to 1M+ partitions per cluster.
Google Chubby (Burrows, OSDI 2006). 5-node Paxos quorums inside Google datacenters. Substrate for GFS master locking, BigTable tablet ownership, BorgMaster election, and (notoriously) a name service used so widely that "the name service is unavailable" became a Google-internal failure category. Lock hold times are measured in hours: coarse-grained, low contention.
HashiCorp Consul. Service discovery + KV + health checks. 5-node Raft for KV+catalog; SWIM gossip for liveness. WAN federation lets Consul span regions for the catalog (not for strongly-consistent KV across regions). Scales to tens of thousands of services and proxy sidecars.
Apache ZooKeeper (Kafka pre-KRaft, HBase, Hadoop). The original Java-based coordination service. Used by Kafka (pre-3.3), HBase (region assignment), Hadoop YARN (RM HA), Solr (collection metadata). Handles ~10-50k writes/sec. Modern systems migrate away because the JVM operational footprint is heavier than etcd's and the one-shot watch model is harder to reason about than etcd's replayable revision-based watch.
TiKV / TiDB (embedded sharded Raft). Raft group per "region" (~96 MB). A 500-node TiKV cluster runs >100k Raft groups in parallel. The Placement Driver (PD) is a separate 3-node etcd cluster for cluster-level metadata.
CockroachDB. Every range (~64 MB by default in early releases; newer releases default to 512 MB) is its own Raft group; a 100-node cluster runs tens of thousands of Raft groups; at scale, >100k. Internal liveness and system ranges hold cluster-level metadata.
kine in place of etcd (k3s / k0s). k3s can swap etcd for a SQL backend (SQLite, MySQL, Postgres) via a translation layer called kine. This works for small/edge clusters but loses true consensus semantics: it relies on the underlying DB's consistency model. Useful as a reminder that the interface (Kubernetes' KV usage) is separable from the implementation (real Raft vs DB row locks).
§26. Summary
Coordination services (etcd, ZooKeeper, Consul, Chubby) are a deliberately tiny, deliberately slow, deliberately CP class of strongly-consistent KV stores. They are built on a single Raft (or Zab, or Paxos) group with quorum-fsync writes, capped at ~10k writes/sec and ~8 GB of state, and designed to hold the control-plane truth that every other distributed component depends on. They are not databases, caches, or queues. They are the consensus log that resolves "who's the leader, where do the shards live, what's the config, who holds this lock," with linearizability, fencing tokens that survive failover, and ordered watch streams. You don't scale them up; you scale them away, by fronting them with an aggregator for watch fanout, pruning what doesn't belong, and moving to embedded consensus (sharded in CockroachDB / TiKV, embedded in Kafka KRaft) when one external Raft group is no longer enough. The reason to pay for all of this is simple: only a real consensus log gives you a counter that never goes backward.