Stream Processing

A technology reference on stream processing engines as a class — Apache Flink, Kafka Streams, Spark Structured Streaming, Apache Beam, Materialize, RisingWave, Samza. What stream processing IS, the contract it provides, the byte-level mechanics of checkpointing and exactly-once, the design space, the hard problems, and how the same technology serves payments fraud detection, real-time ML features, IoT alerting, product analytics, and CDC (Change Data Capture) pipelines differently.


§1. What stream processing IS

A stream processor is a runtime that executes continuous queries over unbounded sequences of events while maintaining incrementally-updated state. The query never finishes. Each new event mutates internal state, possibly emits derived events, and the result of any conceptual aggregation is always "as of the most recent event we've processed."

Three properties together define the category and distinguish it from neighbors:

  1. The input is conceptually unbounded. There is no "end of data." The job runs for months or years.
  2. The query is incremental. When event e arrives, the engine computes f(state, e) → (new_state, output). It does NOT re-scan all prior events to recompute the result. This is what gives stream processors their per-event O(1) cost vs batch's per-refresh O(N) cost.
  3. State is durable and replayable. The engine carries operator-local state (counters, window buffers, join hash tables, ML model parameters) across events, persists it on a regular cadence, and can replay missed input from a durable log to rebuild that state after a crash.
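
The second property can be made concrete with a minimal sketch. It assumes a single in-memory operator counting events per key; the class and method names (IncrementalCountSketch, onEvent, recomputeFromScratch) are hypothetical and not taken from any engine.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a per-key counter maintained incrementally. */
final class IncrementalCountSketch {
    // Operator-local state: one counter per key.
    private final Map<String, Long> state = new HashMap<>();

    /** f(state, e) -> (new_state, output): touches one state entry per event. */
    long onEvent(String key) {
        return state.merge(key, 1L, Long::sum);
    }

    /** Batch-style alternative: re-scans every prior event on each refresh. */
    static long recomputeFromScratch(List<String> allEvents, String key) {
        long count = 0;
        for (String e : allEvents) {
            if (e.equals(key)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        IncrementalCountSketch op = new IncrementalCountSketch();
        List<String> events = Arrays.asList("u1", "u2", "u1", "u1");
        long latest = 0;
        for (String e : events) {
            latest = op.onEvent(e); // constant work per event
        }
        System.out.println("incremental count for u1: " + latest);
        System.out.println("batch recount for u1: " + recomputeFromScratch(events, "u1"));
    }
}
```

Both paths agree on the answer; the difference is that onEvent does constant work per arrival while the recount grows with history.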

What it solves: low-latency analytics over arriving events. The reader of the output sees aggregations and derived signals reflecting seconds-old input, sometimes sub-second, instead of waiting for the next batch run. Concretely: a fraud-risk score on a card swipe within 200ms of the swipe; a real-time ML feature ("clicks in the last 5 minutes for user U") refreshed continuously; a derived materialized view ("active sessions per region") with the freshness of a database read.

What stream processing is NOT

  • Not a message queue. Kafka, Pulsar, RabbitMQ carry events. They do not aggregate, window, or hold derived state. A stream processor sits on top of a queue.
  • Not a batch processor running quickly. A daily Spark batch job scans O(all_data) on every run. Even cut down to 1-minute micro-batches, it still plans and runs a job over each micro-batch's input as a unit. A stream processor instead mutates one entry of state per event.
  • Not an OLTP database. Databases serve random point reads and writes from many clients; stream processors execute a fixed query graph and rarely accept random queries (Materialize is the boundary case).
  • Not an OLAP system. Snowflake, BigQuery, ClickHouse run interactive ad-hoc queries against stored data. Stream processors run one query repeatedly on arriving data.
  • Not great at: random ad-hoc queries against history (no, query the warehouse); long-running synchronous request/response (no, the model is async push); cross-system distributed transactions beyond the source-and-single-sink boundary (the 2PC machinery is narrow).

Where it sits in the stack

   producers ──► [Kafka / Kinesis / Pulsar / Pub-Sub]  ◄── durable log
                          │
                          ▼
                   [Stream Processor]    ◄── this doc
                          │
                          ▼
   ┌──────────────────────┼──────────────────────────┐
   ▼                      ▼                          ▼
sinks (Kafka,        derived views          OLAP / DW for history
Postgres, S3,        (Materialize,          (Snowflake, BigQuery,
Redis, downstream)   serving DBs)           Pinot, Druid)

The stream processor consumes events, holds derived state, and emits to one or more sinks. It is the always-on incremental compute layer.


§2. Inherent guarantees — and what you must layer on

A stream processor's contract is narrower than people assume. The phrase "exactly-once" gets thrown around carelessly; understand precisely what is and isn't guaranteed.

What you get by design

  • Exactly-once application semantics for internal state. The engine guarantees that, after recovery, the engine's internal state matches what it would have been had no failure occurred. Counters are not double-incremented. Window aggregates are correct. This is achieved via consistent snapshots (Chandy–Lamport barriers in Flink, Kafka transactions in Kafka Streams, batch-IDs in Spark).
  • Exactly-once end-to-end when the sink cooperates. If the sink supports transactional writes (Kafka transactional producer, JDBC with XA), the engine can coordinate a Two-Phase Commit (2PC) such that records become visible exactly once. The 2PC coordinator is typically the stream processor's coordinator (JobManager in Flink), and the prepare and commit steps are tied to checkpoint barriers. A sink that only offers idempotent upserts against a keyed store needs no 2PC: replayed writes overwrite the same rows, so the visible result is effectively exactly-once.
  • Event-time processing with watermarks. The engine separates event time (timestamp on the record) from processing time (wall clock at the operator) and supplies a watermark abstraction — a monotone-increasing claim "I believe no event with timestamp ≤ W will arrive anymore." This makes window-based aggregations correct under out-of-order arrival.
  • State durability across crashes. State is checkpointed to durable storage (S3, HDFS, GCS) on a configurable cadence. After a crash, the job resumes from the last successful checkpoint with no lost state.
  • Ordering within a key. When events are partitioned by some key and processed by a single operator instance per key, the engine preserves the source's ordering for that key. Different keys may interleave.

What you do NOT get

  • NOT distributed transactions across multiple unrelated systems. "Exactly-once" applies to the engine's state plus one cooperating sink (and any second sink that joins the same 2PC group). A pipeline that writes to Kafka AND to Stripe's payments API AND sends an SMS via Twilio is NOT exactly-once. Those external side effects must be made idempotent (using deterministic identifiers).
  • NOT bounded state for free. RocksDB, Hummock, or any state backend will grow without limit if the application logic does not specify TTLs (Time-To-Live), eviction, or window closure. Stream processors do not magically forget data. This is one of the most common causes of streaming-job outages in practice.
  • NOT true exactly-once with arbitrary side effects. Sending a Slack alert, writing to a non-transactional file, calling an external REST endpoint — these are at-least-once by nature. The engine cannot rewind the world.
  • NOT a synchronous request/response contract. The engine is push-based. Clients that need "ask question, get answer" must query a downstream serving store the engine populates, not the engine directly. Materialize bends this rule by offering Postgres-protocol queries against streaming views, at the cost of architectural complexity.
  • NOT low recovery time without effort. Cold restart from S3 checkpoints can take minutes to tens of minutes for large state. Local recovery, region-based restart, and standby task managers all need to be configured.
  • NOT a scheduler for cron-like batch jobs. Use Airflow, Argo Workflows, or Dagster for that.
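
The "bounded state" point above is easier to see with a minimal sketch of TTL-based eviction. It assumes a single-threaded operator with explicit timestamps passed in by the caller; TtlStateSketch and its methods are hypothetical names, and real state backends implement expiry differently (for example during compaction).

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: keyed counters that expire after a TTL. */
final class TtlStateSketch {
    private final long ttlMs;
    // key -> {count, lastUpdateMs}
    private final Map<String, long[]> entries = new HashMap<>();

    TtlStateSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Increment the counter for key, restarting from zero if the old entry expired. */
    long increment(String key, long nowMs) {
        long[] e = entries.get(key);
        if (e == null || nowMs - e[1] >= ttlMs) {
            e = new long[] {0L, nowMs};
            entries.put(key, e);
        }
        e[0]++;
        e[1] = nowMs;
        return e[0];
    }

    /** Drop entries not updated within the TTL; returns how many were removed. */
    int sweep(long nowMs) {
        int before = entries.size();
        entries.values().removeIf(e -> nowMs - e[1] >= ttlMs);
        return before - entries.size();
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        TtlStateSketch state = new TtlStateSketch(1_000);
        state.increment("a", 0);
        state.increment("b", 500);
        System.out.println("removed by sweep: " + state.sweep(1_200));
        System.out.println("entries left: " + state.size());
    }
}
```

Without the sweep (or an equivalent), every key ever seen stays in the map, which is exactly the unbounded growth the bullet warns about.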

The cardinal sin: claiming exactly-once end-to-end when the downstream sink is a non-idempotent REST API. Be honest about the contract.
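
One way to make an at-least-once side effect safe is the deterministic-identifier approach mentioned above. The following is a minimal sketch, assuming the identifier is derived from the source position (topic, partition, offset) and that the receiving system remembers identifiers it has already applied. The in-memory set stands in for that system; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: dedupe an at-least-once side effect using a deterministic ID. */
final class IdempotentEffectSketch {
    // Stand-in for the external system's record of applied idempotency keys.
    private final Set<String> seenKeys = new HashSet<>();
    final List<String> delivered = new ArrayList<>();

    /** The same input record always yields the same key, so a replay is recognisable. */
    static String idempotencyKey(String topic, int partition, long offset) {
        return topic + "-" + partition + "-" + offset;
    }

    /** Returns true if the effect was performed, false if it was a replayed duplicate. */
    boolean sendOnce(String topic, int partition, long offset, String payload) {
        String key = idempotencyKey(topic, partition, offset);
        if (!seenKeys.add(key)) {
            return false; // already applied before the crash and replay
        }
        delivered.add(payload);
        return true;
    }

    public static void main(String[] args) {
        IdempotentEffectSketch api = new IdempotentEffectSketch();
        System.out.println(api.sendOnce("payments", 3, 42L, "charge $10"));
        // Replay after recovery: same source position, same key, no second charge.
        System.out.println(api.sendOnce("payments", 3, 42L, "charge $10"));
        System.out.println(api.delivered);
    }
}
```

The key must come from the input, not from a random UUID or wall-clock time generated at send time, otherwise the replayed attempt would look like a new request.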


§3. The design space

Stream processors split along five orthogonal axes:

Axis 1: Execution model — micro-batch vs true streaming

  • Micro-batch (Spark Structured Streaming): the engine accumulates events for a short interval (typically 100ms–1s), then runs a Spark job over that micro-batch. Pro: rich SQL, reuse of Spark batch optimizations, simple semantics. Con: the latency floor is the micro-batch interval, so sub-100ms p99 is out of reach.
  • True per-event streaming (Flink, Kafka Streams, Samza, Storm, Materialize, RisingWave): each event is processed individually as it arrives. Pro: latency in the 10–200 ms range; backpressure is per-channel and finer-grained. Con: more complex semantics, harder failure modes.

Axis 2: Embedded library vs cluster

  • Embedded library (Kafka Streams): a JAR you include in your microservice. No cluster to operate; horizontal scaling is "deploy more pods of your service." State is stored locally and changelogged back to Kafka. Limited to Kafka-as-source-and-sink, JVM-only, single-cluster operations.
  • Standalone cluster (Flink, Spark Streaming, Samza, Storm): a dedicated cluster of JobManager + TaskManagers (or equivalent). Better fit for cross-source pipelines, large state, sophisticated windowing, polyglot sinks. Higher operational cost.

Axis 3: Declarative SQL vs imperative DSL

  • SQL-first (Materialize, RisingWave, ksqlDB, Flink SQL, Spark SQL on streams): define streaming jobs as SQL views. The optimizer plans the dataflow. Analysts can author jobs. Limited extensibility for custom logic.
  • DSL / API (Flink DataStream, Kafka Streams DSL, Beam SDK, Storm): imperative code (Java, Scala, Python) defining the dataflow. Full extensibility, custom windows, custom state, ML library integration. Steeper learning curve.

Axis 4: General-purpose processor vs materialized-view database

  • General-purpose (Flink, Spark, Kafka Streams, Samza, Storm): you write the job, deploy it, monitor it. It emits to a sink; clients query the sink.
  • Materialized-view streaming database (Materialize, RisingWave, ksqlDB pull queries): you create a streaming SQL view, and clients query the view directly (over the Postgres wire protocol in Materialize and RisingWave; ksqlDB exposes its own query API). The system maintains incremental view results internally and serves point queries from them with the freshness of a streaming aggregation and the convenience of a database.

Axis 5: Unified batch + stream API vs streaming-only

  • Unified (Apache Beam, Flink Table API, Spark Structured Streaming): write the pipeline once, run it on batch input OR streaming input. Useful when the same business logic is needed for backfills and online.
  • Streaming-only (Storm historically, ksqlDB, Materialize): optimized for streaming, batch backfill is a separate concern (often handled via a batch system).

Comparison table

| Engine | Model | Latency p99 | Throughput / cluster | State backend | Exactly-once strategy | SQL? | Best for |
| --- | --- | --- | --- | --- | --- | --- | --- |
| Apache Flink | True streaming | 10–200 ms | 10M events/sec | RocksDB / heap | 2PC sink + Chandy–Lamport barriers | Flink SQL | Complex stateful pipelines, low latency, custom windowing |
| Kafka Streams | True streaming | 50–500 ms | ~1M events/sec / instance | RocksDB + Kafka changelog | Kafka transactional offset + state + sink commit | None | Stateful consumer inside a microservice, Kafka-native |
| Spark Structured Streaming | Micro-batch | 200 ms – 2 s | 5M events/sec | Files + checkpoints (HDFS) | Idempotent sinks via batch-ID; experimental continuous mode | Spark SQL | Hybrid batch + stream, Spark shops, ETL |
| Apache Beam | Unified abstraction; runs on Flink/Spark/Dataflow runners | Depends on runner | Depends on runner | Depends on runner | Depends on runner | SQL via runners | Pipeline portability across runners; Google Dataflow |
| Apache Samza | True streaming | 50–200 ms | ~5M events/sec | RocksDB + Kafka changelog | At-least-once + idempotent producer | SamzaSQL | Local-state-heavy jobs (LinkedIn-style derived datasets) |
| Materialize | True streaming, incremental view maintenance | 100 ms – 1 s | ~1M events/sec | Custom (Timely Dataflow, Persist) | View consistency + idempotent recomputation | Postgres-protocol SQL | SQL analytics on streams; ad-hoc queries on streaming data |
| RisingWave | True streaming, materialized views | 100 ms – 1 s | ~1M events/sec | Hummock (S3-backed LSM) | Two-phase commit on Hummock barriers | Postgres-protocol SQL | Cloud-native streaming SQL on object storage |
| Apache Storm | True streaming (per-record tuples) | 50–500 ms | ~1M events/sec | None native (Trident has state) | At-least-once acker-based; Trident adds exactly-once | None | Legacy; rarely chosen for new builds |

The first three (Flink, Kafka Streams, Spark) are the workhorses. Materialize and RisingWave are growing fast for streaming-SQL use cases. Beam is the portability layer used heavily inside Google. Samza powers a lot of LinkedIn but is less common elsewhere.


§4. Byte-level mechanics: state backends, checkpoint barriers, 2PC sinks, event time

This is the section where the engine's internals stop being abstract. We'll use Flink as the depth anchor (it has the most explicit and best-documented mechanism) and note where other engines diverge.

4a. State backends — where state actually lives

Flink, Samza, and Kafka Streams all face the same question: where do you put the keyed state for hundreds of millions of distinct keys, each with multiple values?

┌────────────────────────────────────────────────────────────────┐
│ HashMapStateBackend (formerly MemoryStateBackend)              │
│  • State lives in JVM heap as Java HashMap                     │
│  • Checkpoints serialize the whole heap state                  │
│  • Use case: state < 100 MB per slot, latency critical         │
│  • Limit: ~1 GB before OOM; checkpoints block under pressure   │
└────────────────────────────────────────────────────────────────┘
┌────────────────────────────────────────────────────────────────┐
│ EmbeddedRocksDBStateBackend  ◄── default for large state       │
│  • State is RocksDB on local NVMe, off-heap                    │
│  • LSM tree: writes → memtable → flush → L0 → ... → L6 SSTs    │
│  • Checkpoints upload SST files (incremental) to S3/HDFS       │
│  • State size limited by local SSD, not heap                   │
│  • get/put cost ~10–100 µs (block cache hit) to ~1 ms (miss)   │
└────────────────────────────────────────────────────────────────┘

Why LSM (Log-Structured Merge) tree, not B+ tree, for streaming state? The dominant access pattern is "read-then-write" on the same key, repeated thousands of times per minute (every event for a popular key triggers get → put). An LSM absorbs writes at memtable speed (~1M ops/sec/core, pure RAM); a B+ tree would need a page-modify + WAL fsync per put, ~10x slower for writes. Reads are amortized by the block cache and bloom filters (typical hit rate >99% for hot keys).

RocksDB write path:
  state.put("user_42|window_12:00", count=7)
        │
        ▼
  [Memtable]   ◄── skip-list in RAM, sorted by key (~64 MB default)
        │   when full → immutable, flush
        ▼
  [L0 SSTs]    ── newest, may overlap key ranges
        │
        ▼ leveled compaction
  [L1, L2, ... L6 SSTs]
        │
        ▼
  Each SST: sorted blocks (~4 KB each), block index, bloom filter
            stored on local NVMe

RocksDB read path:
  state.get("user_42|window_12:00")
    1. check memtable → if hit, return
    2. check immutable memtable
    3. check L0 SSTs (newest first), bloom filter skips most
    4. check L1, L2, ... L6 SSTs (each level: at most one SST owns the key)
    5. read 4 KB block (page cache or disk)
    6. binary search within block, return value

The trade-off you give up vs B+ tree is read amplification. In the worst case a lookup touches the memtable plus one SST per level across 7 levels, about 8 probes, and more if several overlapping L0 files must be checked. Bloom filters reduce the average to ~1.1 SSTs actually read. For OLTP point queries on cold data, B+ tree wins; for streaming workloads (a read-modify-write on every event, concentrated on hot keys), LSM wins by 5–10x.
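
The write and read paths above can be imitated with a toy model. This is a minimal sketch, assuming an in-memory memtable and immutable sorted runs standing in for SST files; it has no levels, compaction, bloom filters, or disk I/O, and ToyLsmSketch is a hypothetical name unrelated to the RocksDB API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

/** Illustrative only: a toy LSM with a memtable and immutable sorted runs. */
final class ToyLsmSketch {
    private final int memtableLimit;
    private TreeMap<String, Long> memtable = new TreeMap<>();
    // Newest run first, mirroring "check L0 SSTs (newest first)".
    private final Deque<TreeMap<String, Long>> runs = new ArrayDeque<>();

    ToyLsmSketch(int memtableLimit) {
        this.memtableLimit = memtableLimit;
    }

    /** A write only touches the in-memory memtable; a full memtable is frozen. */
    void put(String key, long value) {
        memtable.put(key, value);
        if (memtable.size() >= memtableLimit) {
            runs.addFirst(memtable); // "flush": becomes an immutable sorted run
            memtable = new TreeMap<>();
        }
    }

    /** A read checks the memtable, then runs from newest to oldest. */
    Long get(String key) {
        Long fromMemtable = memtable.get(key);
        if (fromMemtable != null) {
            return fromMemtable;
        }
        for (TreeMap<String, Long> run : runs) {
            Long v = run.get(key);
            if (v != null) {
                return v; // newest version wins; older runs are not consulted
            }
        }
        return null;
    }

    int runCount() {
        return runs.size();
    }

    public static void main(String[] args) {
        ToyLsmSketch lsm = new ToyLsmSketch(2);
        lsm.put("user_42|window_12:00", 6);
        lsm.put("user_7|window_12:00", 1); // memtable is full, gets flushed
        lsm.put("user_42|window_12:00", 7); // newer version lives in the memtable
        System.out.println(lsm.get("user_42|window_12:00"));
        System.out.println(lsm.runCount());
    }
}
```

The read loop is where read amplification shows up: every extra run is another structure a miss has to probe, which is what compaction and bloom filters exist to limit.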

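Kafka Streams and Samza also use RocksDB. Materialize uses its own Persist layer built on object storage. RisingWave uses Hummock, also an LSM but designed to live entirely on S3 / object storage so that compute and state can scale independently; that is the cloud-native trick. Spark Structured Streaming's default state store keeps state in executor memory and persists versioned delta and snapshot files to the checkpoint location (HDFS or compatible storage), which is less efficient but simpler; newer Spark versions (3.2+) also offer a RocksDB-backed state store.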

4b. Chandy–Lamport snapshot algorithm — the deepest concept

The problem: a distributed dataflow with N operators across M machines, each holding mutable state. You want a globally consistent snapshot — a state such that for every event, either the event has been fully processed by every operator it should have touched, OR not yet processed by any. No half-applied events. And you want to do this without stopping the world.

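Chandy–Lamport (1985) says: send a special marker (a barrier) along every channel of the dataflow. In the barrier-based variant Flink uses, once an operator has received the barrier on every input channel, it snapshots its state and forwards the barrier on every output. (The original algorithm snapshots on the first marker and records what is still in flight on the other channels, which is the idea unaligned checkpoints return to.) The collection of all per-operator snapshots forms a globally consistent cut.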

Flink's implementation:

TIME ──────────────────────────────────────────────────────►

Source:    e1  e2  [BARRIER_N]  e3  e4  [BARRIER_N+1] ...
                          ▲
                          │ JobManager triggered checkpoint N
                          │ by injecting barrier_N at all sources

  When BARRIER_N flows past an operator:
    1. operator finishes processing events that arrived BEFORE the barrier
    2. operator snapshots its state (RocksDB SSTs as they are at this moment)
    3. operator emits BARRIER_N to all downstream outputs
    4. operator resumes processing events arriving AFTER the barrier

  Result: every operator's snapshot is a "consistent cut" — every event
  before barrier_N is fully applied, every event after is not.

4c. Aligned vs unaligned checkpoints

When an operator has multiple input channels (e.g., after a keyBy shuffle, each downstream slot receives input from many upstream slots), what does it do when barrier_N arrives on channel A but not yet on channel B?

Aligned checkpoints (default): buffer events on the channel that has seen the barrier. Wait for the barrier on all channels. Then snapshot, then resume.

Aligned:
  channel A ──e1─e2─B(N)─e3─e4───►   e3,e4 buffered until B(N) arrives on B
  channel B ──e5──────B(N)─e6──►
                       ▲
                       │ snapshot now; then drain buffered e3,e4 + e6

Pro: each operator's input set is unambiguous — snapshot reflects exactly the state after consuming everything before the barrier. Con: if channel B is slow (GC pause, network congestion, hot upstream), channel A stalls. Under sustained backpressure, alignment time can grow into seconds, blowing past the latency SLO (Service Level Objective).
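
Barrier alignment can be sketched for a two-input operator that keeps a running sum. This is a minimal illustration, assuming single-threaded delivery and one checkpoint in flight at a time; AlignedBarrierSketch and its methods are hypothetical names and do not mirror Flink's internal classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Illustrative only: barrier alignment for a two-input summing operator. */
final class AlignedBarrierSketch {
    private long sum = 0; // operator state
    private final boolean[] barrierSeen = new boolean[2];
    // Records that arrived behind a barrier and are held until alignment completes.
    private final Queue<Long> heldBack = new ArrayDeque<>();
    final List<Long> snapshots = new ArrayList<>();

    void onRecord(int channel, long value) {
        if (barrierSeen[channel]) {
            heldBack.add(value); // belongs to the next checkpoint epoch
        } else {
            sum += value;
        }
    }

    void onBarrier(int channel) {
        barrierSeen[channel] = true;
        if (barrierSeen[0] && barrierSeen[1]) {
            snapshots.add(sum); // consistent cut: only pre-barrier records applied
            barrierSeen[0] = false;
            barrierSeen[1] = false;
            while (!heldBack.isEmpty()) {
                sum += heldBack.poll(); // drain what was held back
            }
        }
    }

    long currentSum() {
        return sum;
    }

    public static void main(String[] args) {
        AlignedBarrierSketch op = new AlignedBarrierSketch();
        op.onRecord(0, 1); // e1 on channel A
        op.onRecord(0, 2); // e2 on channel A
        op.onBarrier(0);   // B(N) on channel A
        op.onRecord(0, 3); // e3: held back
        op.onRecord(1, 5); // e5 on channel B: still pre-barrier, applied
        op.onRecord(0, 4); // e4: held back
        op.onBarrier(1);   // B(N) on channel B: snapshot, then drain
        System.out.println("snapshot: " + op.snapshots);
        System.out.println("sum after drain: " + op.currentSum());
    }
}
```

The held-back queue is the cost described above: while channel B lags, everything behind the barrier on channel A waits.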

Unaligned checkpoints (Flink 1.11+): as soon as the barrier arrives on the first input, snapshot immediately, capturing in-flight buffer state for the other channels. On restore, those buffered events are replayed before processing resumes.

Unaligned:
  channel A ──e1─e2─B(N)─e3─e4───►
  channel B ──e5──────B(N)──e6──►
                          ▲
                          │ snapshot triggers when EITHER channel sees B(N)
                          │ in-flight data still ahead of B(N) (e5, if queued on B) captured in snapshot

Pro: barrier propagates at the speed of the fastest path; checkpoint duration roughly constant under backpressure. Con: snapshot size grows (you save in-flight buffer state); more CPU and S3 bandwidth.

Rule of thumb: aligned by default. Switch to unaligned when lastCompletedCheckpointDuration / checkpointInterval stays above 0.7 for a sustained period, which typically coincides with a sustained backpressure ratio above 0.5.

Kafka Streams takes a fundamentally different approach: instead of barriers, it commits a Kafka transaction that atomically updates source offsets, the state-store changelog topic, AND the output topic. The transaction itself is the snapshot.

Spark Structured Streaming uses batch IDs: each micro-batch has a unique numeric ID. State changes for batch N are written to a directory state/N/ on HDFS. If a crash happens during batch N, the next attempt re-runs batch N from scratch, producing the same state directory deterministically. Sinks check the batch ID before writing to avoid duplicates.
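
The batch-ID check on the sink side can be sketched as follows. This is a minimal illustration, assuming batch IDs increase monotonically and that the sink can store the last committed ID together with its rows; BatchIdSinkSketch is a hypothetical class, not Spark's sink interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: a sink that skips micro-batches it has already committed. */
final class BatchIdSinkSketch {
    // In a real sink this marker would be stored atomically with the rows.
    private long lastCommittedBatchId = -1;
    final List<String> rows = new ArrayList<>();

    /** Returns true if the batch was written, false if it was a replay. */
    boolean writeBatch(long batchId, List<String> batchRows) {
        if (batchId <= lastCommittedBatchId) {
            return false; // re-run of a batch that already landed
        }
        rows.addAll(batchRows);
        lastCommittedBatchId = batchId;
        return true;
    }

    public static void main(String[] args) {
        BatchIdSinkSketch sink = new BatchIdSinkSketch();
        System.out.println(sink.writeBatch(7, Arrays.asList("r1", "r2")));
        // Crash after the write but before the engine recorded success: batch 7 re-runs.
        System.out.println(sink.writeBatch(7, Arrays.asList("r1", "r2")));
        System.out.println(sink.rows);
    }
}
```

This only works because re-running batch N produces the same rows; a non-deterministic batch would make the skipped replay differ from what was already written.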

4d. End-to-end exactly-once via Two-Phase Commit (2PC) sinks

Internal exactly-once (checkpoint barriers) gives you: "the engine's state is consistent." But the sink writes externally — Kafka, Postgres, S3. If we just write naively, after a crash we might replay events that already wrote downstream. So end-to-end exactly-once requires the sink to participate in the checkpoint.

The pattern is 2PC sink:

       Source         Operators           Sink
         │                │                │
         │   barrier_N    │   barrier_N    │
         │ ────────────►  │ ────────────►  │
         │                │                │ pre-commit (PREPARE):
         │                │                │   open Kafka txn (or
         │                │                │   begin XA transaction)
         │                │                │   write records, flush
         │                │                │   (NOT visible yet to
         │                │                │    read_committed consumers)
         │                │                │   ack barrier_N to JM
         │                │                │
         │            JobManager: all acks received
         │                │                │   persist ckpt N metadata
         │                │                │   to S3
         │                │                │
         │                │ notifyCheckpointComplete(N)
         │                │  ─────────────►│
         │                │                │ commit:
         │                │                │   Kafka txn commit
         │                │                │   records now visible

Two phases:

  1. Pre-commit (PREPARE): when barrier_N arrives at sink, open a transaction (Kafka transactional producer with stable transactional.id, or JDBC XA prepare, or write to a temp S3 path). Write all records buffered since the last barrier. Flush. Do NOT commit. Ack the barrier to the JobManager.
  2. Commit: after the JobManager confirms checkpoint N is durable in S3 (all operators acked, metadata persisted), it sends notifyCheckpointComplete(N) to the sink. The sink then commits the transaction. Records become visible.

Crash analysis:

  • Before pre-commit: no records written. Source rewinds to checkpoint N-1 offsets, replay produces same records. Exactly-once.
  • During pre-commit: open transaction aborted by coordinator timeout. On recovery, source rewinds, replay. Exactly-once.
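  • After pre-commit, before checkpoint N is durable: the JobManager never recorded checkpoint N, so recovery restores checkpoint N-1. The pre-committed transaction is aborted (or left to time out), the source rewinds, and replay produces the records again in a new transaction. Exactly-once.
  • After checkpoint N is durable, before commit: the pending transaction's identity is part of checkpoint N. On recovery, the sink re-discovers the open transaction (by transactional ID) and commits it. Exactly-once.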
  • After commit: nothing to do. Recovery proceeds with no replay of these records.

The key invariant: the sink transaction commit is bound to the checkpoint metadata commit. The JobManager is the 2PC coordinator.
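
The two phases and the recovery rule can be modelled in a few lines. This is a minimal sketch, assuming transactions are identified by checkpoint ID and that the `prepared` map stands in for transaction state held by the external system (so it survives a crash of the job). TwoPhaseSinkSketch and its methods are hypothetical names, not Flink's sink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Illustrative only: a sink whose transactions are keyed by checkpoint ID. */
final class TwoPhaseSinkSketch {
    private final List<String> open = new ArrayList<>(); // records since last barrier
    // Flushed but invisible transactions; models state kept by the external system.
    private final TreeMap<Long, List<String>> prepared = new TreeMap<>();
    final List<String> visible = new ArrayList<>(); // what committed readers see

    void write(String record) {
        open.add(record);
    }

    /** Phase 1, on barrier N: flush and park the transaction; nothing becomes visible. */
    void preCommit(long checkpointId) {
        prepared.put(checkpointId, new ArrayList<>(open));
        open.clear();
    }

    /** Phase 2, on checkpoint-complete for N: publish. Calling it twice is harmless. */
    void commit(long checkpointId) {
        List<String> txn = prepared.remove(checkpointId);
        if (txn != null) {
            visible.addAll(txn);
        }
    }

    /** Recovery: commit what the restored checkpoint covers, abort everything newer. */
    void recover(long restoredCheckpointId) {
        open.clear(); // uncheckpointed records will be replayed from the source
        for (Long id : new ArrayList<>(prepared.keySet())) {
            if (id <= restoredCheckpointId) {
                commit(id);
            } else {
                prepared.remove(id); // abort
            }
        }
    }

    public static void main(String[] args) {
        TwoPhaseSinkSketch sink = new TwoPhaseSinkSketch();
        sink.write("a");
        sink.preCommit(1);
        sink.recover(1); // crash after checkpoint 1 was durable, before commit
        System.out.println(sink.visible);
    }
}
```

The recover method is the invariant in code form: whether a prepared transaction commits or aborts depends only on whether its checkpoint became durable.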

Sink classes:

  • Transactional sink: Kafka (transactional producer), MySQL/Postgres with XA, distributed DBs supporting 2PC. Implements TwoPhaseCommitSinkFunction in Flink, or analogous in other engines.
  • Idempotent sink: upsert by primary key (Postgres ON CONFLICT, Cassandra upsert, S3 with deterministic object names based on (key, timestamp)). Allows safe replay; effectively-exactly-once at the visible state.
  • At-least-once sink: anything else — appending to a file, sending an email, calling a REST API. Don't claim exactly-once over these.

4e. Event time and watermarks

Event time is the timestamp on the event payload (e.g., the moment a swipe was attempted, the timestamp on a sensor reading). Processing time is the wall clock at the operator. They diverge because:

  • Producers in different regions deliver events with skew of 0–60 seconds.
  • Network blips delay batches by minutes.
  • Replay (rewinding to old offsets) sends old timestamps through the current wall clock.

Computing "transactions per merchant in the last 5 minutes" using processing time gives wrong answers under any of these. Event-time windows fix this.

A watermark is the engine's heuristic claim: "I believe no event with timestamp ≤ W will arrive on this stream anymore." Watermarks are emitted by sources alongside data, propagated along the dataflow, and combined at operators (operator's watermark = min over input watermarks).

A typical strategy:

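import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
// MyEvent is the application's event type; getEventTimeMillis() returns epoch milliseconds.
WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
    .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((e, ts) -> e.getEventTimeMillis());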

This says: watermark = max event time seen on this partition − 10 seconds. The 10s budget tolerates out-of-order arrivals up to 10 seconds late.
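
The arithmetic behind that strategy, and the min-combination rule for multi-input operators, can be written out without any engine dependency. This is a minimal sketch following the formula as stated in the text; BoundedLatenessWatermarkSketch is a hypothetical class and not Flink's generator.

```java
/** Illustrative only: bounded-out-of-orderness watermark for one partition. */
final class BoundedLatenessWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxSeenMs = Long.MIN_VALUE;

    BoundedLatenessWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    void onEvent(long eventTimeMs) {
        maxSeenMs = Math.max(maxSeenMs, eventTimeMs);
    }

    /** Watermark = max event time seen minus the budget; it never moves backwards. */
    long currentWatermark() {
        if (maxSeenMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        return maxSeenMs - maxOutOfOrdernessMs;
    }

    /** An event is late if its timestamp is at or behind the current watermark. */
    boolean isLate(long eventTimeMs) {
        return eventTimeMs <= currentWatermark();
    }

    /** An operator's watermark is the minimum over its input watermarks. */
    static long combine(long... inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        BoundedLatenessWatermarkSketch wm = new BoundedLatenessWatermarkSketch(10_000);
        wm.onEvent(100_000);
        wm.onEvent(95_000); // out of order, but inside the 10 s budget
        System.out.println("watermark: " + wm.currentWatermark());
        System.out.println("combined: " + combine(wm.currentWatermark(), 50_000));
    }
}
```

The combine step explains why one idle or slow partition holds back every downstream window: the minimum cannot advance until the slowest input does.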

4f. Window state and one event walked end-to-end

Consider a 5-minute tumbling event-time window, keyed by user_id, used for a product analytics aggregation:

For event-time tumbling window of 5 min, keyed by user_id:

  RocksDB state layout (logical):
    state["user_42|window_12:00_12:05"] = clicks = 3

  On every CLICK event for user_42 at event_time = 12:03:17:
    1. operator computes window: floor(12:03:17 / 5min) * 5min = 12:00 → window_12:00_12:05
    2. state.get("user_42|window_12:00_12:05") → 3
    3. state.put("user_42|window_12:00_12:05", 4)
    4. register timer at watermark = 12:05:00 (window close)

  When watermark advances past 12:05:00:
    5. timer fires
    6. window result emitted to sink
    7. window state cleared (TTL or explicit purge)

Walking one event through Flink end-to-end (the gold-standard walkthrough):

Event: user_42 clicks at event_time = 12:03:17.123 UTC.

Step 1: PRODUCE
  App server publishes to Kafka topic clicks.
  Producer hashes "user_42" → partition 42.
  Broker writes to log segment (sequential disk write), replicates to
  ISR (in-sync replicas), returns offset 9,765,432. (~10 ms)

Step 2: FLINK SOURCE
  TaskManager 17 owns partition 42 (parallelism = partition count).
  Source thread polls broker, deserializes event, assigns event_time.
  Updates partition watermark: max_seen = 12:03:17.123, wm = 12:03:07.123.
  Forwards event to keyBy operator.

Step 3: KEYBY SHUFFLE
  keyBy(user_id) hashes "user_42" → bucket 167.
  Bucket 167 lives on TaskManager 31, slot 5.
  Event travels through Flink's Netty-based network stack with
  credit-based flow control. (~5 ms, intra-rack)

Step 4: KEYED PROCESS — STATE READ + MUTATE
  Slot 5 on TM 31 sets thread-local key context = "user_42".
  Determines window: 12:00 – 12:05.
  state.get("user_42|w_12:00-12:05")
    → RocksDB lookup:
       (a) memtable check: miss
       (b) bloom filter on L0 SSTs: key might exist in SST 3
       (c) block index on SST 3: key falls in block 47
       (d) check page cache for block 47: HIT
       (e) binary search in 4 KB block: found, count = 3
  Returns 3.
  state.put("user_42|w_12:00-12:05", 4)
    → RocksDB insert:
       (a) append (key, 4) to memtable skip-list (~5 µs)
       (b) RocksDB's own WAL (Write-Ahead Log) is typically DISABLED
           in Flink because Flink's checkpoint is the durability mechanism
       (c) return
  Operator schedules a timer for window close at watermark = 12:05:00.

Step 5: SINK PRE-COMMIT (next barrier)
  60s later, JobManager injects barrier N+1 at all sources.
  Sources snapshot Kafka offsets, emit barrier downstream.
  Keyed operators snapshot RocksDB (incremental SST upload to S3:
    only the SST files created since the previous checkpoint).
  Sink receives barrier, opens a Kafka transaction, writes any
  output records buffered since the last barrier, flushes, acks
  barrier to JobManager.
  JobManager collects all acks, persists ckpt N+1 metadata to S3.
  JobManager sends notifyCheckpointComplete(N+1) to sink.
  Sink commits Kafka transaction. Records visible downstream.

Step 6: DURABILITY POINT
  The event is durable end-to-end once Kafka commits the sink transaction
  AND the checkpoint metadata is in S3. If everything crashes immediately
  after this, recovery from checkpoint N+1 will not replay the event
  (source offsets advanced past it), and any output is already in Kafka.

Most candidates stop at "Flink does checkpointing." Only Staff candidates explain WHY the sink-commit ordering matters and what survives at each step.


§5. Windowing in depth

Windowing is where event-time semantics meet application correctness. A window is "a finite slice of an infinite stream over which an aggregation makes sense." Four window classes exist; choosing among them is one of the highest-leverage modeling decisions in a streaming job.

5.1 Tumbling windows — non-overlapping, fixed-size

Tumbling windows partition time into contiguous, non-overlapping fixed-size buckets. An event belongs to exactly one window. The classic "events per 1-minute bucket" pattern.

event_time axis:
  12:00       12:01       12:02       12:03       12:04
   │           │           │           │           │
   │ ╔═══════╗ │ ╔═══════╗ │ ╔═══════╗ │ ╔═══════╗ │
   │ ║ win 0 ║ │ ║ win 1 ║ │ ║ win 2 ║ │ ║ win 3 ║ │
   │ ╚═══════╝ │ ╚═══════╝ │ ╚═══════╝ │ ╚═══════╝ │
        ▲           ▲           ▲           ▲
        e1,e2       e3          e4,e5,e6    e7

Use case: "transactions per merchant per minute," "active devices per minute," "request rate per endpoint per minute" — anything where the bucketing is uniform and you only need each event counted once.

Mechanics: the window assigner computes window_start = floor(event_time / window_size) * window_size. State is keyed by (user_key, window_start). When the watermark advances past window_start + window_size + allowedLateness, the window closes, result emits, and state is garbage-collected.
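
The assignment formula and the close condition can be sketched for a single key. This is a minimal illustration, assuming count aggregation and a simplification in which the result is emitted once, at the moment state is dropped; TumblingWindowSketch is a hypothetical class, and late events for already-closed windows are not handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: tumbling event-time count windows for a single key. */
final class TumblingWindowSketch {
    private final long sizeMs;
    private final long allowedLatenessMs;
    // window_start -> count; sorted so closable windows sit at the front.
    private final TreeMap<Long, Long> counts = new TreeMap<>();
    final List<String> emitted = new ArrayList<>();

    TumblingWindowSketch(long sizeMs, long allowedLatenessMs) {
        this.sizeMs = sizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /** window_start = floor(event_time / window_size) * window_size. */
    static long windowStart(long eventTimeMs, long sizeMs) {
        return Math.floorDiv(eventTimeMs, sizeMs) * sizeMs;
    }

    void onEvent(long eventTimeMs) {
        counts.merge(windowStart(eventTimeMs, sizeMs), 1L, Long::sum);
    }

    /** Emit and drop every window whose end plus allowed lateness the watermark has reached. */
    void onWatermark(long watermarkMs) {
        while (!counts.isEmpty()
                && counts.firstKey() + sizeMs + allowedLatenessMs <= watermarkMs) {
            Map.Entry<Long, Long> closed = counts.pollFirstEntry();
            emitted.add(closed.getKey() + ":" + closed.getValue());
        }
    }

    public static void main(String[] args) {
        TumblingWindowSketch w = new TumblingWindowSketch(60_000, 0);
        w.onEvent(1_000);
        w.onEvent(59_999);
        w.onEvent(60_000); // first event of the next window
        w.onWatermark(60_000);
        System.out.println(w.emitted);
    }
}
```

Math.floorDiv is used so that timestamps before the epoch still round down to the correct window start.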

Trade-off: boundary effects. With the 1-minute windows above, an event at 12:00:59.999 lands in window 0; an event at 12:01:00.001 lands in window 1. For metrics that should "smooth across" the boundary (e.g., "rolling rate over the last 60 seconds"), use sliding windows instead.

5.2 Sliding windows — overlapping, slide < window

Sliding windows have fixed size but advance by a smaller step. Each event belongs to multiple windows simultaneously (specifically, window_size / slide_size of them).

window size = 5 min, slide = 1 min:
   12:00                12:05
   │                    │
   ╔════════════════════╗
   ║ win [12:00, 12:05) ║
   ╚════════════════════╝
        ╔════════════════════╗
        ║ win [12:01, 12:06) ║
        ╚════════════════════╝
             ╔════════════════════╗
             ║ win [12:02, 12:07) ║
             ╚════════════════════╝

An event at 12:03:30 belongs to FIVE windows: [11:59,12:04), [12:00,12:05),
[12:01,12:06), [12:02,12:07), [12:03,12:08).
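
The five-window claim can be checked with a small assigner. This is a minimal sketch, assuming window starts are aligned to multiples of the slide and timestamps are milliseconds since midnight for readability; SlidingAssignerSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: which sliding windows contain a given event time. */
final class SlidingAssignerSketch {
    /** Start times of every [start, start + size) window containing eventTimeMs, oldest first. */
    static List<Long> windowStarts(long eventTimeMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window that can contain the event starts at the slide boundary at or before it.
        long lastStart = Math.floorDiv(eventTimeMs, slideMs) * slideMs;
        for (long s = lastStart; s > eventTimeMs - sizeMs; s -= slideMs) {
            starts.add(0, s);
        }
        return starts;
    }

    static long hms(int h, int m, int s) {
        return ((h * 60L + m) * 60L + s) * 1000L;
    }

    public static void main(String[] args) {
        long event = hms(12, 3, 30);
        List<Long> starts = windowStarts(event, 5 * 60_000L, 60_000L);
        System.out.println("windows containing the event: " + starts.size());
        for (long start : starts) {
            System.out.println("start (ms since midnight): " + start);
        }
    }
}
```

For the event at 12:03:30 the loop walks back from the 12:03 boundary to the 11:59 boundary, which are the five windows listed above.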

Use case: smooth rolling aggregates — "average latency over the last 5 minutes, updated every minute," "fraud-feature: declines in the last 5 min, refreshed every 1 min."

Cost: state and processing multiply by the overlap factor. A 5-minute window sliding every 1 minute holds roughly 5× the state of a 5-minute tumbling window, because every event is written into five overlapping windows. A 1-hour window sliding every 1 second is catastrophic (3600× state amplification). Pick the slide as coarse as your freshness budget allows.

Optimization: pane-based incremental aggregation. The engine stores per-minute partial sums; when emitting the 5-minute window result, it sums 5 panes. State grows by O(panes), not O(events).
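
Pane-based aggregation can be sketched for a sum over a single key. This is a minimal illustration, assuming the pane width equals the slide and the window size is a whole number of panes; PaneAggregationSketch and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: sliding-window sums built from per-pane partial sums. */
final class PaneAggregationSketch {
    private final long paneMs;        // pane width, equal to the slide
    private final int panesPerWindow; // window size / slide
    private final Map<Long, Long> paneSums = new HashMap<>(); // pane_start -> partial sum

    PaneAggregationSketch(long paneMs, int panesPerWindow) {
        this.paneMs = paneMs;
        this.panesPerWindow = panesPerWindow;
    }

    /** One state write per event, however many windows overlap it. */
    void onEvent(long eventTimeMs, long value) {
        long paneStart = Math.floorDiv(eventTimeMs, paneMs) * paneMs;
        paneSums.merge(paneStart, value, Long::sum);
    }

    /** Result for the window ending at windowEndMs, combined from whole panes. */
    long windowSum(long windowEndMs) {
        long total = 0;
        for (int i = 1; i <= panesPerWindow; i++) {
            total += paneSums.getOrDefault(windowEndMs - i * paneMs, 0L);
        }
        return total;
    }

    /** Drop panes that no future window can still need. */
    void evictBefore(long oldestNeededPaneStartMs) {
        paneSums.keySet().removeIf(start -> start < oldestNeededPaneStartMs);
    }

    int paneCount() {
        return paneSums.size();
    }

    public static void main(String[] args) {
        PaneAggregationSketch agg = new PaneAggregationSketch(60_000, 5);
        agg.onEvent(10_000, 1);
        agg.onEvent(70_000, 2);
        agg.onEvent(130_000, 4);
        System.out.println("sum for window ending at 5 min: " + agg.windowSum(300_000));
    }
}
```

State is one number per pane per key, so the overlap factor affects only the cost of emitting a result, not the cost of ingesting an event.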

5.3 Session windows — gap-based, dynamic size

Session windows have no fixed length. They group consecutive events for the same key as long as the gap between successive events is below a threshold. When the gap exceeds the threshold, the session closes.

key = user_42:

events:    e1   e2     e3                    e4   e5
time:     12:00 12:01 12:02                 12:35 12:36
                              gap = 33 min
                            > 30 min threshold

sessions:
  [12:00, 12:02 + 30min] ── session A: contains e1, e2, e3
  [12:35, 12:36 + 30min] ── session B: contains e4, e5

The engine treats each new event as potentially starting a new session, ending the previous one, or extending one. If two existing sessions become "connected" because a late event arrives in the gap between them, they MERGE into one session — a subtle and expensive operation.
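
The merge behaviour can be sketched for one key. This is a minimal illustration, assuming each event opens a candidate window [t, t + gap) that absorbs every existing session it overlaps; timestamps are in minutes for readability, and SessionMergeSketch is a hypothetical class that ignores watermarks and lateness.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: gap-based session windows for one key, with merging. */
final class SessionMergeSketch {
    private final long gap;
    // Each session is {start, end} with end = last event time + gap; sorted by start.
    private final List<long[]> sessions = new ArrayList<>();

    SessionMergeSketch(long gap) {
        this.gap = gap;
    }

    void onEvent(long eventTime) {
        long start = eventTime;
        long end = eventTime + gap;
        List<long[]> kept = new ArrayList<>();
        for (long[] s : sessions) {
            if (s[0] < end && start < s[1]) { // overlaps the candidate: absorb it
                start = Math.min(start, s[0]);
                end = Math.max(end, s[1]);
            } else {
                kept.add(s);
            }
        }
        kept.add(new long[] {start, end});
        kept.sort((a, b) -> Long.compare(a[0], b[0]));
        sessions.clear();
        sessions.addAll(kept);
    }

    List<long[]> sessions() {
        return sessions;
    }

    public static void main(String[] args) {
        SessionMergeSketch user42 = new SessionMergeSketch(30); // 30-minute gap
        for (long t : new long[] {0, 1, 2, 35, 36}) {           // minutes after 12:00
            user42.onEvent(t);
        }
        System.out.println("sessions before late event: " + user42.sessions().size());
        user42.onEvent(20); // late event bridging the two sessions
        System.out.println("sessions after late event: " + user42.sessions().size());
    }
}
```

The late event at minute 20 overlaps both existing sessions, so both are absorbed into one; an engine also has to merge their accumulated state and timers, which is what makes this expensive.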

Concrete walkthrough — sessionization for user activity tracking:

Scenario: web analytics for an e-commerce site. Session gap = 30 minutes
of inactivity. Per-session features: page views, conversion event present?,
total dwell time, referring source.

T = 12:00:00  user_42 visits home page
  → no existing session for user_42
  → open session S1 = [12:00:00, 12:00:00 + 30min)
  → state["user_42|S1"] = {pages: 1, conv: false, dwell: 0, src: "google"}

T = 12:02:13  user_42 visits product page
  → existing session S1 still open (12:02:13 < 12:30:00)
  → extend S1 to [12:00:00, 12:02:13 + 30min) = [12:00:00, 12:32:13)
  → state["user_42|S1"] = {pages: 2, conv: false, dwell: 133s, src: "google"}

T = 12:25:00  user_42 checks out (conversion!)
  → extend S1 to [12:00:00, 12:55:00)
  → state["user_42|S1"] = {pages: 3, conv: true, dwell: 1500s, src: "google"}

T = 12:55:00  watermark advances past 12:55:00
  → S1 timer fires
  → emit session result downstream: {user: 42, pages: 3, conv: true,
                                       dwell: 1500s, src: "google",
                                       start: 12:00:00, end: 12:25:00}
  → state["user_42|S1"] cleared (TTL)

T = 13:30:00  user_42 returns
  → no open session
  → open session S2 = [13:30:00, 14:00:00)
  → state["user_42|S2"] = {pages: 1, ...}

  ── LATE EVENT TWIST ──

T = 13:35:00  a late event arrives for user_42 with event_time = 12:40:00
  → 12:40:00 lies inside S1's inactivity gap (last event 12:25:00, gap
    expires 12:55:00). Had it arrived on time, it would have extended S1
    to [12:00:00, 13:10:00); S2 (starting 13:30:00) would stay separate.
  → Watermark already advanced past 12:55:00, so S1 was closed.
  → Goes to side-output for late-event reconciliation, OR if
    allowedLateness is configured, the engine recreates S1 state from
    the closed-but-retained window and merges.

State characteristics: session windows are the highest-state-cost window type because each session can have unbounded duration in principle. Bound the maximum session duration explicitly (for example, with a custom trigger or timer that force-closes any session older than 24h), or risk state explosion from bot keys with never-ending sessions.

5.4 Global windows + custom triggers

Global windows are one-window-per-key with no automatic close. They're used when none of the built-in window types fit and you implement custom firing logic via a Trigger.

The global window:

key = sensor_42:
  [─────────────────────────────────────────────────────────────►
   single global window per key, never closes by itself
   events e1, e2, e3, e4, ... all accumulate here

Custom Trigger fires when:
  • count of events reaches N (count trigger)
  • event-time elapsed reaches T (event-time trigger)
  • a punctuation event arrives (custom predicate)
  • combination of above

Use case: machine-learning feature computation where the "window" is "last 1000 observations" rather than "last 5 minutes." Custom triggers also enable patterns like "fire when budget is exceeded" or "fire when a sentinel event arrives." This is the most flexible and most footgun-prone window type — you own evictor logic (which events to discard from state) and trigger logic (when to fire).
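As an illustration of owning both the evictor and the trigger, here is a minimal plain-Java sketch of a "last N observations" window for one key. LastNWindow, capacity, and fireEvery are hypothetical names; a real job would express this with the engine's global window, trigger, and evictor hooks.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Sketch: one global window per key that keeps the last N values and fires on a count trigger. */
final class LastNWindow {
    private final int capacity;   // evictor: how many observations to retain
    private final int fireEvery;  // trigger: fire after this many new events
    private final Deque<Double> buffer = new ArrayDeque<>();
    private int sinceLastFire = 0;

    LastNWindow(int capacity, int fireEvery) {
        this.capacity = capacity;
        this.fireEvery = fireEvery;
    }

    /** Returns the mean of the retained values when the trigger fires, otherwise null. */
    Double add(double value) {
        buffer.addLast(value);
        if (buffer.size() > capacity) {
            buffer.removeFirst();            // evict the oldest observation
        }
        if (++sinceLastFire < fireEvery) {
            return null;                     // trigger has not fired yet
        }
        sinceLastFire = 0;
        double sum = 0;
        for (double v : buffer) {
            sum += v;
        }
        return sum / buffer.size();
    }
}
```

If the eviction step is forgotten, the buffer grows for the lifetime of the key, which is the typical footgun with this window type.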

5.5 Watermarks — how the engine decides "no more events before T"

A watermark is the engine's monotone-increasing claim about event-time progress. The semantic is: "I, the source, believe no further events with event_time ≤ W will arrive on this channel." Watermarks travel alongside data records through the dataflow and combine at multi-input operators as output_watermark = min(input_watermarks).

How sources compute watermarks:

Strategy 1: Bounded out-of-orderness
  watermark = max(event_time_seen_so_far) − max_lateness_budget

  Example: forBoundedOutOfOrderness(Duration.ofSeconds(10))
  After seeing event with event_time = 12:03:17, emit watermark = 12:03:07.
  Events with timestamps in (12:03:07, 12:03:17] are still tolerated;
  events with timestamps ≤ 12:03:07 are considered LATE.

Strategy 2: Monotone timestamps (in-order assumption)
  watermark = max(event_time_seen_so_far)
  Tolerates zero out-of-orderness. Faster watermark advance, more late events.

Strategy 3: Punctuated
  emit watermark only on receipt of a specific marker event
  Useful for sources like Kafka topics where the producer injects
  "I've drained partition through timestamp T" markers.

Bounded vs unbounded out-of-orderness:

  • Bounded: the engine assumes some maximum lateness (10s, 60s, 5 min). Watermark advances aggressively; results emit promptly; events later than the budget are LATE.
  • Unbounded: no finite lateness bound holds, so the engine can never declare "no more events before T" with certainty. The source still emits heuristic watermarks, but the user must accept that results are speculative and may be revised by late events.
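The bounded out-of-orderness rule and the min-combination at multi-input operators can be sketched in plain Java. This is an assumption-laden illustration, not Flink's generator: BoundedWatermark is a hypothetical class, timestamps are seconds since midnight, and "late" follows the definition above (event_time ≤ watermark).

```java
/** Sketch: bounded-out-of-orderness watermark plus min-combination across inputs. */
final class BoundedWatermark {
    private final long maxOutOfOrderness;
    private long maxSeen = Long.MIN_VALUE;

    BoundedWatermark(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Observes one event; returns true if it is at or behind the current watermark (late). */
    boolean observe(long eventTime) {
        boolean late = eventTime <= current();
        maxSeen = Math.max(maxSeen, eventTime); // never decreases, so the watermark is monotone
        return late;
    }

    /** watermark = max(event_time_seen_so_far) - max_lateness_budget */
    long current() {
        return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - maxOutOfOrderness;
    }

    /** A multi-input operator can only advance as far as its slowest input. */
    static long combine(long... inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

With a 10-second budget, observing 12:03:17 (43397) puts the watermark at 12:03:07 (43387); an event stamped 12:03:10 is still on time, while one stamped 12:03:07 is late. The min-combination is also why a single idle or slow input holds back every downstream window.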

The late-event problem: events that arrive after their window has closed. Three handling strategies:

  1. Drop silently (default for many engines). Aggregations are slightly wrong for old windows; downstream consumers see no signal of the loss.
  2. Allowed lateness + re-emit: allowedLateness(5 min) keeps the closed window's state alive for an extra 5 minutes. Late events update the result; the engine re-emits the updated aggregation (UPSERT semantics for downstream).
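  3. Side outputs: route late events to a separate stream (OutputTag in Flink) for offline reconciliation. The primary pipeline emits "good enough live" results; a batch job reconciles with the side output.

// Tag for events that arrive after window end + allowed lateness.
final OutputTag<Event> lateEventTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<Result> primary = stream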
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(5))
    .sideOutputLateData(lateEventTag)   // very-late events here
    .aggregate(new MyAgg());

DataStream<Event> veryLate = primary.getSideOutput(lateEventTag);
veryLate.addSink(new ReconciliationSink());

The trade-off between "drop silently," "re-emit with allowed lateness," and "side output to reconciliation" is application-dependent. Fraud detection often picks side output (audit trail needed); engagement dashboards pick allowed lateness (consumers expect occasional re-emits); IoT alerting often drops silently (a stale alert is worse than a missed one).


§6. State backend tuning

Section §4a covered what state backends ARE. This section is operational: how to TUNE them so they actually keep up at scale.

6.1 RocksDB column families per state primitive

Flink maps each state primitive (ValueState, MapState, ListState, etc.) to a dedicated RocksDB column family. Column families are physically distinct LSM trees inside one RocksDB instance — they share the WAL but maintain separate memtables, SSTs, and compaction schedules.

Inside one TaskManager slot:
  RocksDB instance "operator_keyed_state"
   ├── column family "cf_value_state__feature_v1"  ◄── ValueState<Double>
   ├── column family "cf_map_state__friend_set"    ◄── MapState<UserId, Boolean>
   ├── column family "cf_list_state__events_buf"   ◄── ListState<Event>
   ├── column family "cf_window_state__w_5min"     ◄── window contents
   └── column family "cf_timer_state__event_time"  ◄── Flink timer service

Why this matters for tuning:

  • Different state primitives have different access patterns. A ValueState used as a per-key counter is read-heavy on hot keys; a MapState used as a friend set is large but accessed less frequently. Separating column families lets the engine tune block cache, write buffer size, and compaction style per state.
  • Hot column families can dominate compaction CPU. If one column family receives 95% of writes, RocksDB's compaction queue is dominated by that family; cold families lag in compaction and accumulate L0 files.
  • Schema evolution operates per column family. Migrating one state's schema does not touch others.

6.2 Incremental checkpoints — only changed SSTables uploaded

The naïve checkpoint serializes the entire state to S3 every interval. For a 100 GB state job with a 60-second checkpoint interval, that's 100 GB / 60 s ≈ 1.7 GB/s of sustained upload bandwidth across the job, which is usually infeasible.

Incremental checkpoints exploit the LSM tree's immutability invariant: once an SST file is written, it never changes. Only new SSTs (created since the last checkpoint) need to be uploaded.

Checkpoint N at T=60s:
  RocksDB SSTs on local NVMe:
    sst-001  ─┐
    sst-002  ─┼── existed at ckpt N-1, already in S3 ─── NOT uploaded
    sst-003  ─┘
    sst-100  ─┐
    sst-101  ─┼── created since last ckpt              ─── UPLOAD
    sst-102  ─┘

  Checkpoint metadata in S3:
    ckpt-N-metadata.json = {
      "sst_files": ["sst-001", "sst-002", "sst-003", "sst-100", "sst-101", "sst-102"],
      "kafka_offsets": {...},
      "pending_2pc_txns": {...}
    }

SSTs already in S3 are referenced from the new checkpoint's metadata instead of being re-uploaded; only the new ones are transferred. Typical bandwidth reduction: 90–99%.

Gotcha: S3 storage cost grows because old SSTs are retained as long as any checkpoint references them. The engine garbage-collects unreferenced SSTs after a retention threshold. Misconfigured retention can multiply S3 spend by 10×.
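The upload set and the garbage-collection rule are both simple set operations. The sketch below models them with a hypothetical IncrementalCheckpoints class; file names stand in for SST files, and maxRetained is an assumed "number of checkpoints to keep" setting rather than a real configuration key.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Sketch: incremental checkpoint uploads and reference-based SST garbage collection. */
final class IncrementalCheckpoints {
    private final Set<String> uploaded = new HashSet<>();           // SSTs present in remote storage
    private final Deque<Set<String>> retained = new ArrayDeque<>(); // file lists of retained checkpoints
    private final int maxRetained;

    IncrementalCheckpoints(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    /** Takes a checkpoint of the local SST set; returns only the files that had to be uploaded. */
    Set<String> checkpoint(Set<String> localSsts) {
        Set<String> toUpload = new TreeSet<>(localSsts);
        toUpload.removeAll(uploaded);              // immutable SSTs already in S3 are skipped
        uploaded.addAll(toUpload);
        retained.addLast(new HashSet<>(localSsts));
        if (retained.size() > maxRetained) {
            retained.removeFirst();                // oldest checkpoint falls out of retention
        }
        return toUpload;
    }

    /** Deletes remote SSTs that no retained checkpoint references; returns what was deleted. */
    Set<String> collectGarbage() {
        Set<String> referenced = new HashSet<>();
        for (Set<String> files : retained) {
            referenced.addAll(files);
        }
        Set<String> garbage = new TreeSet<>(uploaded);
        garbage.removeAll(referenced);
        uploaded.removeAll(garbage);
        return garbage;
    }
}
```

Raising maxRetained keeps more file lists alive, so fewer SSTs become garbage; that is the mechanism behind the storage-cost gotcha above.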

6.3 State migration on job upgrade

When the job's code changes (new operator, new state primitive, different keying), the old checkpoint may not be directly compatible. The engine offers tools to migrate state across job versions.

Compatible upgrade: the new job has the same operator UIDs and state schemas. The engine restores state automatically. This is the happy path — pin UIDs explicitly via .uid("operator-name") to make UID stability survive code refactors.

Schema migration: if a state schema changes (e.g., a POJO gains a field), the engine deserializes the old bytes with the new schema using Avro/POJO evolution rules. Adding a nullable field is safe; removing a field is safe (lossy); renaming requires explicit migration.

Topology change migration: if the job's DAG changes (new operator inserted, parallelism changes drastically), the engine offers --allowNonRestoredState (drop unmatched state) and explicit operator-state remapping. This is operationally painful and usually triggers a "savepoint, edit, restore" workflow (see §10 Job Upgrade Strategies).

6.4 Schema evolution of stored state — Avro evolution rules apply

Stream-processor state is durable across job lifetimes. Schema evolution applies the same way it does for any persistent format:

  • Forward-compatible changes: adding optional fields with defaults, adding new enum values (with a "default" fallback). Old code reading new state ignores the new field.
  • Backward-compatible changes: removing fields that the new code doesn't use. New code reading old state skips the field it no longer declares.
  • Breaking changes: renaming fields, changing field types, removing required fields. These require explicit migration logic.

Flink supports Avro-based state with rich evolution and POJO-based state with limited evolution (Java field add/remove). For complex schema changes, write a migration job: read state via the State Processor API, transform it, and write it as new state into a fresh savepoint.

6.5 Why RocksDB tuning matters — block cache, compaction style

RocksDB has hundreds of tuning parameters. For streaming workloads, the load-bearing ones:

Block cache size: the in-memory cache of hot SST blocks. A miss costs a disk read (~100 µs SSD); a hit costs a memory lookup (~100 ns). For a hot-key-dominated workload, the block cache hit rate determines per-event latency. Default ~8 MB is usually too small; tune to 20–40% of TaskManager memory.

Compaction style:

  • Leveled compaction (default): classic LSM with strict per-level size ratios. Lower read and space amplification, but higher write amplification and more compaction CPU on hot updates.
  • Tiered (universal) compaction: less compaction work and lower write amplification; higher space amplification (state on disk can be 2–3× actual size). Useful when local SSD is plentiful but CPU is constrained.
  • FIFO compaction: for ephemeral state with TTL — just delete old SSTs without compaction. Fast but not useful for general-purpose keyed state.

Write buffer size (memtable): larger memtables (256 MB vs default 64 MB) reduce L0 flush frequency and write amplification. The trade-off is RAM usage and slower recovery from crashes (more WAL to replay — though Flink typically disables RocksDB WAL because checkpoints are the durability mechanism).

Bloom filter bits per key: 10 bits/key (default) gives ~1% false-positive rate. For workloads where most reads miss (e.g., dedupe by sequence-id with mostly-unique keys), increase to 15–20 bits/key to reduce SST touches.
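The bits-per-key figures follow from the textbook Bloom filter estimate: with an optimal number of hash functions, the false-positive rate is roughly 0.6185^(bits per key). The sketch below only evaluates that formula; BloomFilterMath is a hypothetical helper, and RocksDB's actual filter implementations will deviate somewhat from the idealized number.

```java
/** Sketch: idealized Bloom filter false-positive rate as a function of bits per key. */
final class BloomFilterMath {

    /** FPR ≈ 0.6185^(bits per key), assuming the optimal number of hash functions. */
    static double falsePositiveRate(double bitsPerKey) {
        return Math.pow(0.6185, bitsPerKey);
    }

    public static void main(String[] args) {
        for (int bits : new int[] {10, 15, 20}) {
            System.out.printf("%d bits/key -> FPR ~ %.5f%n", bits, falsePositiveRate(bits));
        }
    }
}
```

Under this estimate, going from 10 to 20 bits per key cuts false positives by roughly two orders of magnitude while doubling filter memory, so the increase is mostly worthwhile when the bulk of lookups are misses.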

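The tuning loop: measure with RocksDB statistics (read amplification, write amplification, compaction stalls, block cache hit rate). Adjust one parameter at a time. Retest. Flink exposes RocksDB native metrics through the state.backend.rocksdb.metrics.* options (for example state.backend.rocksdb.metrics.block-cache-usage); they are off by default and, once enabled, flow to whichever metric reporter is configured.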


§7. Backpressure

Backpressure is the engine's mechanism for preventing fast producers from overwhelming slow consumers. It's also the most-misunderstood streaming concept — engineers see "backpressure: 80%" in Flink UI and panic, when in fact backpressure is the normal way streaming engines preserve correctness under load.

7.1 What happens when a sink slows down

Normal flow:
  Source → Op1 → Op2 → Sink → External system
     1M/s   1M/s  1M/s   1M/s     1M/s consumed

Sink slows (e.g., downstream Kafka becomes slow, or external REST API
adds latency):

  Source → Op1 → Op2 → Sink → External system
     1M/s   1M/s  1M/s   500k/s   500k/s consumed
                              ▲
                              └── sink's output buffer fills

Sink's input network buffer (allocated by upstream Op2) fills:

  Op2 → Sink
        [████████████████████]  ◄── buffers full
        Op2's "send" call blocks (credit-based flow control runs out of credits)

Op2 stops sending to Sink:

  Op2 cannot emit → Op2's processing loop blocks waiting on output buffer
  Op2 stops consuming from its input buffers

  Op1 → Op2
        [████████████████████]  ◄── Op2's input buffers fill

  Op1 stops sending to Op2 → Op1 stops consuming from its input

  Source → Op1
           [████████████████████]  ◄── Op1's input buffers fill

  Source stops sending → Source's Kafka consumer poll() loop slows

  Eventually: Kafka consumer lag grows. Source pulls from Kafka at the
  rate the slowest downstream operator can absorb.

The cardinal property: backpressure propagates back from the slowest operator to the source. The source naturally slows its consumption from the durable log. The log buffers the un-consumed data (this is exactly what Kafka retention is for). When the slow operator catches up, backpressure releases and the source resumes full-rate consumption.

This is GOOD. It means: under transient downstream slowness, the source DOES NOT drop data, DOES NOT OOM the engine, DOES NOT silently lose events. It naturally rate-limits at the slowest stage.
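The propagation can be reproduced with a deterministic toy model. The following sketch is not how any engine implements flow control; BackpressureSim is hypothetical, time advances in discrete ticks, and each stage has one bounded input buffer.

```java
/** Sketch: source -> op -> sink with bounded buffers, stepped in discrete ticks. */
final class BackpressureSim {

    /**
     * Runs the pipeline for the given number of ticks.
     * Returns {records pulled from the log, records delivered by the sink}.
     */
    static long[] run(int ticks, int sourceRate, int opRate, int sinkRate, int bufferCapacity) {
        int opInput = 0;    // records buffered in front of the operator
        int sinkInput = 0;  // records buffered in front of the sink
        long pulled = 0;
        long delivered = 0;
        for (int t = 0; t < ticks; t++) {
            // The sink drains its input buffer at its own pace.
            int out = Math.min(sinkRate, sinkInput);
            sinkInput -= out;
            delivered += out;
            // The operator forwards only what fits into the sink's input buffer.
            int fwd = Math.min(Math.min(opRate, opInput), bufferCapacity - sinkInput);
            opInput -= fwd;
            sinkInput += fwd;
            // The source pulls only what fits into the operator's input buffer.
            int in = Math.min(sourceRate, bufferCapacity - opInput);
            opInput += in;
            pulled += in;
        }
        return new long[] {pulled, delivered};
    }

    public static void main(String[] args) {
        long[] r = run(1000, 10, 10, 5, 20);
        System.out.println("pulled=" + r[0] + " delivered=" + r[1]);
    }
}
```

With a source capable of 10 records per tick and a sink capable of 5, the source ends up pulling at about 5 per tick once the buffers are full. Nothing is dropped: the difference between pulled and delivered never exceeds the total buffer capacity, and everything else stays in the log as consumer lag.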

7.2 Detecting backpressure

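Via Flink UI: the JobGraph view shows backpressure indicators per operator. In older releases the metric was sampled: Flink periodically captured task thread stack traces and checked whether they were blocked requesting an output buffer from the LocalBufferPool (the call that blocks when output buffers are full). The ratio of samples-blocked to samples-total is the "backpressure ratio." Since Flink 1.13 the same ratio is derived from the per-task backPressuredTimeMsPerSecond metric rather than from stack sampling.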

Flink UI legend:
  OK        backpressure ratio < 10%
  LOW       10% – 50%
  HIGH      > 50%

Per-operator color codes propagate UPSTREAM from the bottleneck.
A HIGH on Op2 means Op2 is being slowed by something downstream (sink
or further operators). A HIGH on Source means the next operator down is
the bottleneck.
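The legend maps directly onto a ratio of blocked samples to total samples. A minimal sketch, using the thresholds listed above; BackpressureLevel is a hypothetical helper, not part of any Flink API.

```java
/** Sketch: classify a sampled backpressure ratio using the UI legend thresholds. */
final class BackpressureLevel {

    static String classify(int blockedSamples, int totalSamples) {
        if (totalSamples <= 0) {
            throw new IllegalArgumentException("totalSamples must be positive");
        }
        double ratio = (double) blockedSamples / totalSamples;
        if (ratio < 0.10) {
            return "OK";
        }
        if (ratio <= 0.50) {
            return "LOW";
        }
        return "HIGH";
    }
}
```

To locate the bottleneck, walk the job graph downstream until the level drops from HIGH to OK: the first non-backpressured operator after a run of HIGH ones is the one to inspect.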

Via metrics:

  • numRecordsInPerSecond and numRecordsOutPerSecond diverging across an operator (in > out for a sustained period, for a transformation that should be 1:1) can indicate that the operator is itself the bottleneck.
  • outPoolUsage near 1.0 indicates the operator's output buffers are full: it is being backpressured by something downstream.
  • inPoolUsage near 1.0 indicates the operator's input buffers are full: it is not draining its input fast enough and will backpressure its upstream.
  • idleTimeMsPerSecond near 0 means the operator is constantly working, which combined with full input buffers is the textbook bottleneck signature.

Via Kafka consumer lag: the canonical end-user-visible symptom of sustained backpressure. The source can't keep up; lag grows monotonically. The size of the lag bounds the freshness of output.

7.3 Fix patterns

1. Increase parallelism on the bottleneck operator. If the bottleneck is a CPU-bound transformation, more parallelism gives it more cores. Practical: identify the bottleneck via UI, increase its parallelism (.setParallelism(n)), repartition the input.

2. Fix the slow sink. Most backpressure is caused by sinks. Check downstream:

  • Kafka cluster health (broker CPU, disk IO, replication lag).
  • External API rate limits.
  • Database write lock contention.
  • Network egress saturation (especially cross-AZ).

The sink fix usually pays for itself; the engine-side fix is a band-aid.

3. Use bigger network buffers. Flink's network stack uses fixed-size buffer pools per channel. If the working set of in-flight data is larger than the buffer pool, threads block frequently. taskmanager.memory.network.fraction (default ~10% of TM memory) and taskmanager.network.memory.buffers-per-channel (default 2) can be increased. Pays off when traffic is bursty rather than sustained-high.

4. Switch to unaligned checkpoints. Under sustained backpressure, aligned checkpoints stall on the slow channel waiting for the barrier. Unaligned checkpoints decouple barrier propagation from event processing — the checkpoint completes even when the operator is stuck on a slow downstream.

5. Backpressure-induced load shedding. For overload that is fundamentally beyond capacity, configurable load shedding (drop a percentage of low-priority events) is the last resort. Most engines don't offer this natively; you implement it at the source or in a filter stage with a sampling decision.
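One possible shape for the shedding decision in a filter stage is sketched below. It is an assumption, not an engine feature: LoadShedder and keepPercent are hypothetical, and the sketch sheds by key hash rather than per event, so a given key is either fully kept or fully dropped and per-key aggregates stay internally consistent.

```java
/** Sketch: deterministic load shedding of low-priority traffic by key hash. */
final class LoadShedder {
    private final int keepPercent; // 0..100: share of low-priority keys to keep

    LoadShedder(int keepPercent) {
        if (keepPercent < 0 || keepPercent > 100) {
            throw new IllegalArgumentException("keepPercent must be in [0, 100]");
        }
        this.keepPercent = keepPercent;
    }

    /** High-priority events always pass; low-priority events pass for a fixed subset of keys. */
    boolean keep(String key, boolean highPriority) {
        if (highPriority) {
            return true;
        }
        return Math.floorMod(key.hashCode(), 100) < keepPercent;
    }
}
```

Because the decision depends only on the key, replaying the same input after a restore sheds exactly the same events, which random sampling would not guarantee.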


§8. Side outputs and Complex Event Processing (CEP)

Two related advanced features: routing different event classes from a single operator (side outputs), and detecting sequential patterns over streams (CEP — Complex Event Processing).

8.1 Side outputs — routing different event types from one operator

The basic abstraction: a single operator emits to its primary output AND to one or more side outputs, each tagged with an OutputTag. Downstream sub-graphs subscribe to specific tags.

                       primary output ──► main pipeline ──► OLAP sink
ProcessFunction ───┬──►
                   ├──► side output "late events"     ──► reconciliation
                   ├──► side output "malformed"       ──► dead-letter queue
                   └──► side output "high-value-user" ──► priority pipeline

Why side outputs matter:

  • Error handling without breaking the topology. A malformed JSON event would otherwise crash the deserializer. With a side output, the operator catches the parse error and routes the raw bytes to a dead-letter queue for offline inspection. The main pipeline continues.
  • Late-event isolation. Window operators emit "very late" events to a side output for batch reconciliation; the main pipeline keeps emitting near-real-time aggregates.
  • Multi-tenancy without multiple jobs. A single operator can classify events into "free-tier," "paid-tier," "internal" outputs, each handled by a different downstream pipeline with different SLOs (Service Level Objectives).
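  • CDC routing. Debezium events with op=c (create), op=u (update), op=d (delete) can be routed to separate side outputs for different downstream consumers.

// The anonymous subclass ({}) lets Flink capture each tag's element type.
final OutputTag<Event> lateTag = new OutputTag<Event>("late") {};
// Assumes e.rawBytes is a byte[]; the tag type must match what ctx.output() receives.
final OutputTag<byte[]> errorTag = new OutputTag<byte[]>("error") {};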

SingleOutputStreamOperator<Result> main = events
    .process(new ProcessFunction<Event, Result>() {
        @Override
        public void processElement(Event e, Context ctx, Collector<Result> out) {
            try {
                if (e.eventTime < ctx.timerService().currentWatermark()) {
                    ctx.output(lateTag, e);
                    return;
                }
                out.collect(transform(e));
            } catch (Exception ex) {
                ctx.output(errorTag, e.rawBytes);
            }
        }
    });

main.getSideOutput(lateTag).addSink(new ReconciliationSink());
main.getSideOutput(errorTag).addSink(new DeadLetterSink());
main.addSink(new MainSink());

8.2 Complex Event Processing (CEP) — pattern matching over streams

CEP is a declarative DSL for detecting sequential or contextual patterns in event streams. The classic example: "user logs in, then 5 failed login attempts within 1 minute, then a withdrawal of more than $10,000."

Flink's CEP library compiles a pattern into an NFA (Non-deterministic Finite Automaton) and runs it per key. State per partial match is held in the engine's keyed state.

Pattern: login → 5×failed_login (within 60s) → large_withdrawal

NFA:
  state_0 ── login_event ──► state_1
  state_1 ── failed_login (count: 1) ──► state_2
  state_2 ── failed_login (count: 2) ──► state_3
  ...
  state_5 ── failed_login (count: 5, within 60s of state_1) ──► state_6
  state_6 ── withdrawal(amount > 10_000) ──► MATCH

  (any event in [state_1, state_6] with elapsed > 60s aborts the match)
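To make the automaton concrete, here is a deliberately simplified hand-rolled version for a single key. It is a sketch under stated assumptions, not what the CEP library generates: TakeoverMatcher is hypothetical, it tracks only one partial match at a time, ignores unrelated events and extra failed logins, and measures the 60 seconds from the login event.

```java
/** Sketch: state machine for login -> 5x failed_login -> large withdrawal, all within 60s. */
final class TakeoverMatcher {
    private int failed = -1;   // -1: waiting for login; 0..5: failed logins since the login
    private long loginTime;

    /** Feeds one event for a single key; returns true when the full pattern has matched. */
    boolean onEvent(String type, long tsSeconds, double amount) {
        if (failed >= 0 && tsSeconds - loginTime > 60) {
            failed = -1;                       // timeout: abort the partial match
        }
        switch (type) {
            case "LOGIN_SUCCESS":
                failed = 0;                    // (re)start a partial match
                loginTime = tsSeconds;
                return false;
            case "LOGIN_FAILED":
                if (failed >= 0 && failed < 5) {
                    failed++;
                }
                return false;
            case "WITHDRAWAL":
                boolean match = failed == 5 && amount > 10_000;
                if (match) {
                    failed = -1;               // consume the match
                }
                return match;
            default:
                return false;
        }
    }
}
```

A real NFA keeps every concurrent partial match per key, which is where the state growth discussed under the limitations comes from; this sketch keeps exactly one.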

Flink CEP API:

Pattern<Event, ?> pattern = Pattern.<Event>begin("login")
    .where(SimpleCondition.of(e -> e.type.equals("LOGIN_SUCCESS")))
    .next("failedLogins")
    .where(SimpleCondition.of(e -> e.type.equals("LOGIN_FAILED")))
    .timesOrMore(5)
    .within(Time.minutes(1))   // bounds the whole sequence, login through withdrawal
    .next("withdrawal")
    .where(SimpleCondition.of(e -> e.type.equals("WITHDRAWAL")
                                    && e.amount > 10_000));

PatternStream<Event> patternStream = CEP.pattern(events.keyBy(e -> e.userId),
                                                  pattern);

DataStream<Alert> alerts = patternStream.select(
    new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, List<Event>> match) {
            return new Alert(match.get("login").get(0).userId,
                             match.get("withdrawal").get(0).amount,
                             "POSSIBLE_ACCOUNT_TAKEOVER");
        }
    });

Why CEP belongs in a stream processor: the pattern matcher needs the same primitives — event-time ordering, watermark-aware timeouts, keyed state, exactly-once recovery — as any other streaming operator. Building it externally would re-implement those primitives.

Use cases: account-takeover detection (the example), supply-chain SLA violation ("order placed → not shipped within 48h"), trade surveillance ("large order followed by counter-trade within 5 ms" — but at that latency you need hardware FPGAs, not Flink), customer journey funnels ("page A → page B → conversion within 30 min").

Limitations:

  • State explosion under concurrent matches. If thousands of partial matches per key exist simultaneously, state can grow rapidly. Tune AfterMatchSkipStrategy carefully.
  • Latency vs correctness trade-off. Long pattern timeouts mean the engine holds state until the pattern times out or matches. A 24-hour pattern timeout means 24 hours of held state for every partial match.
  • Limited absence detection. Flink CEP provides notNext() and notFollowedBy(), but "X happened and Y never followed within T" is awkward to express as a pattern; it is usually handled through timed-out partial matches or custom ProcessFunction logic with timers.

§9. SQL on streams

Stream processors increasingly speak SQL. The motivation: analysts who can write SELECT statements outnumber Scala/Java engineers, and the dataflow optimizations a SQL planner generates often match or beat hand-coded job graphs for common aggregation queries.

9.1 Flink SQL and Table API

Flink SQL is a SQL dialect with streaming extensions. A query like:

SELECT
  user_id,
  TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
  COUNT(*) AS click_count,
  COUNT(DISTINCT page_id) AS unique_pages
FROM clicks
GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE);

compiles to a DataStream job: source → keyBy(user_id) → tumbling 5-min event-time window → aggregator → sink. The optimizer chooses window implementation, state layout, and parallelism.

Streaming-specific extensions:

  • TUMBLE, HOP, SESSION window TVFs (Table-Valued Functions).
  • WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND in DDL (Data Definition Language) — watermark strategy at table creation.
  • MATCH_RECOGNIZE for pattern matching (Flink's SQL equivalent to CEP).
  • INSERT INTO ... SELECT ... to define a continuous job.

Table API is the programmatic equivalent — fluent Java/Scala/Python API that compiles to the same internal representation as SQL.

9.2 ksqlDB on Kafka

ksqlDB is Confluent's SQL layer on top of Kafka Streams. SQL queries run as Kafka Streams topologies in a ksqlDB server cluster. State is in RocksDB + Kafka changelog topics, exactly like raw Kafka Streams.

-- Define a stream from a Kafka topic:
CREATE STREAM clicks (user_id BIGINT, page_id BIGINT, event_time BIGINT)
WITH (KAFKA_TOPIC='clicks', VALUE_FORMAT='JSON',
      TIMESTAMP='event_time');

-- Continuous aggregation:
CREATE TABLE clicks_per_user AS
SELECT user_id, COUNT(*) AS click_count
FROM clicks
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY user_id;

-- The resulting TABLE is a Kafka topic, materialized as a queryable state
-- store. ksqlDB serves pull queries against it via REST.

Strengths: tight Kafka integration, low operational complexity for Kafka shops. Weaknesses: limited to Kafka-as-source-and-sink, less mature optimizer than Flink SQL.

9.3 RisingWave's PostgreSQL-compatible streaming SQL

RisingWave exposes a PostgreSQL wire protocol. Connect with psql, write SQL, the engine runs it as a continuous stream-processing job. Materialized views are first-class — they're computed incrementally as input streams arrive.

CREATE SOURCE clicks (user_id BIGINT, page_id BIGINT, event_time TIMESTAMP)
WITH (connector='kafka', topic='clicks', ...);

CREATE MATERIALIZED VIEW clicks_per_user_5min AS
SELECT user_id,
       window_start,
       window_end,
       COUNT(*) AS click_count
FROM TUMBLE(clicks, event_time, INTERVAL '5 MINUTES')
GROUP BY user_id, window_start, window_end;

-- Query the view directly:
SELECT * FROM clicks_per_user_5min
WHERE user_id = 42
ORDER BY window_start DESC
LIMIT 10;

The view is maintained incrementally, typically with sub-second freshness. The query result is served from RisingWave's internal Hummock-backed state, not from re-reading raw data.

9.4 Materialized views over streams

The unifying abstraction: a materialized view is an aggregation expression whose result is maintained incrementally as input changes. Stream processors are essentially materialized-view engines:

Input event log    ─►   Streaming SQL engine    ─►   View result (queryable)
(append-only)          (incremental compute)        (point-lookup latency)

  • Flink SQL with sink to Kafka/Postgres: view result is a derived dataset; consumers query the sink.
  • ksqlDB tables: view result is queryable via REST pull queries against the server.
  • RisingWave / Materialize: view result is queryable via PostgreSQL wire protocol; the engine itself serves as the database for the view.

9.5 Query optimizer differences vs traditional SQL

Streaming SQL optimizers face a different cost model than batch SQL optimizers. Batch optimizers minimize "total work" — total CPU, total IO. Streaming optimizers minimize "per-event work" — incremental cost as a new event arrives.

Differences:

  • No table scans. The input is an unbounded stream; there's no "scan the whole table" plan node. All inputs are subscribed to incrementally.
  • State cost dominates. A join's cost is dominated by the size of the hash table maintained in state, not by per-event lookup cost. The optimizer prefers join orderings that keep state small.
  • Watermark propagation matters. Some operators (windows, aggregations) need watermarks to emit results. The optimizer ensures watermarks flow through every required path.
  • No re-execution. Batch queries can re-execute on failure. Streaming queries continue from checkpoints; the optimizer must produce a job graph compatible with state restore.
  • Incremental aggregation. The optimizer rewrites aggregations to compute partial results per pane (for windows) and combine them, rather than recomputing on every event.

Result: queries that look identical in batch and streaming SQL can have wildly different physical plans. A naïve SELECT user_id, COUNT(*) FROM big_table GROUP BY user_id runs once and finishes in batch; on a stream it becomes a continuously updating result whose per-key state grows without bound unless the query adds a window or the job sets a state TTL.


§10. Job upgrade strategies

A streaming job runs continuously. Upgrading it — new code, new feature, bug fix — without losing state or duplicating processing is one of the operationally trickiest parts of running streaming infrastructure.

10.1 Savepoints — manually-triggered checkpoint at job stop

A savepoint is a checkpoint with manual trigger and unbounded lifetime. Where ordinary checkpoints are triggered every N seconds and retain the last few, savepoints are explicit "here is a snapshot of my job, save it forever (or until I delete it)."

Upgrade workflow with savepoint:

1. Trigger savepoint:
     flink savepoint <job-id> s3://savepoints/my-job/
   → engine takes a global consistent snapshot, writes to S3, returns path.

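2. Stop the job:
     flink cancel <job-id>
   → job stops. State is preserved in the savepoint.
   (flink stop --savepointPath s3://savepoints/my-job/ <job-id> performs
    steps 1 and 2 as one operation, so no records are processed between
    the snapshot and the shutdown.)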

3. Deploy new code, restart from savepoint:
     flink run --fromSavepoint s3://savepoints/my-job/snapshot-abc \
               --allowNonRestoredState \
               my-new-jar.jar
   → new job picks up from the savepoint's source offsets and state.

The --allowNonRestoredState flag lets the new job ignore operator state that the new code no longer needs (e.g., a removed operator). Without it, the engine refuses to start if any state in the savepoint has no destination.
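The restore rule amounts to a set difference over operator UIDs. The sketch below models only that rule; SavepointRestoreCheck is a hypothetical class and the UIDs are illustrative strings, not anything read from a real savepoint.

```java
import java.util.Set;
import java.util.TreeSet;

/** Sketch: the "orphaned state is fatal unless explicitly allowed" restore rule. */
final class SavepointRestoreCheck {

    /** Returns the savepoint operator UIDs that have no matching operator in the new job. */
    static Set<String> orphanedState(Set<String> savepointUids, Set<String> newJobUids) {
        Set<String> orphans = new TreeSet<>(savepointUids);
        orphans.removeAll(newJobUids);
        return orphans;
    }

    /** Throws if state would be dropped silently and the caller has not opted in. */
    static void validate(Set<String> savepointUids, Set<String> newJobUids,
                         boolean allowNonRestoredState) {
        Set<String> orphans = orphanedState(savepointUids, newJobUids);
        if (!orphans.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException("No operator for savepoint state: " + orphans);
        }
    }
}
```

The check is only meaningful when UIDs are stable. If UIDs are auto-generated and a refactor changes them, every operator looks orphaned, and the flag then discards all state instead of only the state of removed operators.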

10.2 In-place upgrade — compatible state schema

When the new code's state schemas are compatible with the old (via Avro evolution, POJO add/remove field, etc.), the upgrade is "in-place": stop the job, restart with new code from the savepoint, and the engine deserializes old state with new code transparently.

Compatible changes:

  • Adding a new operator with no state.
  • Adding optional fields to a POJO state.
  • Adding new column families (new state primitives); they start empty.

Renaming or relocating a state class is not on this list: Flink's POJO and Avro evolution both expect the class name and namespace to stay the same, so a rename needs an explicit migration (see 10.4).

In-place is the fast path; aim for it.

10.3 Parallel run — new job alongside old, switch traffic

When the new code is incompatible (different keying, different sink schema, different operator UIDs), the safest approach is parallel run:

T = 0: only old job is running.
       old_job ──► sink (alias=v1)

T = 1: start the new job consuming the same Kafka source from a recent offset.
       old_job ──► sink_v1
       new_job ──► sink_v2     (warming up state via replay)

T = warmup: new_job has caught up; its sink_v2 is current.

T = switch: clients (downstream consumers) switch from sink_v1 to sink_v2.
            For tables: alias swap. For event streams: switch consumer group.

T = post-switch: old_job is decommissioned.
                 old_job ────► sink_v1   (paused / stopped)
                 new_job ──► sink_v2     (live)

Cost: doubled compute during the parallel run. Benefit: zero-downtime, zero-data-loss, full validation of the new job against live traffic before flipping.

This is the standard approach for major version upgrades, key changes, or operator-graph changes. The pattern is sometimes called "blue-green deployment for streaming."

10.4 "We changed the function but state is incompatible" — recovery options

When state is incompatible and parallel run isn't feasible (cost, state too large to re-warm):

Option 1: write a state migration job. Use Flink's State Processor API to read state from the old savepoint, transform it, and write a new savepoint compatible with the new code. Run this offline, then start the new job from the new savepoint.

Option 2: bootstrap from a side source. If the state can be recomputed from a durable source (Kafka with long retention, S3 archive), drop the state and replay from a sufficiently old offset.

Option 3: accept the loss. For some workloads (e.g., a rolling-window aggregation with a 1-hour window), starting with empty state and letting it warm up for an hour is acceptable.

Option 4: dual-key the state during the migration window. Write a transitional version of the code that writes to BOTH the old and new state schemas. Run it long enough that the new state is fully populated. Then deploy a version that only reads the new state.

The "we deserialize wrong bytes" disaster mode is described in §11.4. The job either crashes immediately (type incompatibility detected) or silently emits garbage (the types happen to match but the semantics differ). The latter is far worse; detect it via schema-fingerprint checks at startup.


§11. Schema evolution of state

State is the long-lived asset of a streaming job. Schema evolution rules determine whether the next code change is a one-line update or a multi-day migration project.

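When state is declared with types whose serializers Flink can evolve (POJOs and Avro records), the engine handles evolution automatically, within the limits of the evolution rules below. Flink's documented schema-evolution support covers only these two families; Scala case classes and Kryo-serialized types need a manual migration.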

Avro evolution rules:

Allowed changes (forward + backward compatible):
  • Add a field with a default value
  • Remove a field that had a default
  • Add a new enum value (with default fallback)
  • Promote int → long, float → double (numeric widening)
  • Rename via aliases (Avro feature)

Disallowed:
  • Change a field's type outside the promotion rules (aliases rename fields; they do not convert types)
  • Make a previously-optional field required
  • Rename without an alias
  • Change a record's namespace

POJO evolution rules (Flink's POJO type info):

  • Adding a new field: safe; missing-from-old-state fields get the Java default (0, null, etc.).
  • Removing a field: safe; old-state value is read and discarded.
  • Renaming a field: NOT safe by default; requires @TypeInfo mapping.
  • Changing a field's type: NOT safe; requires a migration.

11.2 Raw bytes: you handle

When state is opaque bytes (custom serialization, byte arrays, deeply nested structures with manual serde), the engine does nothing. You write a TypeSerializer with custom upgrade logic and version markers.

Custom serializer schema evolution skeleton:

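// Sketch only. In Flink this logic lives in a TypeSerializer<MyState>; the
// companion TypeSerializerSnapshot records which version wrote the state.
// Other abstract methods are omitted, and the real deserialize() reads from
// a DataInputView rather than a byte[].
class MyStateSerializer extends TypeSerializer<MyState> {
  static final int CURRENT_VERSION = 3;  // version stamped by the current writer

  MyState deserialize(byte[] bytes) {
    // The first 4 bytes hold the version the writer stamped on this record.
    int writtenVersion = java.nio.ByteBuffer.wrap(bytes).getInt(0);
    switch (writtenVersion) {
      case 1:  return upgradeFromV1ToV3(deserializeV1(bytes));
      case 2:  return upgradeFromV2ToV3(deserializeV2(bytes));
      case 3:  return deserializeV3(bytes);
      default: // fail loudly instead of guessing at an unknown layout
        throw new IllegalStateException("Unknown state version " + writtenVersion);
    }
  }
}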

Cost: high (you write the migration), but full control over the on-disk format.
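The same version-dispatch idea can be exercised end to end outside Flink. A minimal sketch in Python, assuming three hypothetical layouts (v1 holds only a request counter, v2 adds an error counter, v3 adds a last-seen timestamp); the field names and layouts are invented for illustration:

```python
import struct
from dataclasses import dataclass

CURRENT_VERSION = 3


@dataclass
class MyState:
    total_requests: int
    error_count: int
    last_seen_ms: int


def serialize(state: MyState) -> bytes:
    """Always write the current layout, prefixed with a 4-byte version marker."""
    return struct.pack(">iiiq", CURRENT_VERSION, state.total_requests,
                       state.error_count, state.last_seen_ms)


def deserialize(data: bytes) -> MyState:
    """Dispatch on the version the writer stamped, upgrading old layouts."""
    (version,) = struct.unpack_from(">i", data, 0)
    if version == 1:  # v1: total_requests only
        (total,) = struct.unpack_from(">i", data, 4)
        return MyState(total, 0, 0)
    if version == 2:  # v2: total_requests + error_count
        total, errors = struct.unpack_from(">ii", data, 4)
        return MyState(total, errors, 0)
    if version == 3:  # v3: current layout
        total, errors, last_seen = struct.unpack_from(">iiq", data, 4)
        return MyState(total, errors, last_seen)
    raise ValueError(f"Unknown state version {version}")


# Bytes as an old writer would have produced them.
old_v1_bytes = struct.pack(">ii", 1, 120)
old_v2_bytes = struct.pack(">iii", 2, 120, 7)
upgraded = deserialize(old_v1_bytes)
```

Fields missing from an old layout are filled with explicit defaults here, so the upgrade policy is visible in one place rather than scattered across readers.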

11.3 Adding fields, removing fields, renaming

Adding fields: the easiest. Old state lacks the field; the deserializer fills it with a default. New events populate it.

Removing fields: safe but lossy. Old state still has the field's bytes; the deserializer skips them.

Renaming fields: unsafe without aliases. Old state has bytes under old_name; new code looks for new_name and gets a default — silently losing the data. Always provide an alias when renaming.

11.4 The "we deserialize wrong bytes" disaster mode

The worst case: the engine deserializes old bytes with a new schema that is type-compatible but semantically different. Example:

Old state schema:
  class UserStats {
    int totalRequests;   // bytes [0..4)
    int errorCount;      // bytes [4..8)
  }

New code (developer reorders fields):
  class UserStats {
    int errorCount;      // bytes [0..4)
    int totalRequests;   // bytes [4..8)
  }

Avro is immune to this reorder: the reader resolves fields by name against the writer's schema, not by position. Flink's POJO serializer is immune for the same reason (it tracks fields by name). But a custom byte-level serializer with no version check would happily reinterpret old bytes as the new layout: every record the job emits after the deploy is garbage, with errorCount holding the old totalRequests and totalRequests holding the old errorCount.

The garbage propagates downstream. Aggregations are wrong but in a way that may not trigger alarms (the values are reasonable-looking, just incorrect). This is the silent-corruption mode.

Defenses:

  • Always include a schema version marker in serialized state.
  • Use Flink's POJO/Avro type info, not custom serializers, when feasible.
  • Run schema-compatibility checks in CI: deserialize old savepoint test fixtures with the new code; fail the build on type mismatch.
  • For critical state, write a "smoke test" that compares an aggregate over the new state to a known-correct value computed from raw events post-deploy.
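
The first defense can be sketched as a layout fingerprint stored in front of the payload. This is an illustration under assumptions, not Flink's mechanism: `fingerprint`, `write_state`, and `read_state` are hypothetical helpers, and the two-int layout mirrors the UserStats example above.

```python
import hashlib
import struct
from typing import List, Tuple

Schema = List[Tuple[str, str]]  # ordered (field name, type) pairs

OLD_SCHEMA: Schema = [("totalRequests", "int"), ("errorCount", "int")]
NEW_SCHEMA: Schema = [("errorCount", "int"), ("totalRequests", "int")]


def fingerprint(schema: Schema) -> bytes:
    """Order-sensitive 8-byte digest of the field layout."""
    text = ";".join(f"{name}:{typ}" for name, typ in schema)
    return hashlib.sha256(text.encode("utf-8")).digest()[:8]


def write_state(schema: Schema, values: Tuple[int, int]) -> bytes:
    """Prefix the positional payload with the writer's schema fingerprint."""
    return fingerprint(schema) + struct.pack(">ii", *values)


def read_state(schema: Schema, data: bytes) -> Tuple[int, int]:
    """Refuse to decode bytes written under a different layout."""
    if data[:8] != fingerprint(schema):
        raise ValueError("state was written with a different schema layout")
    return struct.unpack(">ii", data[8:])


saved = write_state(OLD_SCHEMA, (1000, 3))  # totalRequests=1000, errorCount=3

# Without the check, a reader using the reordered layout swaps the counters:
error_count, total_requests = struct.unpack(">ii", saved[8:])
```

Reading `saved` with `NEW_SCHEMA` through `read_state` raises instead of returning swapped counters, which turns the silent-corruption mode into a startup failure.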

§12. Multi-tenancy and isolation

A production Flink/Spark cluster typically runs many jobs. How they share resources determines the noisy-neighbor blast radius.

12.1 Running multiple jobs on shared TaskManagers

The simplest model: one TaskManager has N slots; each slot runs one task. Multiple jobs can place tasks on the same TaskManager — different slots, but shared JVM, shared memory pool, shared CPU.

TaskManager TM-5 (16 GB RAM, 8 slots):
  slot 0: job_fraud_features (parallelism task 0)
  slot 1: job_fraud_features (parallelism task 1)
  slot 2: job_ad_clicks (parallelism task 0)
  slot 3: job_ad_clicks (parallelism task 1)
  slot 4: job_session_aggregator (parallelism task 0)
  slot 5: job_session_aggregator (parallelism task 1)
  slot 6: empty
  slot 7: empty

Pros: efficient resource packing; one TaskManager fleet serves many jobs.

Cons: noisy neighbors. A misbehaving job's GC pause stalls slots in unrelated jobs. A slot leaking memory eventually OOMs the JVM and kills all slots.

12.2 Slot sharing groups

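Flink defaults to allowing tasks from different operators in the SAME job to share slots. This is efficient: one slot can hold a full parallel slice of the pipeline, and chained stages exchange records in memory instead of over the network. Slots are never shared across jobs; different jobs can share a TaskManager, but each slot belongs to exactly one job.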

Default slot-sharing: tasks from the same job's chained pipeline stages
share a slot to enable in-memory data exchange instead of network.

Slot Sharing Group "default":
  slot 0: source(p=0) → map(p=0) → keyBy → window(p=0) → sink(p=0)
  slot 1: source(p=1) → map(p=1) → keyBy → window(p=1) → sink(p=1)
  ...

Custom slot-sharing groups can isolate operators within a job (e.g., keep a heavy state operator on its own slot to prevent it from starving lighter operators). Slot sharing does not extend across jobs: two jobs on the same TaskManager always occupy different slots, so cross-job isolation is decided at the TaskManager level (12.3).

12.3 Resource isolation via separate JobManagers

For strong isolation between jobs (different SLOs, different security domains, different on-call teams), each job gets its own JobManager and TaskManager fleet. The Kubernetes pattern:

Job A:
  JobManager pod (HA: 2 replicas with leader election via k8s lease)
  TaskManager pods (10 replicas with PodAntiAffinity for AZ spread)
  Dedicated namespace, resource quota, NetworkPolicy

Job B:
  JobManager pod
  TaskManager pods
  Separate namespace

This is the "session vs application mode" trade-off. Session mode: one Flink cluster serves many jobs; cheap setup, noisy neighbors. Application mode: one Flink cluster per job; expensive but isolated, idiomatic on Kubernetes.

For Staff-level designs, "application mode per critical job, session mode for low-priority batch-streaming" is a common pattern.

12.4 cgroup limits

When multiple TaskManagers share a physical node, Linux cgroups (control groups) limit CPU and memory per TM. The Kubernetes resource limits (resources.limits.cpu, resources.limits.memory) are implemented via cgroups.

cgroup hierarchy on a node:
  kubelet/
   ├─ pod_taskmanager_jobA_0/
   │   └─ cpu.shares = 4096, memory.limit_in_bytes = 16 GB
   ├─ pod_taskmanager_jobB_0/
   │   └─ cpu.shares = 2048, memory.limit_in_bytes = 8 GB
   └─ ...

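CPU contention: cgroups distribute CPU proportional to cpu.shares
(derived from the pod's CPU request); a CPU limit is enforced
separately as a CFS quota.
Memory: hitting the memory limit triggers an OOM kill: the kernel
kills a process in the cgroup, and Kubernetes restarts the container.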

The CPU and memory controllers give CPU and memory isolation. Disk IO bandwidth and network bandwidth are NOT isolated by default; for those, use the blkio cgroup controller (io in cgroup v2) and tc (Traffic Control) qdiscs respectively, or run noisy jobs on dedicated nodes.
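
The proportional-share rule is easy to check by hand. A minimal sketch, assuming every cgroup is runnable at the same time (shares only matter under contention; an idle neighbor's CPU is available to others); the function name and the 6-core node are illustrative:

```python
from typing import Dict


def cpu_split_under_contention(shares: Dict[str, int], cores: float) -> Dict[str, float]:
    """Cores each cgroup receives when all cgroups compete for the CPU at once."""
    total = sum(shares.values())
    return {name: cores * s / total for name, s in shares.items()}


# The two TaskManager pods from the hierarchy above, on a hypothetical 6-core node.
split = cpu_split_under_contention({"jobA": 4096, "jobB": 2048}, cores=6.0)
```

With shares of 4096 and 2048, jobA receives two thirds of the contended CPU and jobB one third, regardless of the absolute share values.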

12.5 The noisy-neighbor problem

The classic symptom: a low-priority job suddenly degrades a critical job's latency. Investigation:

  1. CPU contention: both jobs maxing out cores on the same node. cgroup shares mediate, but a CPU-spinning job hurts the latency-critical job's wakeup time.
  2. Memory pressure: one job grows close to its limit, the JVM does long GC cycles, slot threads stall. If they share the JVM, the other job's tasks stall too.
  3. Disk IO: one job's RocksDB compaction saturates local NVMe. The other job's reads queue.
  4. Network bandwidth: one job's cross-AZ shuffle dominates the VPC's per-AZ bandwidth budget; the other job's source pulls slow down.
  5. Checkpoint storage: both jobs writing to S3 simultaneously; S3 per-prefix throttling kicks in; checkpoint duration grows.

Mitigations: application-mode (dedicated cluster), dedicated nodes (taints/tolerations for critical jobs), per-job S3 path sharding, per-job dedicated Kafka clusters.


§13. Cost economics

A production streaming job's monthly cost is usually dominated by three components: TaskManager compute, checkpoint storage, and network egress. Knowing the relative contributions guides optimization priorities.

13.1 Checkpoint storage — S3 + RocksDB state size × checkpoint frequency

Checkpoint storage cost model:

  cost_per_month = retained_checkpoints × state_size_GB × s3_storage_$_per_GB_month
                   + checkpoint_PUTs_per_month × s3_PUT_$
                   + checkpoint_GETs_per_month × s3_GET_$    (on recovery)

Worked example:
  State size: 1 TB
  Retained checkpoints: 3 (last completed + 2 historical)
  Incremental delta per checkpoint: 50 GB
  Checkpoint interval: 60s → 43,200 checkpoints/month
  S3 standard storage: $0.023/GB/month
  S3 PUT: $0.005 per 1000

  Steady-state storage: 3 × 1 TB = 3 TB → 3000 × $0.023 = $69/month
  Incremental SST upload: 50 GB × 43,200 = 2.16 PB upload/month (bandwidth)
  Per-checkpoint PUT count: ~10 (one per SST) × 43,200 = 432,000 PUTs/month
                            → 432 × $0.005 = $2.16/month (negligible)

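Network transfer (checkpoint store reached across an AZ boundary):
  If uploads are billed at cross-AZ rates (e.g., self-managed HDFS or an
  object store in another AZ; AWS itself does not bill EC2-to-S3 traffic
  within a region):
    50 GB × 43,200 × $0.01/GB cross-AZ = $21,600/month
  If same AZ: free, but loses AZ-failure durability.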

Typical: storage cheap, PUTs negligible, NETWORK is the issue.
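
The worked example can be reproduced with a few lines of arithmetic, which makes it easy to plug in other state sizes or prices. A sketch under the same assumptions as the example (30-day month, fixed delta per checkpoint, transfer billed at the cross-AZ rate); the function and parameter names are illustrative:

```python
from typing import Dict


def checkpoint_monthly_cost(
    state_gb: float = 1000.0,
    retained_checkpoints: int = 3,
    delta_gb: float = 50.0,
    interval_s: int = 60,
    puts_per_checkpoint: int = 10,
    storage_usd_per_gb_month: float = 0.023,
    put_usd_per_1000: float = 0.005,
    cross_az_usd_per_gb: float = 0.01,
    days: int = 30,
) -> Dict[str, float]:
    """Monthly checkpoint cost components for the model above."""
    checkpoints = days * 24 * 3600 // interval_s
    uploaded_gb = delta_gb * checkpoints
    return {
        "checkpoints": checkpoints,
        "storage_usd": retained_checkpoints * state_gb * storage_usd_per_gb_month,
        "put_usd": checkpoints * puts_per_checkpoint / 1000 * put_usd_per_1000,
        "uploaded_gb": uploaded_gb,
        # Only applies when checkpoint traffic is actually billed per GB.
        "cross_az_usd": uploaded_gb * cross_az_usd_per_gb,
    }


costs = checkpoint_monthly_cost()
```

The defaults reproduce the figures above: 43,200 checkpoints, about $69 of storage, about $2.16 of PUTs, and $21,600 of transfer when that traffic is billed.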

Mitigations:

  • Reduce state size: TTL on state, more aggressive eviction.
  • Reduce checkpoint frequency: 60s → 5 min. Trade-off: replay window after crash grows. Bound by RPO (Recovery Point Objective).
  • Same-AZ S3: use S3 single-AZ buckets if the durability budget allows (probably not for production).
  • Compression: Flink can compress checkpoint payloads; ~3× reduction for typical SST contents.

13.2 TaskManager fleet cost

TaskManager fleet cost model:

  cost_per_month = num_taskmanagers × instance_cost_per_hour × 730 hours

Worked example:
  60 TaskManagers on m6id.4xlarge (16 vCPU, 64 GB RAM, NVMe local):
    on-demand: ~$1.00/hour × 60 × 730 = $43,800/month
    reserved 1y: ~$0.60/hour × 60 × 730 = $26,280/month
    spot: ~$0.30/hour × 60 × 730 = $13,140/month (but spot-interrupt risk)

  At 1M events/sec sustained, this is ~17k events/sec per TM
  (1M ÷ 60). CPU is usually the bottleneck for stateful operators;
  RAM hosts the block cache.

Mitigations:

  • Right-size instances: more memory if block cache hit rate is low; more CPU if compaction is the bottleneck.
  • Reserved/spot mix: baseline on reserved, burst on spot. Spot interruption handled by checkpoint replay.
  • Reduce parallelism if over-provisioned: measure per-slot utilization; cut excess.
  • Higher-IOPS storage: sometimes faster NVMe per TM reduces TM count (CPU was waiting on storage).

13.3 Network egress for cross-AZ shuffles

Cross-AZ traffic costs roughly $0.01/GB on AWS (billed in each direction), $0.01/GB on GCP, and $0.02/GB on Azure. It is unglamorous but often the largest line item for high-throughput streaming.

Network egress model:

  Per-event size: 1 KB serialized
  KeyBy shuffle: 100% of records cross the network (worst case)
  At 1M events/sec, shuffle traffic = 1 GB/s = 86.4 TB/day = 2.6 PB/month

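  If 30% of shuffle is cross-AZ (placement with some AZ affinity):
    0.78 PB/month × $0.01/GB = $7,800/month
  With uniform random spreading across 3 AZs, 2 of every 3 records
  (~67%) cross an AZ boundary:
    1.73 PB/month × $0.01/GB ≈ $17,300/month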

  Sources reading from Kafka in the same AZ: 0 egress.
  Sources reading from Kafka in a different AZ: 100% of input is egress.
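
The egress model reduces to one multiplication plus an estimate of the cross-AZ fraction. A sketch assuming 1 KB = 1,000 bytes and a 30-day month, as in the figures above; with uniform spreading over n AZs the remote share is (n − 1)/n, and the function names are illustrative:

```python
def shuffle_egress_usd(
    events_per_sec: float,
    event_bytes: int,
    cross_az_fraction: float,
    usd_per_gb: float = 0.01,
    days: int = 30,
) -> float:
    """Monthly cross-AZ charge for a shuffle where every record crosses the network."""
    gb_per_month = events_per_sec * event_bytes * days * 86_400 / 1e9
    return gb_per_month * cross_az_fraction * usd_per_gb


def uniform_cross_az_fraction(num_azs: int) -> float:
    """Share of records that leave their AZ when targets are spread uniformly."""
    return (num_azs - 1) / num_azs


at_30_percent = shuffle_egress_usd(1_000_000, 1_000, 0.30)
uniform_3_az = shuffle_egress_usd(1_000_000, 1_000, uniform_cross_az_fraction(3))
```

The unrounded monthly volume is 2.592 PB, so the 30% case comes to $7,776 (the ~$7,800 above), and uniform spreading across three AZs comes to $17,280.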

Mitigations:

  • Pre-aggregate at source: combine multiple events for the same key before shuffle. 10× reduction for hot keys.
  • Rack-local Kafka brokers: TaskManagers consume from same-AZ Kafka brokers via client.rack configuration. Kafka returns same-AZ replicas when possible.
  • Single-AZ deployment: if AZ failure isn't a hard constraint (some internal analytics jobs), single-AZ saves a lot. Bad idea for production-critical paths.
  • Smaller event payloads: drop unused fields before publishing to Kafka.

13.4 The "shrink state aggressively" patterns

Since state size drives storage cost, checkpoint duration, and recovery time, shrinking it is one of the highest-leverage optimizations.

TTL on state: every state primitive in Flink supports StateTtlConfig. Set explicit TTL (e.g., 24 hours after last access). Background cleanup compacts away expired entries during normal LSM compaction.

Broadcast state for small lookups: instead of a stateful operator joining against a per-key reference table, broadcast a small table (e.g., 1 GB feature flag config) to all operators. Each operator holds an in-memory copy; lookups are O(1) without keyed state.

Approximate algorithms: HyperLogLog for distinct counts (~2 KB per HLL instance vs O(unique values) for an exact set), Count-Min Sketch for frequency, T-Digest for quantiles. The trade-off: bounded error rate for vast state reduction.

Aggressive window closing: if you don't need allowedLateness, leave it at 0 (Flink's default). Window state is then dropped as soon as the watermark passes the window end; late events are discarded unless you route them to a side output with sideOutputLateData.

Periodic full compaction: schedule manual full RocksDB compactions during low-traffic hours. Reduces tombstone count and stale state.
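
To make the approximate-algorithms trade-off concrete, here is a minimal Count-Min Sketch. It is a teaching sketch, not a tuned implementation: the width, depth, and salted BLAKE2b hashing are choices made for this example.

```python
import hashlib
from typing import List


class CountMinSketch:
    """Fixed-size frequency sketch: may over-count, never under-counts."""

    def __init__(self, width: int = 1024, depth: int = 4) -> None:
        self.width = width
        self.depth = depth
        self.table: List[List[int]] = [[0] * width for _ in range(depth)]

    def _columns(self, key: str) -> List[int]:
        # One hash per row, derived by salting the digest with the row index.
        cols = []
        for row in range(self.depth):
            digest = hashlib.blake2b(
                key.encode("utf-8"), digest_size=8, salt=row.to_bytes(8, "big")
            ).digest()
            cols.append(int.from_bytes(digest, "big") % self.width)
        return cols

    def add(self, key: str, count: int = 1) -> None:
        for row, col in enumerate(self._columns(key)):
            self.table[row][col] += count

    def estimate(self, key: str) -> int:
        # Collisions only inflate counters, so the minimum is the tightest bound.
        return min(self.table[row][col] for row, col in enumerate(self._columns(key)))


sketch = CountMinSketch()
for _ in range(500):
    sketch.add("hot_key")
for i in range(2000):
    sketch.add(f"user_{i}")
```

State stays at width × depth counters (4,096 here) no matter how many distinct keys arrive; the price is that `estimate` can exceed the true count when keys collide.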


§14. Unified batch/stream

The dream: one engine, one API, one set of bugs. Three engines/frameworks claim this.

Flink models batch as "a stream that happens to end." A bounded source has a definite final offset; the engine knows when it has consumed all data; aggregations emit final results without waiting for further events.

Streaming: source is unbounded Kafka topic, watermark advances by event_time.
Batch:     source is bounded S3 file, watermark advances to infinity at EOF.

Same DataStream API. Same operators. Same state backend.
Same checkpoint/recovery (or not, if batch mode disables checkpoints).

What changes between batch and streaming modes:

  • Checkpoints: typically disabled in batch mode (replay is from input, not state).
  • Network shuffle: batch can use "blocking" shuffles (intermediate results materialized to disk between stages, enabling fault-tolerant re-execution of failed tasks). Streaming uses "pipelined" shuffles.
  • Scheduling: batch can schedule stages sequentially (stage 1 completes, then stage 2 starts). Streaming runs all stages concurrently.
  • Windowing: batch windows close at end-of-input. Streaming windows close at watermark.
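
The windowing difference can be shown with one function that serves both modes. A simplified sketch, assuming in-order events and a watermark equal to the latest event time minus a fixed bound; late events are not handled, and `tumbling_counts` is an illustrative name rather than an engine API:

```python
import math
from collections import defaultdict
from typing import Dict, Iterable, Iterator, Tuple

WINDOW_MS = 60_000


def tumbling_counts(
    events: Iterable[Tuple[int, str]],  # (event_time_ms, key)
    bounded: bool,
    max_out_of_orderness_ms: int = 0,
) -> Iterator[Tuple[int, str, int]]:
    """Yield (window_start, key, count) whenever the watermark closes a window."""
    open_windows: Dict[int, Dict[str, int]] = defaultdict(lambda: defaultdict(int))

    def close_up_to(watermark: float) -> Iterator[Tuple[int, str, int]]:
        ready = sorted(w for w in open_windows if w + WINDOW_MS <= watermark)
        for start in ready:
            for key, count in sorted(open_windows.pop(start).items()):
                yield start, key, count

    for event_time, key in events:
        open_windows[event_time - event_time % WINDOW_MS][key] += 1
        yield from close_up_to(event_time - max_out_of_orderness_ms)

    if bounded:
        # End of input: the watermark jumps to +infinity and every window fires.
        yield from close_up_to(math.inf)


sample = [(1_000, "a"), (2_000, "a"), (61_000, "b"), (125_000, "a")]
streaming_out = list(tumbling_counts(sample, bounded=False))
batch_out = list(tumbling_counts(sample, bounded=True))
```

The streaming run leaves the last window open because no later event has advanced the watermark past it; the bounded run emits it at end of input. The operator logic is otherwise identical.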

The argument FOR unification: rewriting the same business logic twice (Spark batch + Flink streaming, or dbt + Kafka Streams) is the #1 source of "online/offline feature skew" in ML systems.

14.2 Spark's "structured streaming on batch engine"

Spark goes the other direction: streaming is "batch chopped into micro-batches." A Structured Streaming query runs the same logical SQL as a batch query, with the engine treating each micro-batch as a small Spark job.

Logical query: SELECT user_id, COUNT(*) FROM events GROUP BY user_id

Batch execution: one job over the full input, one COUNT aggregation.
Streaming execution: one job per micro-batch over the new events, plus
                     incremental state maintenance (state in HDFS files).

Strengths: the optimizer, the type system, the dataframe API are identical. A team that knows Spark batch gets Spark streaming nearly for free.

Weakness: the micro-batch model has a latency floor (typically 100ms–1s). For true sub-100ms requirements, Spark Continuous Processing exists but is experimental and limited.

14.3 Apache Beam as portability layer

Beam is not an engine; it's an API. You write a pipeline in Beam SDK (Java, Python, Go), then run it on a Beam runner: Flink runner, Spark runner, Dataflow runner (Google's managed engine), Samza runner, Storm runner.

            Beam SDK (Java / Python / Go)
                       │
            Pipeline → portable representation
                       │
        ┌──────────────┼──────────────┐
        ▼              ▼              ▼
  Flink Runner    Spark Runner   Dataflow Runner
        │              │              │
     Flink jobs   Spark jobs    Google Dataflow
                                     workers

Pros: one codebase runs anywhere. Useful for portability between cloud providers, or for pipelines that need to backfill via Dataflow (Google's preferred batch path) and run online via Flink (preferred streaming path).

Cons: Beam's model is the lowest common denominator across runners. Some Flink-specific optimizations (state-processor-API access, custom window assigners) are not available through Beam. The abstraction has a tax.

14.4 When unification is right vs when batch needs its own engine

Unification is right when:

  • The business logic is genuinely the same between batch and streaming (ML feature computation, derived-dataset materialization).
  • The "online" and "offline" outputs must match exactly (feature parity is a model-quality requirement).
  • The team has more streaming expertise than batch.

Unification is wrong (use separate engines) when:

  • Batch needs complex query optimization (cross-join, large-shuffle aggregations) that streaming optimizers don't handle well.
  • Batch processes much larger inputs (full table scans of multi-TB warehouses); streaming engines are tuned for per-event cost, not per-row cost over enormous bounded inputs.
  • The "batch" is itself a different workload (ad-hoc analyst queries against a data warehouse — that's Snowflake or BigQuery, not a streaming engine).

Practical pattern: use Beam or Flink batch mode for derived-dataset backfills that share logic with the streaming path; use Spark + dbt for warehouse ETL; use stream processing for the live serving path.


§15. CDC (Change Data Capture) into stream processing

CDC is the canonical pattern that turned stream processing from "an exotic tool" to "the modern ETL substrate." The pattern:

   OLTP database (Postgres / MySQL)
         │
         │ database write-ahead log
         │ (binlog, WAL)
         ▼
   ┌───────────────┐
   │   Debezium    │   ◄── Kafka Connect source, reads the DB log,
   │               │      emits change events to Kafka
   └───────┬───────┘
           │
           ▼
   ┌──────────────────────────────┐
   │   Kafka topic: db.public.    │
   │   orders                     │
   │   • op=INSERT/UPDATE/DELETE  │
   │   • before / after snapshots │
   └──────────┬───────────────────┘
              │
              ▼
   ┌──────────────────────────────┐
   │   Flink (or Materialize,     │
   │   RisingWave) consumes CDC   │
   │   stream, joins with other   │
   │   streams or reference data, │
   │   maintains derived views    │
   └──────────┬───────────────────┘
              │
              ▼
   ┌──────────────────────────────┐
   │   Derived store: OLAP        │
   │   (ClickHouse, Pinot),       │
   │   search (Elasticsearch),    │
   │   cache (Redis), other DB    │
   └──────────────────────────────┘

15.1 Why CDC is the modern ETL

Traditional ETL was "every hour, dump the entire OLTP database to a warehouse and process." Problems:

  • Lock contention or replica lag. The dump locks tables or hits a replica with replication lag.
  • Latency floor. Hourly dumps mean hourly freshness.
  • Cost. Dumping the whole DB transfers data that mostly hasn't changed.
  • Schema drift. The dumper has to know the schema; schema changes break it.

CDC inverts this: read the change log (Postgres WAL, MySQL binlog). Only changed rows flow. Latency is seconds. Schema changes propagate as new events. No table locks. The OLTP database operates as normal.

15.2 Snapshots vs incremental

CDC needs to handle two phases:

Snapshot phase: the source connector starts. It needs initial state of every row. Two strategies:

  • Database snapshot at startup: the connector does SELECT * FROM table at startup with a snapshot isolation level, emits each row as an INSERT event. Then it switches to log-streaming mode. Drawback: snapshot can take hours on large tables; connector restarts re-snapshot from scratch unless checkpoint is preserved.
  • Incremental snapshot (Debezium's modern approach): snapshot in chunks while concurrently streaming the log. New writes flow live; the chunked snapshot fills in old data. No long blocking phase.

Incremental phase: the connector reads the log incrementally. Each transaction's changes flow to Kafka as a batch. The connector tracks log offset (LSN in Postgres, binlog file+position in MySQL) and commits the offset to Kafka Connect.

15.3 The catchup mode

A common failure: the connector is down for hours; the database log retains the writes; on restart, the connector has to catch up. Catchup processing rate must exceed the database's incoming write rate, or the connector lags forever.

Mitigations:

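  • Log retention longer than worst-case outage. On Postgres, the connector's replication slot retains WAL until the connector confirms it (max_slot_wal_keep_size caps this, and a slot that exceeds the cap is invalidated); on MySQL, set binlog_expire_logs_seconds (expire_logs_days before 8.0). Set generously.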
  • Parallel apply. Some connectors can parallelize within a single source by hash-partitioning across primary keys. Order within a primary key is preserved.
  • Catchup-mode budget. Provision more connector instances during catchup; scale down after caught up.
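
The catch-up condition follows from simple arithmetic: the backlog drains at the difference between the processing rate and the incoming write rate. A small sketch with illustrative numbers (a 4-hour outage at 5,000 writes/sec):

```python
import math


def catchup_seconds(backlog_events: float, process_rate: float, write_rate: float) -> float:
    """Time to drain a backlog while new writes keep arriving."""
    headroom = process_rate - write_rate
    if headroom <= 0:
        return math.inf  # the connector never catches up
    return backlog_events / headroom


outage_s = 4 * 3600
write_rate = 5_000.0
backlog = outage_s * write_rate  # events accumulated in the log during the outage

with_3x_capacity = catchup_seconds(backlog, process_rate=15_000.0, write_rate=write_rate)
with_no_headroom = catchup_seconds(backlog, process_rate=5_000.0, write_rate=write_rate)
```

At three times the write rate the 4-hour backlog drains in 2 hours; at exactly the write rate it never drains, which is the "lags forever" case.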

15.4 CDC pipeline integration with stream processing

Flink and friends consume CDC streams as a special source type: events have before and after snapshots plus an operation type (c for create, u for update, d for delete, r for read from snapshot phase).

Flink SQL natively understands CDC streams:

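CREATE TABLE orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(18, 2),   -- bare DECIMAL means DECIMAL(10, 0) and drops cents
    status STRING,
    PRIMARY KEY (order_id) NOT ENFORCED   -- Flink does not enforce keys
) WITH (
    'connector' = 'kafka',
    'topic' = 'cdc.public.orders',
    'properties.bootstrap.servers' = 'kafka:9092',   -- placeholder address
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'
);

-- Join CDC stream with another CDC stream (users is assumed to be declared
-- the same way as orders; in Flink, a CTAS additionally needs a WITH clause
-- naming the sink connector):
CREATE TABLE user_revenue AS
SELECT u.user_id, SUM(o.amount) AS lifetime_revenue
FROM orders o JOIN users u ON o.user_id = u.user_id
GROUP BY u.user_id;
-- ↑ this view updates incrementally as orders/users are inserted,
--   updated, or deleted in the source databases.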
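
What "updates incrementally" means for an aggregate can be sketched by folding change events into a running sum: retract the before image, then add the after image. This is a simplified illustration, assuming events carry full before images and ignoring the join with users; the envelope below keeps only the op, before, and after fields:

```python
from collections import defaultdict
from decimal import Decimal
from typing import Dict, Optional

lifetime_revenue: Dict[int, Decimal] = defaultdict(Decimal)


def apply_change(event: dict) -> None:
    """Fold one Debezium-style envelope into the per-user SUM(amount)."""
    op: str = event["op"]
    before: Optional[dict] = event.get("before")
    after: Optional[dict] = event.get("after")
    if op in ("u", "d") and before is not None:
        # Retract the old row's contribution.
        lifetime_revenue[before["user_id"]] -= Decimal(before["amount"])
    if op in ("c", "r", "u") and after is not None:
        # Add the new row's contribution.
        lifetime_revenue[after["user_id"]] += Decimal(after["amount"])


events = [
    {"op": "c", "before": None, "after": {"order_id": 1, "user_id": 7, "amount": "10.00"}},
    {"op": "c", "before": None, "after": {"order_id": 2, "user_id": 7, "amount": "5.50"}},
    {"op": "u", "before": {"order_id": 1, "user_id": 7, "amount": "10.00"},
     "after": {"order_id": 1, "user_id": 7, "amount": "12.00"}},
    {"op": "d", "before": {"order_id": 2, "user_id": 7, "amount": "5.50"}, "after": None},
]
for event in events:
    apply_change(event)
```

After the four events, user 7's total is 12.00: the update replaced 10.00 with 12.00 and the delete removed 5.50, without rescanning any earlier orders.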

15.5 CDC failure modes

  • Schema-evolution mismatches. The source DB ADDs a column; Debezium emits events with the new column; the Flink schema is out of date; Flink crashes or silently ignores the new column. Mitigation: schema registry, automated schema drift detection.
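  • Delete handling. A DELETE in the source produces a delete event (op=d with a null after-state), which Debezium follows by default with a Kafka tombstone (same key, null value) so log compaction can drop the key. Downstream stores need delete-aware logic; append-only stores (ClickHouse) need an engine that folds deletes in, such as CollapsingMergeTree.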
  • Source DB primary key changes. Rare but disastrous: the source DB drops a primary key column; CDC events lose the key; downstream upsert breaks.

§16. Materialize and RisingWave as databases

Materialize and RisingWave occupy a unique position: they ARE the streaming engine and the serving database. You write a SQL view; the engine maintains the view incrementally as new events arrive; clients query the view via PostgreSQL wire protocol with low-latency point lookups.

16.1 Streaming SQL that maintains a query result as data arrives

The core abstraction: a materialized view over streams.

CREATE MATERIALIZED VIEW top_sellers_last_hour AS
SELECT seller_id, SUM(amount) AS revenue
FROM orders
WHERE event_time > NOW() - INTERVAL '1' HOUR   -- on Materialize this temporal filter is written with mz_now()
GROUP BY seller_id
ORDER BY revenue DESC
LIMIT 10;

-- The view's result is maintained CONTINUOUSLY:
-- - new orders update the aggregation
-- - the 1-hour window slides forward
-- - the TOP 10 list re-evaluates

-- Client query:
SELECT * FROM top_sellers_last_hour;
-- ↑ returns instantly (point lookup against materialized state)
--   freshness: sub-second from the most recent insert

The query "TOP 10 sellers by revenue last hour" is maintained live. The view itself is a queryable relation. Dashboards refresh by polling the view at any rate (every second, every 100ms) and get up-to-date answers.
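
The maintenance work behind such a view can be approximated in a few lines: add each order to a running per-seller sum, and retract orders as they slide out of the window. A simplified sketch, assuming time-ordered input and integer cents; unlike a real engine, it ranks sellers at query time instead of maintaining the top-N incrementally, and the class name is illustrative:

```python
import heapq
from collections import defaultdict, deque
from typing import Deque, Dict, List, Tuple

WINDOW_S = 3600


class TopSellersLastHour:
    def __init__(self, limit: int = 10) -> None:
        self.limit = limit
        self.events: Deque[Tuple[int, int, int]] = deque()  # (ts, seller_id, cents)
        self.revenue: Dict[int, int] = defaultdict(int)

    def on_order(self, ts: int, seller_id: int, cents: int) -> None:
        self.events.append((ts, seller_id, cents))
        self.revenue[seller_id] += cents
        self._expire(ts)

    def _expire(self, now: int) -> None:
        # Retract orders that no longer satisfy event_time > now - 1 hour.
        while self.events and self.events[0][0] <= now - WINDOW_S:
            _, seller_id, cents = self.events.popleft()
            self.revenue[seller_id] -= cents
            if self.revenue[seller_id] == 0:
                del self.revenue[seller_id]

    def query(self, now: int) -> List[Tuple[int, int]]:
        self._expire(now)
        return heapq.nlargest(self.limit, self.revenue.items(), key=lambda kv: kv[1])


view = TopSellersLastHour(limit=2)
view.on_order(0, 42, 1000)
view.on_order(10, 17, 500)
view.on_order(20, 9, 700)
```

Each order is touched twice (once on arrival, once on expiry), which is the incremental-cost property the engines provide for arbitrary SQL views.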

16.2 PostgreSQL-wire-compatible

Both Materialize and RisingWave speak PostgreSQL's wire protocol. From a client's perspective, they ARE Postgres:

psql -h materialize.example.com -p 6875 -U materialize
materialize=> SELECT * FROM top_sellers_last_hour;
 seller_id | revenue
-----------+---------
       42  | 18420.50
       17  | 14200.00
        9  | 12001.75
       ...

-- Use any existing Postgres client, ORM, BI tool, etc.
-- Looker, Metabase, Tableau, Grafana all connect like normal Postgres.

This is huge for adoption. Analysts and BI tools don't need to learn a new query language or driver. The streaming nature is hidden behind the SQL interface.

Pattern A: Flink + Cassandra/Redis
  Flink job computes derived data, writes to Cassandra.
  Application reads from Cassandra.
  Two systems to operate; two failure modes; lag between Flink output
  and Cassandra read consistency.

Pattern B: Materialize / RisingWave
  Write a SQL view.
  The engine IS the store; queries hit the maintained view.
  One system; one failure mode; consistent reads.

Pick Materialize/RisingWave when:

  • The queries are stable and expressible as SQL views (not arbitrary per-request transformations).
  • Analysts or BI tools need to query the data with SQL.
  • Throughput per cluster is in the 100k–1M events/sec range (above this, custom engineering on Flink scales better).
  • The team values "one system" over "best-in-class component."

Stick with Flink + serving store when:

  • Custom logic that doesn't fit SQL (ML model inference, business-rule engines).
  • Multi-million-events/sec throughput per pipeline.
  • Polyglot sinks (Kafka, S3, REST APIs, internal systems).
  • Need for fine-grained operator control (custom triggers, complex windowing, side outputs).

16.4 Trade-offs

  • Coupling of compute and serving. A heavy query slows down view maintenance and vice versa. Materialize and RisingWave both have features (query result cache, read-replica clusters) to isolate, but it's a real concern.
  • Higher operational cost for SQL. A SQL planner running 10 incremental views over Kafka may use more memory and CPU per event than a hand-coded Flink job doing the same work.
  • Catchup time. Bootstrapping a new view from a long Kafka retention can take hours or days (the view needs to process all historical events to compute the initial state). Snapshotting at the source helps.

The category is still emerging. Three years ago, only Materialize existed in this niche. RisingWave entered with cloud-native storage (Hummock on S3). Confluent's ksqlDB pull queries are a third entrant. Expect more options.


§17. Observability of stream jobs

A streaming job that doesn't expose its internals is unfixable. Operators, latencies, watermarks, checkpoints, state sizes — all need to be metric-emitted, dashboarded, alerted.

17.1 Operator metrics

The minimum viable set:

Per operator instance (per slot; Flink exposes the last two as idleTimeMsPerSecond and busyTimeMsPerSecond):
  numRecordsIn       — events consumed since start
  numRecordsOut      — events emitted
  numRecordsInPerSecond  — current input rate
  numRecordsOutPerSecond — current output rate
  currentInputWatermark  — most recent watermark on input
  currentOutputWatermark — watermark emitted to downstream
  idleTimeMs         — time spent waiting for input
  busyTimeMs         — time spent processing

The diagnostic patterns:

  • numRecordsIn ≫ numRecordsOut sustained for an operator other than a filter or aggregator means the operator is dropping or skipping events. Investigate its emit path: swallowed exceptions, null-returning maps, or late-event drops.
  • currentInputWatermark stuck means an upstream source has no progress (idle partition, or watermark strategy bug). Use withIdleness on the source.
  • currentOutputWatermark < currentInputWatermark means the operator is holding back the watermark (window operator hasn't fired). Normal during accumulation; alarming if it persists past expected window size.
  • busyTimeMs / (busyTimeMs + idleTimeMs) > 0.9 sustained means the operator is CPU-bound. Increase parallelism or optimize.
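
Two of these patterns translate directly into alert rules. A sketch over a hypothetical metrics sample; the dataclass fields and the `diagnose` helper are invented for illustration and do not correspond to a Flink API:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class OperatorSample:
    name: str
    busy_ms: float
    idle_ms: float
    input_watermark: int
    output_watermark: int
    expected_window_ms: int = 0


def diagnose(sample: OperatorSample, busy_threshold: float = 0.9) -> List[str]:
    """Apply the busy-ratio and watermark-holdback rules to one sample."""
    findings: List[str] = []
    total = sample.busy_ms + sample.idle_ms
    if total > 0 and sample.busy_ms / total > busy_threshold:
        findings.append("cpu-bound: raise parallelism or optimize")
    held_back = sample.input_watermark - sample.output_watermark
    if held_back > sample.expected_window_ms:
        findings.append(
            f"watermark held back {held_back} ms (window is {sample.expected_window_ms} ms)"
        )
    return findings


hot = OperatorSample("window_agg", busy_ms=950, idle_ms=50,
                     input_watermark=1_000_000, output_watermark=700_000,
                     expected_window_ms=60_000)
healthy = OperatorSample("map", busy_ms=300, idle_ms=700,
                         input_watermark=1_000_000, output_watermark=1_000_000)
```

A real alert would require the condition to hold across several consecutive samples, matching the "sustained" qualifier in the rules above.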

17.2 Backpressure indicator

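Flink's UI reports, per operator, the share of time its tasks spend blocked on output buffers (older versions sampled thread stacks; Flink 1.13+ uses the backPressuredTimeMsPerSecond task metric). Per-operator backpressure indicator: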

  • OK: < 10% of samples blocked.
  • LOW: 10–50%.
  • HIGH: > 50%.

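Color codes propagate upstream from the bottleneck, which is itself not backpressured (it is busy, not blocked). Read the graph right-to-left (downstream to upstream): the slow operator is the one immediately downstream of the last backpressured operator.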
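
The right-to-left reading can be written down as a small search over a linear operator chain. A sketch assuming a simple source → sink pipeline with one blocked-time ratio per operator; `classify` and `find_bottleneck` are illustrative helpers:

```python
from typing import List, Optional, Tuple


def classify(blocked_ratio: float) -> str:
    """Map a blocked-time ratio to the OK / LOW / HIGH buckets above."""
    if blocked_ratio > 0.5:
        return "HIGH"
    if blocked_ratio >= 0.1:
        return "LOW"
    return "OK"


def find_bottleneck(chain: List[Tuple[str, float]]) -> Optional[str]:
    """chain is ordered source → sink as (operator, blocked_ratio).

    Walk from the sink upstream; the operator just downstream of the last
    backpressured one is the slow consumer. Returns None when nothing is
    backpressured, or when the last operator itself is blocked (then the
    external system behind the sink is the suspect).
    """
    for i in range(len(chain) - 1, -1, -1):
        if classify(chain[i][1]) != "OK":
            return chain[i + 1][0] if i + 1 < len(chain) else None
    return None


pipeline = [("source", 0.8), ("map", 0.7), ("window", 0.02), ("sink", 0.0)]
slow_operator = find_bottleneck(pipeline)
```

Here source and map are blocked while window is not, so window is the operator failing to keep up.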

17.3 Checkpoint duration trend

Track lastCheckpointDuration, lastCheckpointSize, and numberOfFailedCheckpoints (Flink's names for the duration and size of the last completed checkpoint and the failure count). The diagnostic patterns:

  • Duration trend rising over weeks means state is growing. Audit state size and TTL.
  • Duration / Interval > 0.7 sustained means checkpoints are crowding the interval; switch to unaligned checkpoints or reduce frequency.
  • numberOfFailedCheckpoints > 0 indicates problems: S3 throttling, alignment timeout, sink-prepare failure. Each failure type has a specific cause; check the JobManager log.
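
These three rules can be evaluated mechanically from a short history of checkpoint durations. A sketch with assumed thresholds: the 0.7 ratio comes from the rule above, while the three-sample "sustained" window and the 1.5× trend factor are choices made for this example:

```python
from typing import List


def checkpoint_findings(durations_s: List[float], interval_s: float, failed: int) -> List[str]:
    """Evaluate the crowding, trend, and failure rules over recent checkpoints."""
    findings: List[str] = []
    recent = durations_s[-3:]
    if recent and all(d / interval_s > 0.7 for d in recent):
        findings.append("checkpoints crowd the interval")
    if len(durations_s) >= 4:
        half = len(durations_s) // 2
        older = sum(durations_s[:half]) / half
        newer = sum(durations_s[half:]) / (len(durations_s) - half)
        if newer > 1.5 * older:
            findings.append("duration trending up: audit state size and TTL")
    if failed > 0:
        findings.append("failed checkpoints: check the JobManager log")
    return findings


growing = checkpoint_findings([10, 11, 12, 30, 45, 50], interval_s=60, failed=0)
crowded = checkpoint_findings([44, 45, 46], interval_s=60, failed=2)
```

In production the trend comparison would span weeks of samples rather than six data points; the structure of the check stays the same.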

17.4 The "this job's p99 latency grew from 100ms to 30s" investigation playbook

Step 1: What changed?
  - Deploys in the last 24h?
  - Source traffic volume increase?
  - Source schema changes?
  - Downstream sink latency?
  - Cluster events (TM restarts, AZ issues)?

Step 2: Where is the time spent?
  - End-to-end latency = time-to-source + per-operator processing time
    + network shuffle + sink commit time.
  - Drill into per-operator processing time via metrics.
  - If one operator's busyTimeMs spiked, look at hot keys or state growth.

Step 3: Is it backpressure or compute?
  - Backpressure on the source → bottleneck downstream. Find the HIGH operator.
  - No backpressure but high latency → genuine per-event compute increase.

Step 4: Is state size the cause?
  - state.numKeys, state.totalSizeBytes per operator → growing?
  - RocksDB compaction stalls? (rocksdb.stall.micros metric)
  - Block cache hit rate (rocksdb.block.cache.hit.ratio)?

Step 5: Is checkpoint impact the cause?
  - Are checkpoint durations > 50% of interval?
  - During checkpoint windows, does latency spike?

Step 6: Is the sink the cause?
  - Sink commit duration metrics (sink.numRecordsCommitted.duration)?
  - Downstream system health (Kafka, DB, REST API)?

Step 7: Is it skew?
  - Per-slot processing time — is one slot 10× others?
  - Per-key throughput — is one key 100× others?

Common causes by frequency:
  1. Hot key skew (60% of cases)
  2. State growth pushed RocksDB into stalls (20%)
  3. Sink slowdown propagating backpressure (10%)
  4. Schema change increased per-event processing cost (5%)
  5. Cluster events (TM churn, AZ issues) (5%)

The investigation always starts with metrics, narrows to the bottleneck operator, then identifies whether the slowness is compute, state, network, or downstream.
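
Since skew is listed as the most common cause, the Step 7 check is worth automating. A minimal sketch that compares the slowest slot against the median; the 10× factor mirrors the question in Step 7, and `skewed_slot` is an illustrative helper:

```python
from statistics import median
from typing import Dict, Optional


def skewed_slot(per_slot_ms: Dict[int, float], factor: float = 10.0) -> Optional[int]:
    """Return the slot whose processing time is at least factor × the median, if any."""
    if len(per_slot_ms) < 2:
        return None
    typical = median(per_slot_ms.values())
    slot, worst = max(per_slot_ms.items(), key=lambda kv: kv[1])
    return slot if typical > 0 and worst >= factor * typical else None


hot = skewed_slot({0: 5.0, 1: 6.0, 2: 5.0, 3: 80.0})
balanced = skewed_slot({0: 5.0, 1: 6.0, 2: 7.0})
```

The median is used instead of the mean so that the hot slot does not inflate the baseline it is compared against.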


§18. Capacity envelope — small to huge

Stream processors span at least four orders of magnitude in throughput (1k to 10M+ events/sec). The same technology family works at each scale, but the right variant and configuration change dramatically.

Small — operational dashboarding (1k events/sec)

A startup running ksqlDB or a single Kafka Streams instance to drive an internal dashboard. State < 1 GB. Latency < 1 second. Operates on a single VM. No checkpointing concerns; if the job dies, restart from Kafka's last 24 hours of retention.

Examples: a SaaS startup tracking "active dashboards in the last 5 minutes" for billing; a B2B company building a real-time alerting feed on customer-side events.

Mid scale — real-time fraud features (100k events/sec)

A team like Stripe Radar maintaining real-time risk features for card transactions. ~100k card-auth events/sec at peak. State ~50–500 GB. Latency p99 < 200 ms. Flink job with ~20–60 TaskManagers, RocksDB state, S3 checkpoints. Single-job topology.

Stripe Radar publicly cites sub-100ms decision latency on card authorizations, evaluating features built from streams of historical and recent activity. The pipeline mixes Kafka Streams for some feature jobs with custom infrastructure for the more complex parts.

Large scale — real-time ML feature pipelines (1M–10M events/sec)

Uber Michelangelo's real-time feature platform. ~10M events/sec across multiple Flink jobs. Backs fare estimation, ETA prediction, and fraud detection. State in RocksDB; checkpoints to HDFS. ~5,000 TaskManagers across ~200 jobs. State per job ranges from 100 GB to 100 TB.

LinkedIn's real-time derived datasets — notification ranking, feed personalization, anti-abuse — flow through Samza and Flink. Trillions of events per day cumulative across the platform.

Huge — internet-scale aggregation (10M+ events/sec, trillions/day)

Alibaba's Singles' Day (Double Eleven) shopping event pushes 583k orders/sec at peak, with analytics dashboards updating in real time off Flink jobs aggregating across the order stream. Alibaba forked Flink as "Blink," shipped optimizations, then donated them back. Aggregate event volume across Alibaba's streaming platform is in the trillions per day.

Netflix Mantis processes tens of millions of events/sec of operational telemetry (log lines, metric updates, system events) for real-time alerting and anomaly detection across the Netflix streaming infrastructure.

Where the next bottleneck appears

Scale    | Next bottleneck                                      | Mitigation
---------+------------------------------------------------------+-----------------------------------------------------------------------
1k/sec   | None; fits on one box                                | Stay simple
100k/sec | Per-operator state in heap                           | Switch to RocksDB
1M/sec   | Checkpoint upload bandwidth, hot keys                | Incremental checkpoints, key salting
10M/sec  | Single-JobManager coordination, S3 prefix throttling | Decompose into multiple jobs by feature domain; shard checkpoint paths
100M/sec | Kafka cluster throughput, cross-AZ network           | Multi-cluster federation, regional sharding

The same Flink (or Spark, Kafka Streams) code path serves all these scales with different deployment topologies. That generality is what makes it a class of technology, not a single tool.


§19. Architecture in context — the canonical pattern

The reference topology for any non-trivial streaming job, regardless of engine:

   producers (services, devices, browsers, change-data-capture)
            │
            ▼
   ┌────────────────────────────────────────────────────────┐
   │ Durable event log (Kafka, Kinesis, Pulsar, Pub/Sub)    │
   │  • partitioned by some key                             │
   │  • retention long enough to absorb downstream outages  │
   │  • replication for durability                          │
   └─────────────┬──────────────────────────────────────────┘
                 │ source consumer offsets stored in the
                 │ stream processor's state, NOT in the log's
                 │ own offset commit (so source rewind is
                 │ atomic with checkpoint recovery)
                 ▼
   ┌────────────────────────────────────────────────────────┐
   │ Stream processor cluster                               │
   │ ┌────────────────────────────────────────────────────┐ │
   │ │ Source operators (parallelism = partition count)   │ │
   │ │   emits records + watermarks                       │ │
   │ └─────────────┬──────────────────────────────────────┘ │
   │               │ keyBy → shuffle                        │
   │               ▼                                        │
   │ ┌────────────────────────────────────────────────────┐ │
   │ │ Stateful operators                                 │ │
   │ │   • event-time windows / timers                    │ │
   │ │   • keyed state in local LSM (RocksDB / Hummock)   │ │
   │ │   • per-key processing logic                       │ │
   │ └─────────────┬──────────────────────────────────────┘ │
   │               │                                        │
   │               ▼                                        │
   │ ┌────────────────────────────────────────────────────┐ │
   │ │ Sink operators                                     │ │
   │ │   • 2PC commit coordinated with checkpoint         │ │
   │ │   • OR idempotent upsert with batch-ID             │ │
   │ └─────────────┬──────────────────────────────────────┘ │
   └───────────────┼────────────────────────────────────────┘
                   │
            ┌──────┴───────────┐
            ▼                  ▼
   ┌─────────────────┐  ┌────────────────────────────┐
   │ Downstream sink │  │ Checkpoint store           │
   │   (Kafka, OLTP, │  │   (S3, HDFS, GCS, Azure    │
   │    OLAP, cache, │  │    Blob)                   │
   │    REST)        │  │ - operator state snapshots │
   └─────────────────┘  │ - source offsets           │
                        │ - sink-commit prepare      │
                        └────────────────────────────┘
                                  ▲
                                  │ on TaskManager death,
                                  │ restore from latest chk-N
   ┌──────────────────────────────┴────────────────────────┐
   │ Coordinator (JobManager in Flink, Driver in Spark)    │
   │  HA via ZooKeeper or k8s lease                        │
   │  • barrier injection / micro-batch scheduling         │
   │  • checkpoint coordination                            │
   │  • on failure: relaunch tasks                         │
   └───────────────────────────────────────────────────────┘

What every deep annotation must call out:

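  • The source partitions on the same key the stream processor will keyBy on. Where the engine can reuse the log's partitioning (Kafka Streams skips the repartition topic when the key is unchanged), each task already holds only the keys for its partition and no shuffle is needed. Flink's keyBy always redistributes by its own key-group hash, so co-partitioning there mainly preserves per-key ordering rather than removing the shuffle. Cross-partition shuffle is the slow path.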
  • Parallelism matches partition count. More parallelism than partitions wastes slots; fewer creates skew because one slot owns multiple partitions.
  • Source offsets are stored in engine state, not the log's offset machinery. Kafka's __consumer_offsets commit is independent of the engine's checkpoint — so on recovery, the two would disagree about where to resume. The engine commits source offsets atomically with its checkpoint.
  • Sink participates in 2PC. The transactional commit and the checkpoint commit are bound together via the coordinator.
  • Coordinator is HA. A single point of failure for checkpoint coordination is a configuration smell.
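The parallelism-versus-partition-count point can be checked with a few lines of arithmetic. The sketch below assumes a simple round-robin assignment of partitions to source subtasks; real engines use their own assignment logic, and the function names are illustrative.

```python
def assign_partitions(num_partitions, parallelism):
    """Round-robin partitions over source subtasks; returns {subtask: [partitions]}."""
    assignment = {subtask: [] for subtask in range(parallelism)}
    for partition in range(num_partitions):
        assignment[partition % parallelism].append(partition)
    return assignment

def summarize(num_partitions, parallelism):
    """Count idle subtasks and the heaviest/lightest partition load."""
    loads = [len(p) for p in assign_partitions(num_partitions, parallelism).values()]
    return {
        "idle_subtasks": loads.count(0),
        "max_load": max(loads),
        "min_load": min(loads),
    }

matched = summarize(num_partitions=8, parallelism=8)   # one partition each
over = summarize(num_partitions=8, parallelism=12)     # four subtasks sit idle
under = summarize(num_partitions=8, parallelism=3)     # uneven 3/3/2 split
```

With 12 subtasks over 8 partitions, four subtasks own nothing; with 3 subtasks, the load splits unevenly, which is the skew the annotation warns about.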

§20. Hard problems inherent to stream processing

Six problems that come with the technology, illustrated across different domains.

20.1 Exactly-once across sources AND sinks (the canonical one)

Domain illustration: fraud detection. A streaming feature like "declines for this card in the last 5 minutes" cannot double-count a payment after a crash — regulators audit, and double-counting drives false positives that reject good transactions.

Naïve fix: Kafka consumer with auto-offset-commit + write to downstream Kafka.

Failure walkthrough:

T=0: consumer reads offset 100, processes event, writes to downstream Kafka
T=1: consumer reads offset 101, processes, writes
T=2: auto-commit commits offsets [100, 101]
T=3: consumer reads 102, writes downstream
T=4: ── CRASH before next auto-commit ──
T=5: new consumer instance picks up
T=6: reads __consumer_offsets, last committed = 101
T=7: resumes from 102 → reprocesses, writes 102 a SECOND TIME

Downstream sees event 102 twice.

Real fix: stream processor stores source offsets in its own state; bind offset advance to checkpoint barriers + 2PC sink. Source rewinds to checkpoint-N offsets on recovery; sink re-emits or commits its open transaction. Both ends are atomic with the checkpoint.

Kafka Streams achieves the same via a single Kafka transaction spanning the offset commit, the state-store changelog-topic write, and the output-topic write.
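The difference between the naïve and the real fix can be reproduced in a toy simulation. This is a sketch under simplifying assumptions: plain Python lists stand in for Kafka and the sink, a "transaction" is just a buffer, and the function names are invented for illustration.

```python
def run_auto_commit(events, crash_after, commit_every=2):
    """At-least-once: output is written immediately, offsets committed periodically."""
    sink, committed = [], 0
    for i, ev in enumerate(events[:crash_after]):
        sink.append(ev)                      # write downstream right away
        if (i + 1) % commit_every == 0:
            committed = i + 1                # periodic offset commit
    # Crash: restart from the last committed offset and reprocess.
    for ev in events[committed:]:
        sink.append(ev)
    return sink

def run_checkpointed(events, crash_after, checkpoint_every=2):
    """Offsets and sink output become visible together at each checkpoint."""
    sink, checkpoint_offset, txn = [], 0, []
    for i, ev in enumerate(events[:crash_after]):
        txn.append(ev)                       # buffered in an open transaction
        if (i + 1) % checkpoint_every == 0:
            sink.extend(txn)                 # commit the transaction...
            txn = []
            checkpoint_offset = i + 1        # ...together with the offset
    # Crash: the open transaction is discarded, the source rewinds.
    txn = []
    for ev in events[checkpoint_offset:]:
        txn.append(ev)
    sink.extend(txn)                         # final commit at end of input
    return sink

events = [100, 101, 102, 103]
dup = run_auto_commit(events, crash_after=3)
exact = run_checkpointed(events, crash_after=3)
```

In the first run event 102 reaches the sink twice, matching the walkthrough; in the second, the uncommitted write of 102 is thrown away with the open transaction and replayed once.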

20.2 Out-of-order events and watermark stalls

Domain illustration: IoT alerting from a sensor fleet. Cloudflare's DDoS detection ingests telemetry from edge servers; Tesla's fleet sends sensor data from vehicles. Connectivity blips and regional buffering mean events arrive out of order, with skew of seconds to minutes. Computing "edge servers reporting >X failed requests per minute" on processing time would falsely alert on the windows that received a backlog burst.

Naïve fix: window by wall-clock at the operator. Close at minute boundaries.

Failure walkthrough: a Tesla region has a 30-second cellular blip. 100k events arrive 35s after they were generated and bucket into the wrong minute window. The "vehicles reporting fault X" graph shows a phantom spike about 35s after the faults actually occurred, and the true count is split between two minute windows. A safety-monitoring model trained on minute-aligned features sees garbage.

Real fix: event time + watermarks.

  • Source assigns timestamps from the event payload, not wall clock.
  • Watermark strategy = forBoundedOutOfOrderness(60s) — tolerate 60s late.
  • Window assignment by event-time.
  • allowedLateness(5 min) keeps window state open for an extra 5 minutes after watermark passes, so late events update the result and re-emit.
  • Very late events (>5 min late) go to a side output for offline reconciliation.
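The bullets above can be sketched as a small event-time counter. It is a simplification, not Flink's implementation: it tracks window counts and the side output only, does not model when results are emitted, and the class and attribute names are illustrative.

```python
from collections import defaultdict

class EventTimeCounter:
    """Tumbling event-time windows with bounded out-of-orderness and allowed lateness."""

    def __init__(self, window_s=60, out_of_orderness_s=60, allowed_lateness_s=300):
        self.window_s = window_s
        self.out_of_orderness_s = out_of_orderness_s
        self.allowed_lateness_s = allowed_lateness_s
        self.max_ts = None
        self.counts = defaultdict(int)   # window start -> count
        self.too_late = []               # side output for offline reconciliation

    @property
    def watermark(self):
        if self.max_ts is None:
            return float("-inf")
        return self.max_ts - self.out_of_orderness_s

    def on_event(self, event_ts):
        # The window is chosen from the event's own timestamp, not arrival time.
        window_start = event_ts - (event_ts % self.window_s)
        window_end = window_start + self.window_s
        # Window state is dropped once the watermark passes end + allowed lateness.
        if self.watermark >= window_end + self.allowed_lateness_s:
            self.too_late.append(event_ts)
        else:
            self.counts[window_start] += 1
        self.max_ts = event_ts if self.max_ts is None else max(self.max_ts, event_ts)

counter = EventTimeCounter()
for ts in [10, 70, 35, 500, 20]:   # 35 arrives out of order; 20 arrives very late
    counter.on_event(ts)
```

The out-of-order event at t=35 still lands in the window starting at 0, while the event at t=20 arrives after the watermark has passed that window's end plus the 5-minute lateness and goes to the side output.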

Related sub-problem: idle partitions stall the watermark. A source has 256 partitions; one partition has zero traffic (a Tesla region with no vehicles online overnight). That partition's watermark never advances. The downstream watermark = min over inputs = stuck. Event-time windows never close. From the operator's perspective, the job goes silent.

Fix: WatermarkStrategy.withIdleness(Duration.ofSeconds(30)). If a partition emits no data for 30s, it is marked idle and excluded from the min computation.
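The effect of idleness on the min computation can be shown directly. A minimal sketch: the function name and the dictionary inputs are assumptions for illustration, and wall-clock times are plain floats.

```python
def combined_watermark(partition_watermarks, last_event_wallclock, now, idle_timeout_s=30):
    """Min watermark over partitions that produced data within idle_timeout_s."""
    active = [
        wm
        for partition, wm in partition_watermarks.items()
        if now - last_event_wallclock[partition] <= idle_timeout_s
    ]
    # If every partition is idle, there is nothing to advance on.
    return min(active) if active else None

watermarks = {"p0": 1_000, "p1": 995, "p2": 120}      # p2 stopped long ago
last_seen = {"p0": 99.0, "p1": 98.5, "p2": 10.0}      # wall-clock seconds
stalled = min(watermarks.values())                    # naive min: stuck at p2
advancing = combined_watermark(watermarks, last_seen, now=100.0)
```

Without idleness handling the combined watermark stays pinned at the silent partition's value; excluding it lets the watermark follow the slowest active partition.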

20.3 State size growth (the silent killer)

Domain illustration: sessionization for product analytics. A session window groups events into user sessions, closing after 30 minutes idle. Snowplow and similar pipelines run this pattern across millions of users.

Naïve fix: let state grow. RocksDB handles big data, right?

Failure walkthrough: a botnet attacker creates 10M new user IDs in an hour, generates one event each, then goes silent. Each of those 10M sessions stays open for the full 30-minute gap, holding its buffered event plus ~50 features per user. State grows by 100 GB/hour. RocksDB compaction can't keep up. Checkpoints start taking 5 minutes (longer than the 60s interval). Backpressure → Kafka lag → SLO violation. Eventually local SSD fills, RocksDB throws, TaskManager crashes, recovery takes hours.

Real fix (layered):

  1. State TTL (Time-To-Live): every state has StateTtlConfig (e.g., 1h after last access, cleanup in background). This is the floor.
  2. Bounded session windows: cap session duration at 24h max. Anything longer is split.
  3. State backend tuning: leveled compaction with aggressive level multipliers; provision SSDs at 3× expected state size for compaction headroom.
  4. Proactive monitoring: alert on state-size metric at 70% disk usage. Don't wait for compaction to fail.
  5. Bot detection upstream: filter pathological keyspaces at the source.
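Layer 1 (state TTL) is the easiest to illustrate. The sketch below mimics "expire 1h after last access, clean up in the background" with an in-memory dictionary; it is not Flink's StateTtlConfig, and the class name and explicit `now` parameter are assumptions made to keep the example deterministic.

```python
class TtlState:
    """Keyed state where an entry expires ttl_s after its last access."""

    def __init__(self, ttl_s):
        self.ttl_s = ttl_s
        self._data = {}  # key -> (value, last_access_time)

    def put(self, key, value, now):
        self._data[key] = (value, now)

    def get(self, key, now):
        entry = self._data.get(key)
        if entry is None or now - entry[1] >= self.ttl_s:
            self._data.pop(key, None)      # lazily drop expired entries on read
            return None
        self._data[key] = (entry[0], now)  # reading refreshes the TTL
        return entry[0]

    def cleanup(self, now):
        """Background sweep; returns how many entries were evicted."""
        expired = [k for k, (_, t) in self._data.items() if now - t >= self.ttl_s]
        for k in expired:
            del self._data[k]
        return len(expired)

    def __len__(self):
        return len(self._data)

state = TtlState(ttl_s=3600)
for bot_id in range(1000):                 # burst of one-shot keys
    state.put(f"bot-{bot_id}", {"events": 1}, now=0)
state.put("real-user", {"events": 1}, now=0)
state.get("real-user", now=3000)           # activity keeps this key alive
evicted = state.cleanup(now=3700)
```

The one-shot keys disappear after an hour of silence while the active key survives, which is what bounds state growth in the botnet scenario.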

20.4 Slow checkpoints

Domain illustration: real-time ML feature pipeline at Uber-style scale. A Flink job maintains 150 features across 800M entities. One feature key gets 5% of total traffic (a benchmarking account, or a corporate fleet card, or a celebrity LinkedIn profile). The slot owning that key holds 50× more state than peers. Its checkpoint upload to S3 takes 90 seconds.

Failure walkthrough: checkpoint interval is 60s. JobManager schedules checkpoint N+1 before N completes. Pending checkpoints stack up. Memory in the JobManager grows. Eventually the system goes into degraded mode and stops checkpointing — meaning recovery from a crash would replay hours of events.

Real fix:

  • Incremental checkpoints (RocksDB): ship only changed SSTs since last checkpoint. Reduces upload from full state to delta.
  • Unaligned checkpoints: barrier propagation does not stall on slow input channels.
  • Local recovery: keep a copy of the last checkpoint on local SSD so TaskManager restart doesn't re-download from S3.
  • Address hot-key skew at the keying layer — see §23 Type 2 scaling.
  • Alert on lastCompletedCheckpointDuration / checkpointInterval > 0.7 as a leading indicator.
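Two of these fixes reduce to simple bookkeeping. The sketch below shows the idea behind incremental checkpoints (upload only SST files the previous checkpoint did not already contain, relying on SST files being immutable) and the duration-to-interval alert. File names, sizes, and function names are hypothetical, and compaction is ignored.

```python
def plan_incremental_upload(previous_files, current_files):
    """Both arguments map SST file name -> size in bytes."""
    to_upload = {n: s for n, s in current_files.items() if n not in previous_files}
    reused = {n: s for n, s in current_files.items() if n in previous_files}
    return {
        "upload_bytes": sum(to_upload.values()),
        "reused_bytes": sum(reused.values()),
        "upload_files": sorted(to_upload),
    }

def checkpoint_at_risk(last_duration_s, interval_s, threshold=0.7):
    """Leading indicator: duration is approaching the checkpoint interval."""
    return last_duration_s / interval_s > threshold

MB = 1024 * 1024
ckpt_n = {"000012.sst": 64 * MB, "000013.sst": 64 * MB, "000014.sst": 64 * MB}
# One new file was flushed since checkpoint N.
ckpt_n_plus_1 = {**ckpt_n, "000015.sst": 32 * MB}
plan = plan_incremental_upload(ckpt_n, ckpt_n_plus_1)
```

Here checkpoint N+1 uploads 32 MB instead of 224 MB; the other three files are referenced from the previous checkpoint.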

20.5 Job recovery time

Domain illustration: real-time dashboards (e.g., Pinterest real-time recommendations). A TaskManager dies at 3am. Latest checkpoint is in S3, 60s ago.

Failure walkthrough: Cold restart kicks in. JobManager downloads checkpoint metadata. All 60 TaskManagers download their RocksDB SSTs from S3. At 7 GB per TM across 60 TMs that is 420 GB; at a sustained 200 MB/s per prefix, the download takes ~3.5 minutes only if the sharded paths deliver about 2 GB/s in aggregate (for example, 10 prefixes). Plus state restoration into RocksDB + rebuilding indexes: another 2 minutes. Then source rewind + replay 60s of events: another minute. Total: ~7 min before steady state. During that time, the real-time pin recommendation feature is unavailable.
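The recovery-time arithmetic is worth making explicit, because the download term depends on aggregate bandwidth rather than the per-prefix figure. In this rough estimate the 10 sharded prefixes are an assumption chosen so that the numbers reproduce the ~3.5-minute download; the function name is illustrative.

```python
def restore_minutes(gb_per_tm, num_tms, aggregate_mb_per_s, rebuild_min, replay_min):
    """Rough cold-restart estimate: download + state rebuild + replay, in minutes."""
    total_mb = gb_per_tm * num_tms * 1000            # decimal GB -> MB
    download_min = total_mb / aggregate_mb_per_s / 60
    return download_min, download_min + rebuild_min + replay_min

# Assumption: 10 sharded prefixes at 200 MB/s each, about 2,000 MB/s aggregate.
download, total = restore_minutes(
    gb_per_tm=7, num_tms=60, aggregate_mb_per_s=10 * 200, rebuild_min=2, replay_min=1
)
single_prefix_download, _ = restore_minutes(7, 60, 200, 2, 1)
```

With a single 200 MB/s prefix the same 420 GB would take 35 minutes to download, which is why sharding checkpoint paths matters for recovery time.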

Real fix:

  • Local recovery: each TM keeps the last checkpoint's RocksDB files locally. On TM restart on the same machine, recovery skips S3 download. ~30s recovery.
  • Region-based restart strategy: instead of restarting the whole job on a single TM failure, only restart the affected pipeline region.
  • Standby TaskManagers: provision 10% standby capacity that can take over immediately.
  • Adaptive Scheduler: reconfigure parallelism dynamically rather than fixed-DAG restarts.

20.6 Hot keys (state skew, not throughput skew)

Domain illustration: real-time leaderboards in a gaming platform. Compute "top 100 scores per game in the last hour." One game (a viral release) gets 1000× the traffic of others. The slot owning that game's key is pegged at 100% CPU and 50× more state than peers.

Naïve fix: trust the hash function.

Failure walkthrough: hash-partitioning by game_id sends all traffic for game G to one slot. That slot becomes a bottleneck. Backpressure propagates upstream. Total throughput is capped at roughly per-slot capacity ÷ (hot game's share of traffic), no matter how many slots the job has.

Real fix: two-stage aggregation with key salting. First stage: split the hot key into N salt buckets (game_G#0, game_G#1, ..., game_G#N-1); each handles 1/N of the traffic. Second stage: re-key without salt and aggregate the N partial results.

Stage 1: hot_key#salt_0, hot_key#salt_1, ..., hot_key#salt_N-1 → partial counters
Stage 2: re-key by base hot_key (no salt) → final aggregator

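For mergeable aggregates (counts, sums, and approximate-quantile sketches), this cuts the hot key's per-slot load at stage 1 by a factor of N, and stage 2 receives only N partial results per emission instead of every raw event.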
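A minimal sketch of the two stages for a count, assuming a random salt per event and salting every key (a production job would typically salt only known hot keys). Function names are illustrative, and in-memory counters stand in for keyed operator state.

```python
import random
from collections import Counter

def stage1_key(key, num_salts, rng):
    """Spread one logical key across num_salts sub-keys."""
    return f"{key}#{rng.randrange(num_salts)}"

def two_stage_count(events, num_salts, seed=0):
    rng = random.Random(seed)
    partial = Counter()                       # stage 1: keyed by salted key
    for key in events:
        partial[stage1_key(key, num_salts, rng)] += 1
    final = Counter()                         # stage 2: strip salt, merge partials
    for salted, count in partial.items():
        base, _, _ = salted.rpartition("#")
        final[base] += count
    return partial, final

events = ["game_G"] * 10_000 + ["game_A"] * 10 + ["game_B"] * 5
partial, final = two_stage_count(events, num_salts=8)
hot_buckets = {k: v for k, v in partial.items() if k.startswith("game_G#")}
```

Each of the eight salted buckets carries roughly an eighth of the hot key's traffic, and the merged totals match an unsalted count exactly because counting is associative.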


§21. Failure modes — durability points

21.1 TaskManager crash

T=0     TM-17 dies (kernel panic).
T=0+1s  K8s liveness probe fails.
T=0+5s  JobManager loses heartbeat from TM-17.
T=0+10s JobManager declares TM-17 failed. Triggers restart of affected region.
T=0+15s K8s schedules replacement TM-17 on a new node.
T=0+30s New TM boots. JM assigns it the tasks TM-17 had.
T=0+45s Tasks download checkpoint SSTs from S3 (or local SSD if local
        recovery enabled).
T=0+90s Tasks restored. Source consumer rewinds to checkpoint offsets.
T=0+120s Replay through to current head. Steady state.

DURABILITY POINT: the last completed checkpoint in S3. Everything
processed and acked before that checkpoint is durable; everything
after is replayed from Kafka.

INVARIANT PRESERVED: 2PC sink either replays the open transaction
(commits) or aborts (rewinds and re-emits). Either way, exactly-once.

21.2 JobManager (coordinator) crash

T=0     JM-leader dies.
T=0+5s  Standby JMs (HA mode via ZooKeeper or k8s lease) detect failure.
T=0+10s New JM-leader elected.
T=0+15s New JM reads job state from durable store (ZK + S3):
          • job graph
          • last completed checkpoint metadata pointer
          • TaskManager registry
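T=0+20s New JM does NOT relaunch TM processes (they're still running);
        TMs re-register with the new leader.
T=0+25s New JM resumes checkpoint coordination. Next barrier scheduled.

Job continues. In Flink, tasks are redeployed from the last completed
checkpoint after a JM failover rather than running straight through the
gap. No data loss either way, because checkpoints are persisted in S3,
not in JM memory.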

DURABILITY POINT: ZooKeeper / k8s ConfigMap holding pointer to "latest
completed checkpoint in S3."

21.3 Sink unavailable mid-checkpoint (subtle)

The downstream Kafka cluster goes unreachable while a 2PC sink has pre-committed.

T=0      Barrier_N arrives at sink. Sink pre-commits Kafka txn 7 (flushes
         the records written since the previous barrier), acks barrier to JM.
T=0+5s   JM collects all acks. Writes ckpt-N metadata to S3.
T=0+5s   JM sends notifyCheckpointComplete(N) to sink.
T=0+6s   Sink calls producer.commitTransaction()
         ── Kafka cluster unreachable. Times out after 60s.
T=1m+6s  Sink throws. TaskManager fails.
T=1m+11s JM restarts the task.
T=1m+45s New task instance recovers from ckpt-N.
         Sink's TwoPhaseCommitSinkFunction sees state has a record of
         "pending transaction 7 needs commit."
         Tries commitTransaction() → still fails (Kafka still down).
         Retries with exponential backoff.
T=5m     Kafka recovers.
T=5m+30s Sink commits txn 7 successfully.
         Downstream consumers see the records, exactly once.

DURABILITY POINT: pending-transaction state persisted in checkpoint.
Stable transactional.id across restarts is the trick that lets the new
producer attach to the same Kafka coordinator state.
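The recovery path in this timeline can be sketched with a fake broker. Nothing here is the Kafka client or Flink's TwoPhaseCommitSinkFunction: FakeBroker and TwoPhaseSink are invented stand-ins that only model the prepared/committed distinction, the pending transaction id stored in the checkpoint, and an idempotent commit.

```python
class FakeBroker:
    """Stand-in for a transactional log; not a real Kafka client."""

    def __init__(self):
        self.available = True
        self.prepared = {}     # txn_id -> records flushed but not yet visible
        self.committed = []    # records visible to consumers
        self.done = set()      # txn ids already committed

    def prepare(self, txn_id, records):
        self.prepared[txn_id] = list(records)

    def commit(self, txn_id):
        if not self.available:
            raise ConnectionError("broker unreachable")
        if txn_id in self.done:            # commit is idempotent per txn id
            return
        self.committed.extend(self.prepared.pop(txn_id))
        self.done.add(txn_id)

class TwoPhaseSink:
    def __init__(self, broker, restored_pending=None):
        self.broker = broker
        self.pending = restored_pending    # txn id recorded in the checkpoint

    def pre_commit(self, txn_id, records):
        """Phase 1 at the barrier: flush, then snapshot the pending txn id."""
        self.broker.prepare(txn_id, records)
        self.pending = txn_id
        return {"pending_txn": txn_id}     # goes into the checkpoint

    def try_commit(self):
        """Phase 2 after checkpoint completion; safe to retry."""
        if self.pending is None:
            return True
        try:
            self.broker.commit(self.pending)
        except ConnectionError:
            return False
        self.pending = None
        return True

broker = FakeBroker()
sink = TwoPhaseSink(broker)
checkpoint = sink.pre_commit(7, ["a", "b"])
broker.available = False
first_try = sink.try_commit()              # fails; the task would restart
restored = TwoPhaseSink(broker, restored_pending=checkpoint["pending_txn"])
broker.available = True
second_try = restored.try_commit()
third_try = TwoPhaseSink(broker, restored_pending=7).try_commit()  # replayed commit
```

Because the pending id lives in the checkpoint and the commit is idempotent, the restarted sink can finish transaction 7 after the outage without writing its records twice.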

21.4 Network partition (split-brain)

Scenario: rack A holds JM-leader + 30 TMs; rack B holds 30 TMs +
JM-standbys. Network partitions A from B.

In A:  JM-leader sees 30 TMs reachable, 30 unreachable.
       Declares 30 TMs failed. Tries to restart on the 30 surviving TMs.
       But parallelism config requires 60 TMs. Job goes into "waiting
       for resources" state.

In B:  JM-standbys cannot reach the lease in ZK (assuming ZK has quorum
       on the A side). They cannot elect a new leader. 30 TMs in B are
       running, but cannot ack barriers. After checkpoint timeout
       (10 min default), tasks abort their checkpoints and stall.

DURABILITY POINT: ZooKeeper / k8s lease serves as the split-brain
breaker. Only the side with quorum can elect a leader. The minority
side never has an active JM, so it cannot make forward progress that
conflicts with the majority side's checkpoint chain.

INVARIANT: at most one JM can be making decisions. ZK / lease ensures
this. On partition heal, B-side TMs reconnect to JM in A; any work
they did since the partition that wasn't checkpointed is rolled back.
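The split-brain breaker comes down to a strict-majority rule. A minimal sketch, assuming a hypothetical 3-node coordination ensemble with two voters on rack A and one on rack B:

```python
def can_elect_leader(reachable_voters, total_voters):
    """A side may hold the leader lease only with a strict majority of voters."""
    return reachable_voters > total_voters // 2

# Hypothetical 3-node ensemble: 2 voters on rack A, 1 on rack B.
side_a = can_elect_leader(reachable_voters=2, total_voters=3)
side_b = can_elect_leader(reachable_voters=1, total_voters=3)
even_split = can_elect_leader(reachable_voters=2, total_voters=4)
```

An even split gives neither side a majority, which is one reason coordination ensembles are usually sized with an odd number of voters.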

21.5 Permanent loss (AZ failure)

A whole Availability Zone vanishes. 20 of 60 TMs gone. JM may or may not be on that AZ.

- If JM-leader was in lost AZ: standby elected (assuming cross-AZ
  ZK quorum). Otherwise JM-leader survives.
- 20 TMs need replacement: K8s scheduler provisions in surviving AZs.
  Capacity headroom (e.g., 50% over-provisioning) absorbs the loss.
- New TMs restore from S3 checkpoints (cross-AZ replicated).
- Source offsets in checkpoint determine replay window. Kafka retention
  (e.g., 72h) absorbs any AZ outage shorter than 3 days.

DURABILITY POINT: S3 (cross-AZ replicated) holds checkpoints.
Kafka cluster spans 3 AZs with replication=3 and min.insync.replicas=2,
so it survives one AZ loss without data loss.

INVARIANT: cross-AZ durability of BOTH checkpoints and source log.
Single-AZ for either is a single point of catastrophic loss.
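Whether the source log stays writable after an AZ loss follows from replica placement and min.insync.replicas. The check below is a sketch for a single partition; the AZ names and function name are illustrative, and it assumes producers use acks=all.

```python
def writable_after_az_loss(replicas_per_az, min_insync_replicas):
    """True if acks=all writes still succeed after losing any single AZ."""
    total = sum(replicas_per_az.values())
    worst_case_remaining = total - max(replicas_per_az.values())
    return worst_case_remaining >= min_insync_replicas

# replication=3 spread over three AZs, as in the durability point above.
spread = writable_after_az_loss({"az-a": 1, "az-b": 1, "az-c": 1}, min_insync_replicas=2)
# Same replication factor, but two replicas share one AZ.
stacked = writable_after_az_loss({"az-a": 2, "az-b": 1}, min_insync_replicas=2)
```

The replication factor alone is not enough: with two replicas stacked in one AZ, losing that AZ drops the partition below min.insync.replicas.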

21.6 Checkpoint storage S3 outage (job pauses)

T=0      S3 region experiences an outage (or specific bucket throttles
         heavily for a customer that has exceeded request rate).
T=0+60s  Next scheduled checkpoint.
         TaskManagers begin uploading SST files; PUT requests fail with
         S3 5xx or throttling.
T=0+120s Uploads are still retrying. The checkpoint stays pending until
         checkpoint-timeout expires (default 10 min).
T=0+11m  Checkpoint times out. JobManager marks the checkpoint as failed.
         If max-concurrent-checkpoints = 1 (default), the next checkpoint
         starts. If S3 is still down, it also fails.
T=0+30m  Multiple failed checkpoints. What happens next depends on
         tolerable-checkpoint-failure-number: at 0 the job FAILS once a
         failure is counted against it; with a higher value the job
         continues processing without completing new checkpoints.
T=1h     If the job continues processing, the replay window (events
         processed since last successful checkpoint) grows. RPO
         (Recovery Point Objective) is now 1 hour.

DURABILITY POINT: the LAST SUCCESSFUL checkpoint before the S3 outage.
If S3 recovers and the job is still alive, the next checkpoint succeeds.
If the job crashed during the outage, recovery from the old checkpoint
replays an hour of events.

INVARIANT: source log must retain enough history to cover the worst-case
replay window. If Kafka retention is 24h and S3 is down for 25h, replay
loses data.

MITIGATIONS:
- Multi-region S3 (cross-region replication for checkpoints).
- Higher tolerable-checkpoint-failure-number (e.g., 3) to ride through
  brief outages without job failure.
- Alert on consecutive failed checkpoints, not just job failure.
- Monitor S3 PUT error rate per checkpoint path.
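Two of the mitigations, alerting on consecutive failures and watching the replay window against log retention, can be sketched as plain checks. The function names and the 50% safety factor are assumptions for illustration, not engine settings.

```python
def consecutive_failures(results):
    """Count trailing failures in a list of booleans (True = checkpoint succeeded)."""
    count = 0
    for ok in reversed(results):
        if ok:
            break
        count += 1
    return count

def replay_is_covered(seconds_since_last_success, log_retention_s, safety_factor=0.5):
    """Replay stays safe while the gap uses less than safety_factor of retention."""
    return seconds_since_last_success < log_retention_s * safety_factor

history = [True, True, False, False, False]
should_alert = consecutive_failures(history) >= 3
one_hour_gap_ok = replay_is_covered(3600, log_retention_s=24 * 3600)
long_gap_ok = replay_is_covered(25 * 3600, log_retention_s=24 * 3600)
```

Alerting on the trailing failure count surfaces the outage while the job is still running, well before the replay window approaches the retention limit.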

21.7 Backpressure causing checkpoint timeouts

Scenario: downstream sink is slow, backpressure propagates upstream,
aligned checkpoints stall waiting for barriers on slow channels.

T=0      Sink rate drops from 1M/sec to 100k/sec (10x slowdown).
T=0+30s  Backpressure builds. Source consumes from Kafka at 100k/sec.
         Kafka lag grows.
T=0+60s  Next aligned checkpoint scheduled.
         Barrier injected at all sources. Propagates downstream at
         the rate of slow channels.
T=0+5m   Checkpoint is still in flight, well short of checkpoint-timeout
         (default 10 min). Some operators have received the barrier on
         all input channels; others are still waiting.
T=0+11m  Checkpoint times out, 10 min after it started. JobManager aborts.
         All in-flight 2PC pre-commits are aborted. Job continues
         processing but no checkpoint progress.
T=0+12m  Next checkpoint scheduled, same outcome.

DURABILITY POINT: last successful checkpoint (from before backpressure).
Recovery would replay all events processed since then.

DIAGNOSIS:
- Check `lastCompletedCheckpointDuration` metric — rising over time.
- Check per-operator backpressure indicator — find the HIGH operator.
- Check `checkpointAlignmentTime` — high values point to alignment as
  the bottleneck (vs actual snapshot write time).

MITIGATIONS:
- Switch to unaligned checkpoints — barrier propagation does not wait
  for input alignment.
- Fix the slow sink.
- Reduce checkpoint frequency until backpressure resolves.
- Bound checkpoint timeout to fail fast and surface the issue.

21.8 The "skewed key kills one TaskManager" diagnosis

The textbook hot-key problem manifests as ONE TaskManager (or a small subset) showing high CPU, high state size, slow checkpoints, while the rest of the fleet is idle.

Symptoms:
- Per-TM CPU graph: 5% of TMs at 100%, 95% at <20%.
- Per-TM state-size graph: same skew.
- Checkpoint duration dominated by a small number of slow TMs.
- Backpressure indicator HIGH on operators feeding the hot TMs.
- Kafka lag grows on partitions whose keys hash to hot TMs.

Diagnosis playbook:
1. Identify the hot TaskManager(s) via per-TM metrics.
2. Identify the hot operator(s) on those TMs.
3. Dump per-key state size / per-key throughput for the hot operator.
   Flink's State Processor API can read state offline; instrument with
   a custom metric: numRecordsForKey histogram.
4. The top-K keys by traffic are the culprits.
5. Cross-reference with business logic: is one user a fleet account?
   A bot? A celebrity profile? An internal test key that leaked into
   production?

Resolution:
- Filter or special-case the runaway key upstream (e.g., drop events
  with a known bot pattern at the source).
- Apply two-stage aggregation (key salting) as in §20.6.
- Provision the affected partition with a dedicated TaskManager
  (custom partition assignment).
- If the key is "legitimate but extreme" (e.g., the platform's biggest
  customer), give it its own dedicated job with custom tuning.

The hot-key failure mode is the most common cause of streaming job
incidents at scale. Bake hot-key detection into the standard alert set.
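Steps 3 and 4 of the playbook amount to ranking keys by traffic share. A minimal sketch, assuming per-key counts have already been collected (for example from a custom metric); the function name, the 5% share threshold, and the sample keys are illustrative.

```python
from collections import Counter

def hot_keys(key_counts, top_k=3, share_threshold=0.05):
    """Return (key, share) for top-k keys whose traffic share exceeds the threshold."""
    total = sum(key_counts.values())
    if total == 0:
        return []
    return [
        (key, count / total)
        for key, count in Counter(key_counts).most_common(top_k)
        if count / total > share_threshold
    ]

counts = {"fleet-card-1": 900, "user-a": 40, "user-b": 35, "user-c": 25}
culprits = hot_keys(counts)
```

A share-based threshold flags the fleet card at 90% of traffic while leaving ordinary keys alone, regardless of absolute volume.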

§22. Why not cron + batch SQL every minute

The naïve replacement for stream processing is "run a batch query every minute against the OLTP database (or analytical store) over the last N minutes." Five lines of SQL, no Flink, no cluster, no on-call rotation. Why doesn't it work?

22.1 The OLTP database melts (or the OLAP scan is too expensive)

At 1M events/sec write traffic, your operational database is busy with inserts. Now you add a recurring full-scan-grouped-by-non-partition-key query every minute. Whether the data lives in MySQL, Postgres, Cassandra, or even Snowflake, the secondary access pattern fights with the primary one. The DBA pages you within an hour. You move it to a read replica → replica lag spikes → the query reads stale data anyway.

22.2 The "1-minute latency" is a lie

T = 12:03:00 — cron starts
T = 12:03:45 — query returns
T = 12:03:50 — your code processes results
T = 12:03:55 — alerts written

Events at T=12:03:15 are detected at T=12:03:55. 40s latency.
Events at T=12:03:59 are detected at T=12:04:55. 56s latency.

If you promised sub-second freshness, you're missing by two orders of magnitude.
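The two latencies in the walkthrough follow from a simple model: a run started at time t sees events that landed before the query returns (t + 45s) and writes alerts at t + 55s. The sketch below encodes that model; the function and parameter names are assumptions, and times are seconds past 12:03:00.

```python
def detection_latency_s(event_s, period_s=60, visible_until_s=45, alert_at_s=55):
    """Seconds from event to alert for a cron run that starts every period_s.

    A run started at t sees events that landed before t + visible_until_s
    and writes its alerts at t + alert_at_s.
    """
    run_start = (event_s // period_s) * period_s
    if event_s - run_start >= visible_until_s:
        run_start += period_s          # missed this run's scan; wait for the next
    return run_start + alert_at_s - event_s

early = detection_latency_s(15)        # event at 12:03:15
late = detection_latency_s(59)         # event at 12:03:59
worst = max(detection_latency_s(s) for s in range(60))
```

Under this model the worst case is an event that lands just as the query returns: it waits for the whole next run, about 70 seconds.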

22.3 Result correctness drifts under processing-time windows

The cron query window is processing-time. A 30s ingestion delay (normal) means events at event_time = 12:03:15 might land at 12:03:45. Your cron at 12:03:00 misses them. Your cron at 12:04:00 sees them. Now they're double-counted across successive runs if windows overlap, or missed entirely if windows don't overlap. The same data is wrong in two different ways depending on window scheme.

22.4 No exactly-once recovery

The cron crashes mid-run. Restart picks up at minute boundary, missing the previous minute entirely. You build a fraud blind spot. Add idempotency via run IDs and a state table? Congratulations, you've started reimplementing Flink, badly, without checkpoint coordination.

22.5 Doesn't compose to many features

What about 5 features × 10 windows? Now 50 expensive aggregation queries per minute. The DB is dead before lunch. Stream processors add a feature as one more operator in an existing DAG — incremental cost is one more state entry per event.

Why streaming wins

Streaming inverts the data flow. The DB is never queried for analytics. Reads happen against pre-aggregated state inside the stream processor. The aggregation is incremental: each event updates one counter. O(1) per event, not O(N) per cron run. Window logic is event-time, watermark-bounded, no overlap-double-count. Failure recovery is bounded by checkpoint interval.

The math: cron + batch SQL has O(N) cost per refresh and O(refresh-interval) latency. Stream processing has O(1) cost per event and O(event-processing-latency) result delay. At 100k+ events/sec, only the second one fits in a datacenter.
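The cost asymmetry can be made concrete by counting work. In this sketch the batch path rescans all stored events on every refresh while the streaming path does one update per event; refreshing after every single event exaggerates the batch cost on purpose, and the names are illustrative.

```python
from collections import defaultdict

def batch_counts(all_events):
    """Cron-style refresh: rescan every stored event, O(N) per run."""
    counts = defaultdict(int)
    for key in all_events:
        counts[key] += 1
    return dict(counts)

class IncrementalCounts:
    """Streaming-style update: one dictionary write per event, O(1) per event."""

    def __init__(self):
        self.counts = defaultdict(int)
        self.updates = 0

    def on_event(self, key):
        self.counts[key] += 1
        self.updates += 1

stored, streaming, rows_scanned = [], IncrementalCounts(), 0
for key in ["card-1", "card-2", "card-1", "card-3", "card-1"]:
    stored.append(key)
    streaming.on_event(key)
    rows_scanned += len(stored)        # cost of a refresh after this arrival
    # Both paths always agree on the answer; only the work differs.
    assert batch_counts(stored) == dict(streaming.counts)
```

After five events the batch path has scanned 15 rows in total and the streaming path has performed 5 updates; the gap widens quadratically as the history grows.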


§23. Scaling axes

Type 1: uniform growth (more events, more keys)

Same workload, scaled by a factor. Add more producers, more partitions, more parallelism.

  • 1× (100k events/sec, 10M keys): 16 TaskManagers, 64 Kafka partitions. Trivial.
  • 2× (200k events/sec, 20M keys): scale parallelism to 128, repartition Kafka to 128. Linear scaling.
  • 10× (1M events/sec, 100M keys): 256 partitions, 256 parallelism. Add incremental checkpoints. State spills to NVMe SSD. Standard.
  • 100× (10M events/sec, 1B keys):
      • Bump to 1024 partitions. Each TM owns 1–2 partitions.
      • Higher-IOPS storage (EBS gp3 or io2 volumes, or local NVMe on i4i instances).
      • Shard S3 checkpoint prefixes per operator-instance to escape prefix throttling.
      • JobManager becomes the bottleneck. Decompose into multiple jobs by feature group. "Card features," "Merchant features," "User features" as separate jobs sharing the same Kafka source.

Inflection point: at ~5M events/sec, single-job Flink hits JobManager and S3 coordination ceilings. Above that, split by domain.

Type 2: hotspot intensification (state skew)

Same keys, but one key gets disproportionate traffic. Hash-partitioning sends all events for that key to one slot. That slot is pegged at 100% CPU and holds 50× more state than peers. Backpressure propagates. Total throughput collapses.

This needs a different fix than Type 1. Adding more TMs does not help; the hot key still maps to one of them.

Fixes in increasing order of complexity:

  1. Pre-aggregate at the source. Combine multiple events for the same key before they cross the network. Reduces shuffle traffic for hot keys by 10–100×.
  2. Two-stage aggregation (salting). Split the hot key into N salt buckets at stage 1, re-key without salt at stage 2.
  3. Hot-key detection + dedicated slot. Profile the keyspace, identify the top-K hot keys, route them to a dedicated TM with extra resources. Custom partitioner.
  4. Approximate algorithms. If exact counts on the hot key are not needed, use HyperLogLog for distinct-count, Count-Min Sketch for frequency. Per-slot state grows by O(precision), not O(unique values).

Inflection point: hot key crosses 50k events/sec/slot. Below that, ignore. Above, salt or pre-aggregate.
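Fix 4 can be illustrated with a small Count-Min Sketch. This is a teaching sketch, not a tuned implementation: the width, depth, and the use of salted BLAKE2b as the row hash are choices made here for brevity.

```python
import hashlib

class CountMinSketch:
    """Fixed-size frequency sketch; estimates never undercount."""

    def __init__(self, width=2048, depth=4):
        self.width, self.depth = width, depth
        self.table = [[0] * width for _ in range(depth)]

    def _index(self, row, key):
        # A different salt per row gives depth independent-looking hash functions.
        digest = hashlib.blake2b(
            key.encode(), digest_size=8, salt=row.to_bytes(2, "big")
        ).digest()
        return int.from_bytes(digest, "big") % self.width

    def add(self, key, count=1):
        for row in range(self.depth):
            self.table[row][self._index(row, key)] += count

    def estimate(self, key):
        # Collisions only inflate cells, so the minimum is the tightest bound.
        return min(
            self.table[row][self._index(row, key)] for row in range(self.depth)
        )

sketch = CountMinSketch()
for _ in range(5000):
    sketch.add("hot-key")
for i in range(200):
    sketch.add(f"user-{i}")
hot_estimate = sketch.estimate("hot-key")
```

The table stays at width × depth counters no matter how many distinct keys arrive, which is the O(precision) state bound mentioned above; the price is that estimates can overcount.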


§24. Decision matrix — when stream processing vs alternatives

When should you reach for a stream processor vs adjacent technologies?

Criterion | Stream processor | Batch ETL (Spark, dbt) | OLAP on raw data (ClickHouse, Pinot, Druid) | Complex Event Processing (Esper, Flink CEP)
Freshness | Seconds (or sub-second) | Minutes to hours (next batch) | Seconds (query latency) on indexed data | Same as stream processor (it IS one)
Per-query cost | O(1) per event | O(N) per batch | O(rows-scanned) per query | O(1) per event
Aggregation depth | Incremental, limited window | Full-history complex aggregations easy | Ad-hoc queries; limited stateful pattern matching | Pattern matching (X then Y within window Z)
State maintenance | Yes, durable across crashes | No (output is the state) | Storage is the state; queries are stateless | Yes
Setup cost | High (cluster + state tuning) | Low (just SQL + scheduler) | Medium (specialized cluster) | High (stream processor + CEP library)
Use when | Continuous derived data, sub-minute freshness, large state | Periodic refresh OK, complex joins on history | Interactive analytics on time-series-ish data | Sequential pattern detection (login then large withdrawal within 5 min)
Avoid when | Mostly batch, no freshness need; ad-hoc analyst queries | Need sub-minute freshness; need exactly-once internal state | Need to maintain derived state, not just answer queries | Simple aggregations (overkill)

Specific thresholds:

  • If freshness requirement > 5 minutes, batch ETL is simpler and cheaper.
  • If freshness requirement < 1 second and state per key is non-trivial, only stream processing fits.
  • If queries are ad-hoc against raw event history with no derived state, OLAP wins.
  • If you need "detect sequence A → B → C within 30s," use a CEP library on top of a stream processor.

The boundary case: Materialize and RisingWave blur the line between stream processor and OLAP. They maintain incremental views (stream processing) but expose Postgres-protocol query interfaces (OLAP-like access). For a SQL-shop building real-time dashboards, they replace "Flink + serving store + dashboard."


How the same class of technology powers radically different products:

Fraud detection — Stripe Radar

Demands: sub-100ms decision SLO on every card auth. Tens of features per card, computed continuously. End-to-end exactly-once (regulators audit). State proportional to active cards (~100s of millions). Out-of-order events from edge auth gateways across regions.

Variant fit: Flink-style true streaming with event-time windows, RocksDB state, 2PC Kafka sink. Sub-second latency excludes micro-batch. End-to-end exactly-once excludes "at-least-once + dedupe downstream." Custom feature logic (Java/Scala) excludes pure SQL streaming.

Real-time ML features — Uber Michelangelo, LinkedIn

Demands: hundreds of features across hundreds of millions of entities (riders, drivers, members, restaurants). Updates within seconds. Backfill compatibility (a feature must be re-computable from historical events for model training). Feature parity between online and offline computation.

Variant fit: Flink or Samza for the online path; the same logic compiled to Beam or Spark for the offline backfill (or unified via Beam from the start). State per feature in RocksDB. Output to a feature store (Feast, Tecton, internal). Heavy emphasis on schema evolution because models change frequently.

Real-time analytics — Pinterest, Snapchat, Twitter

Demands: 1M+ events/sec into dashboards showing user engagement, ad performance, content trends. Sub-minute freshness. Many overlapping queries; analysts authoring new ones daily.

Variant fit: Flink SQL for the continuous aggregation; results land in Pinot, Druid, or ClickHouse for ad-hoc OLAP queries. Increasingly, Materialize or RisingWave replace the Flink + OLAP split when the queries are stable enough to express as materialized views.

Sessionization — Snowplow, web analytics platforms

Demands: group events into user sessions, closing after configurable idle gaps. Per-session feature computation (pages visited, conversion path, total time). Tolerate late-arriving events (mobile devices go offline).

Variant fit: Spark Structured Streaming or Flink with session windows. State size is the dominant concern; aggressive TTL and bounded max session duration are mandatory.
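A rough sketch of gap-based session windows with late events and a bounded session duration follows. The `Sessionizer` class, its parameters, and the merge policy (refuse a merge that would exceed the maximum duration) are assumptions chosen for illustration; real engines implement session-window merging internally and differ in detail.

```python
from dataclasses import dataclass


@dataclass
class Session:
    start: float
    end: float
    events: int = 1


class Sessionizer:
    def __init__(self, gap: float, max_duration: float):
        self.gap = gap                    # idle gap that closes a session
        self.max_duration = max_duration  # hard cap that bounds state per session
        self.open: dict[str, list[Session]] = {}

    def add(self, user: str, ts: float) -> None:
        """Insert one event; a late event may bridge two existing sessions."""
        merged = Session(ts, ts)
        kept = []
        for s in self.open.get(user, []):
            touches = (s.start - self.gap <= merged.end
                       and merged.start <= s.end + self.gap)
            span = max(s.end, merged.end) - min(s.start, merged.start)
            if touches and span <= self.max_duration:
                merged = Session(min(s.start, merged.start),
                                 max(s.end, merged.end),
                                 s.events + merged.events)
            else:
                kept.append(s)
        kept.append(merged)
        self.open[user] = sorted(kept, key=lambda s: s.start)

    def close_expired(self, watermark: float) -> list[tuple[str, Session]]:
        """Emit and drop sessions that can no longer be extended (the TTL step)."""
        closed = []
        for user in list(self.open):
            still_open = []
            for s in self.open[user]:
                if s.end + self.gap <= watermark:
                    closed.append((user, s))
                else:
                    still_open.append(s)
            if still_open:
                self.open[user] = still_open
            else:
                del self.open[user]
        return closed


sz = Sessionizer(gap=30, max_duration=1000)
for ts in (0, 20, 100, 45):   # 45 arrives late and extends the first session
    sz.add("u", ts)
sz.add("u", 75)               # late event that bridges both sessions
print(sz.close_expired(watermark=130))
```

State is released only in `close_expired`, which is why the watermark must keep advancing and why the cap on session duration matters: without it, a bot that never goes idle holds one session open indefinitely.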

IoT alerting — Tesla fleet, Cloudflare DDoS detection

Demands: millions of devices each emitting telemetry. Alert when anomalies exceed thresholds. Per-device state (rolling baselines, recent samples). Tolerate clock skew across devices.

Variant fit: Flink with event-time windows plus watermark idleness handling for offline devices. Stateful Functions (StateFun, a Flink subproject) or Beam for the per-device state machine. Output to PagerDuty or another alerting system via idempotent REST writes.
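The idea behind watermark idleness handling can be sketched as follows: the watermark is the minimum event time across sources that are still active, minus an out-of-orderness allowance, and a source that has been silent for too long is excluded so it cannot stall the pipeline. The `WatermarkTracker` class and its parameters are hypothetical and only model the concept; they are not an engine API.

```python
class WatermarkTracker:
    def __init__(self, max_out_of_orderness: float, idle_timeout: float):
        self.max_out_of_orderness = max_out_of_orderness
        self.idle_timeout = idle_timeout
        self.max_event_ts = {}  # source -> highest event time seen
        self.last_seen = {}     # source -> processing time of its last event
        self.current = None     # last emitted watermark

    def observe(self, source: str, event_ts: float, now: float) -> None:
        prev = self.max_event_ts.get(source, event_ts)
        self.max_event_ts[source] = max(prev, event_ts)
        self.last_seen[source] = now

    def watermark(self, now: float):
        # Only sources heard from recently take part in the minimum.
        active = [self.max_event_ts[s]
                  for s, seen in self.last_seen.items()
                  if now - seen < self.idle_timeout]
        if active:
            candidate = min(active) - self.max_out_of_orderness
            # Watermarks never move backwards, even when an idle source
            # returns with old timestamps; its events are then late.
            if self.current is None or candidate > self.current:
                self.current = candidate
        return self.current


wm = WatermarkTracker(max_out_of_orderness=5, idle_timeout=60)
wm.observe("device-a", event_ts=100, now=0)
wm.observe("device-b", event_ts=40, now=0)
print(wm.watermark(now=10))   # held back by device-b
wm.observe("device-a", event_ts=200, now=70)
print(wm.watermark(now=70))   # device-b is idle, so device-a drives progress
```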

Real-time leaderboards — gaming platforms

Demands: top-K scoring per game, per region, per time window. Updates within seconds of a score being submitted. Hot keys (one viral game dominates traffic).

Variant fit: Flink with two-stage aggregation for hot keys; output to Redis for serving the leaderboard reads. Or Materialize maintaining an incremental "top-100 per game" view, queried via Postgres protocol directly.
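Two-stage aggregation for a hot key can be illustrated like this: stage one aggregates per (game, salt) so a viral game is spread over several parallel subtasks, and stage two merges the small partial results per game. The functions are shown over a finite list for clarity; in a streaming job the stage-one dictionary would be keyed operator state. `SALTS`, `salt_for`, `stage_one`, and `stage_two` are illustrative names, and the per-player "best score" semantics is an assumption.

```python
import heapq
import zlib
from collections import defaultdict

SALTS = 4  # hypothetical number of sub-partitions per hot key


def salt_for(player: str) -> int:
    # Stable hash, so a given player always lands in the same sub-partition.
    return zlib.crc32(player.encode()) % SALTS


def stage_one(events, k):
    """Partial aggregate per (game, salt): best score per player, then local top-k."""
    best = defaultdict(dict)
    for game, player, score in events:
        bucket = best[(game, salt_for(player))]
        bucket[player] = max(score, bucket.get(player, score))
    return {key: heapq.nlargest(k, ((s, p) for p, s in players.items()))
            for key, players in best.items()}


def stage_two(partials, k):
    """Final aggregate per game: merge at most SALTS * k candidates."""
    merged = defaultdict(list)
    for (game, _salt), top in partials.items():
        merged[game].extend(top)
    return {game: heapq.nlargest(k, entries) for game, entries in merged.items()}


events = [
    ("viral", "alice", 10), ("viral", "bob", 30), ("viral", "carol", 20),
    ("viral", "alice", 50), ("niche", "dave", 5),
]
print(stage_two(stage_one(events, k=2), k=2))
```

The merge is exact because any entry in the global top-k must also be in the top-k of its own sub-partition, so stage two never needs more than the local winners.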

CDC (Change Data Capture) pipelines

Demands: ingest a database's change log (inserts, updates, and deletes, read from the Postgres WAL or the MySQL binlog), stream-process the row changes, and materialize derived datasets in a separate store. This includes the "outbox pattern" feeding event-driven services.

Variant fit: Debezium reads the database log, publishes to Kafka. Flink consumes the Kafka CDC topic, joins it with reference data, writes to OLAP / OLTP / search index. Variants: Flink SQL with UPSERT-INTO semantics; Materialize natively consumes Debezium and maintains derived views.
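As a simplified illustration of upsert semantics over a CDC topic, the sketch below applies Debezium-style change events to an in-memory view. It assumes the usual envelope fields `op` (`c` create, `u` update, `d` delete, `r` snapshot read), `before`, and `after`; the `apply_change` helper, the `id` primary-key column, and the sample rows are made up for the example.

```python
def apply_change(view: dict, event: dict, key: str = "id") -> None:
    """Apply one change event to a materialized view keyed by primary key."""
    op = event["op"]
    if op in ("c", "r", "u"):
        row = event["after"]
        view[row[key]] = row             # upsert: the last write per key wins
    elif op == "d":
        view.pop(event["before"][key], None)  # tolerate replayed deletes
    else:
        # Other operation codes are deliberately not handled in this sketch.
        raise ValueError(f"unsupported op: {op!r}")


changes = [
    {"op": "c", "before": None, "after": {"id": 1, "status": "new"}},
    {"op": "u", "before": {"id": 1, "status": "new"},
     "after": {"id": 1, "status": "paid"}},
    {"op": "c", "before": None, "after": {"id": 2, "status": "new"}},
    {"op": "d", "before": {"id": 2, "status": "new"}, "after": None},
]

view = {}
for event in changes:
    apply_change(view, event)
print(view)
```

Because each event carries the full row image, applying the whole sequence a second time after a replay ends in the same view, provided changes for one key arrive in order (for example, by keying the topic on the primary key).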

Observability pipelines — Netflix Mantis, Honeycomb, internal at large companies

Demands: 10M+ events/sec of operational telemetry (log lines, traces, metric points). Real-time anomaly detection. Cheap storage of the long tail; expensive but fresh derived metrics.

Variant fit: Flink/Mantis for the live derivation path. Output to Druid/Pinot for ad-hoc querying. The streaming layer is what converts log lines into the metrics you actually graph.

The point: it is the same Chandy–Lamport machinery, the same LSM state backend, the same 2PC sink protocol underneath. The product context determines window choice, state shape, and sink class — but the engine class is one.


§26. Real-world implementations with numbers

  • Apache Flink at Uber (Michelangelo + AthenaX): ~10M events/sec across ~200 jobs. ~5,000 TaskManagers. Powers real-time fare estimation, ETA, fraud detection. State in RocksDB; checkpoints to HDFS. Uber publicly describes operating Flink at this scale with sub-second latencies for feature freshness.
  • Apache Flink at Alibaba (with the Blink fork → reintegrated upstream): Singles' Day peak of 583k orders/sec, with Flink jobs aggregating order-stream analytics in real time. Aggregate platform volume is trillions of events per day. Alibaba's optimizations (incremental checkpointing improvements, SQL planner enhancements) were donated back to Apache Flink.
  • Apache Flink at Stripe Radar: real-time evaluation of card-auth events with sub-100ms decision SLOs. Mix of Kafka Streams and custom infrastructure plus Flink for the heavier stateful pipelines.
  • Apache Flink at Pinterest: ~1M events/sec into Flink jobs for real-time recommendations and spam detection. Joined with stream-table lookups against Redis/HBase profile stores.
  • Apache Samza at LinkedIn: originally invented at LinkedIn (Kreps et al.) for stream processing on Kafka. Today LinkedIn runs both Samza and Flink for derived datasets, notifications, feed personalization, anti-abuse, Member Hot Lead Scoring. Trillions of events per day cumulative.
  • Netflix Mantis: streaming platform for operational telemetry. Tens of millions of events/sec. Real-time anomaly detection across the streaming infrastructure. Uses a Flink-like compute model with Netflix-specific scheduling.
  • Apache Spark Structured Streaming at Databricks customers: the most common micro-batch streaming deployment. Used heavily for ETL-style pipelines where 1-second–1-minute latency is acceptable. Databricks publishes case studies with multi-million-events/sec deployments (e.g., HSBC, Comcast).
  • Kafka Streams at Confluent customers: lighter-weight deployments, typically ~100k events/sec per service instance, embedded in microservices. Common for stateful stream consumers inside service-oriented architectures.
  • Materialize at fintechs / SaaS: incremental SQL views over Kafka, queried via Postgres protocol. Cited deployments at firms like Ramp and several fintechs for real-time risk views and live dashboards. Throughput typically in the 100k–1M events/sec range per cluster.
  • RisingWave: PostgreSQL-protocol streaming database with cloud-native object storage (Hummock storage engine) for state. Newer; growing fast for real-time analytics replacing Flink + OLAP combos. Public benchmarks claim ~1M events/sec per cluster with sub-second latencies.
  • Apache Beam: the portability layer. Heavily used inside Google as the programming model for Dataflow (MillWheel, Google's earlier streaming engine, was a precursor); externally used to make pipelines portable across Flink, Spark, Dataflow, and Samza runners.

Different scales, different access patterns, different variants of the same technology class. If a candidate can name three of these with rough scale and the engine variant used, they're already in the top decile.


§27. Summary

Stream processing is the always-on incremental compute layer over event logs: a continuous query graph maintaining keyed state in an LSM-backed local store. Global consistency comes from Chandy–Lamport barriers (Flink), transactional offset+state+sink commits (Kafka Streams), or deterministic batch IDs (Spark); end-to-end exactly-once is coordinated via 2PC sinks bound to the checkpoint protocol. Pick true streaming (Flink) for sub-second latency and complex windowing, micro-batch (Spark) when you already run Spark and a second of latency is fine, embedded (Kafka Streams) for stateful consumers inside a microservice, and streaming SQL databases (Materialize, RisingWave) when the consumers are analysts. Size by event throughput per slot (~30k events/sec/slot for stateful operators), bound replay by the checkpoint interval, and defend against hot keys with salting. Remember that the durability point is "checkpoint metadata persisted plus sink-transaction committed": without both, you have at-least-once at best.
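The sizing rules in the summary reduce to a few lines of arithmetic. The sketch below uses the ~30k events/sec/slot rule of thumb from the text; the 60% target utilization, the function names, and the example workload (1M events/sec, 60-second checkpoint interval) are assumptions for illustration only.

```python
import math

EVENTS_PER_SLOT = 30_000  # rule of thumb from the text, stateful operators


def slots_needed(peak_events_per_sec: float, target_utilization: float = 0.6) -> int:
    # Leave headroom so the job can catch up after a restart.
    usable = EVENTS_PER_SLOT * target_utilization
    return math.ceil(peak_events_per_sec / usable)


def worst_case_replay_events(events_per_sec: float, checkpoint_interval_sec: float) -> float:
    # A crash just before a checkpoint replays a full interval of input.
    return events_per_sec * checkpoint_interval_sec


def catch_up_seconds(backlog_events: float, capacity_per_sec: float,
                     input_per_sec: float) -> float:
    spare = capacity_per_sec - input_per_sec
    if spare <= 0:
        raise ValueError("no spare capacity: the job never catches up")
    return backlog_events / spare


slots = slots_needed(1_000_000)
backlog = worst_case_replay_events(1_000_000, 60)
print(slots, backlog,
      round(catch_up_seconds(backlog, slots * EVENTS_PER_SLOT, 1_000_000)))
```

Halving the checkpoint interval halves the worst-case backlog, at the cost of more frequent checkpoint I/O.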