
Module 5: Distributed Systems & Consensus

Systems Thinking × System Design · 8 sessions


The Trade-Off You Cannot Escape

In 2000, Eric Brewer stood before an audience at the ACM Symposium on Principles of Distributed Computing and made a conjecture: a distributed data store cannot simultaneously provide more than two of three guarantees. Those three guarantees are Consistency, Availability, and Partition Tolerance. Two years later, Seth Gilbert and Nancy Lynch published a formal proof confirming it. The conjecture became a theorem.

This result is not a design recommendation. It is a constraint of physics and logic. When you build a system that spans multiple networked nodes, the CAP theorem tells you what combinations are possible. Understanding it prevents you from chasing impossible architectures.

CAP doesn't ask what you want. It asks what you're willing to lose.

The Three Guarantees

Consistency (C) means every read receives the most recent write or an error. All nodes see the same data at the same time. If you write a value to one node, any subsequent read from any node returns that value. This is linearizability, the strongest form of consistency.

Availability (A) means every request receives a non-error response, without the guarantee that it contains the most recent write. The system always responds, even if the data might be stale.

Partition Tolerance (P) means the system continues to operate despite arbitrary message loss or delay between nodes. Network partitions are not theoretical. They happen in every distributed system, whether from cable cuts, switch failures, or cloud provider issues.

```mermaid
graph TD
    CAP["CAP Theorem<br/>Pick two during a partition"]
    C["Consistency<br/>Every read gets latest write"]
    A["Availability<br/>Every request gets a response"]
    P["Partition Tolerance<br/>Works despite network splits"]
    CAP --- C
    CAP --- A
    CAP --- P
    CP["CP Systems<br/>ZooKeeper, HBase,<br/>etcd, Redis Cluster"]
    AP["AP Systems<br/>Cassandra, DynamoDB,<br/>CouchDB, Riak"]
    CA["CA Systems<br/>Single-node PostgreSQL,<br/>Single-node MySQL<br/>(no partition = no choice)"]
    C --- CP
    C --- CA
    A --- AP
    A --- CA
    P --- CP
    P --- AP
```

Why Partition Tolerance Is Non-Negotiable

In any system distributed across a network, partitions will happen. You do not get to choose whether partitions occur. You only get to choose how your system behaves when they do. This means the real choice during a partition is between consistency and availability.

A CP system chooses consistency over availability. When a partition occurs, some nodes may refuse to serve requests rather than return stale data. ZooKeeper does this: if a node cannot reach a quorum, it rejects client requests. The data you get is always correct, but sometimes you get no data at all.

An AP system chooses availability over consistency. When a partition occurs, every node continues serving requests, even if different nodes have different versions of the data. Cassandra does this: writes succeed on whichever nodes are reachable, and the system reconciles conflicts after the partition heals.

A CA system provides both consistency and availability, but only because it does not tolerate partitions. A single-node PostgreSQL instance is CA. The moment you add replication across a network, you must confront partitions, and CA is no longer an option.

Classifying Real Databases

| Database | CAP Classification | Partition Behavior | Typical Use Case |
|---|---|---|---|
| ZooKeeper | CP | Refuses reads/writes without quorum | Configuration management, leader election |
| etcd | CP | Raft-based quorum required | Kubernetes cluster state |
| HBase | CP | Region server unavailable if master unreachable | Large-scale analytical storage |
| Redis Cluster | CP | Minority partition stops accepting writes | Caching, session store |
| MongoDB (default) | CP | Primary election required for writes | Document storage, general purpose |
| Cassandra | AP | All nodes accept reads/writes independently | Time-series, IoT, high write throughput |
| DynamoDB | AP | Eventually consistent reads by default | Serverless applications, key-value store |
| CouchDB | AP | Multi-master replication with conflict resolution | Offline-first mobile apps |
| Riak | AP | Sloppy quorum, hinted handoff | High-availability key-value store |
| PostgreSQL (single node) | CA | No partition tolerance (single node) | Relational workloads, OLTP |

The PACELC Extension

Daniel Abadi proposed PACELC in 2010 because CAP only describes behavior during partitions. Most of the time, the network is healthy. PACELC asks: even when there is no partition, does the system trade latency for consistency?

The framework reads as: if there is a Partition, choose Availability or Consistency. Else, when operating normally, choose Latency or Consistency.

```mermaid
flowchart LR
    Start["Network Status"] -->|Partition| P["During Partition"]
    Start -->|Normal| E["Else (No Partition)"]
    P -->|Choose| PA["Availability"]
    P -->|Choose| PC["Consistency"]
    E -->|Choose| EL["Low Latency"]
    E -->|Choose| EC["Consistency"]
    PA -.->|Example| Cass1["Cassandra: PA/EL"]
    PC -.->|Example| HB["HBase: PC/EC"]
    EL -.->|Example| Cass2["DynamoDB: PA/EL"]
    EC -.->|Example| MG["MongoDB: PC/EC"]
```

Cassandra is PA/EL: during a partition, it chooses availability. During normal operation, it chooses low latency over strong consistency. MongoDB is PC/EC: it chooses consistency in both cases, accepting higher latency for correctness.

Comparing Three Databases

The radar chart below compares PostgreSQL (single-node, CA), Cassandra (AP), and MongoDB (CP) across five dimensions. Scores are relative, on a 1-to-5 scale, and reflect typical default configurations.

Common Misconceptions

CAP does not mean "pick any two." It means that during a network partition, you must choose between consistency and availability. When there is no partition, you can have both. The choice is forced only when things go wrong.

CAP consistency is not the same as ACID consistency. CAP consistency refers to linearizability across distributed nodes. ACID consistency refers to database constraints and invariants being preserved after a transaction. They are related concepts with different scopes.

Most systems are not purely CP or AP. Many databases offer tunable consistency. DynamoDB lets you request strongly consistent reads at the cost of higher latency. Cassandra lets you set consistency levels per query (ONE, QUORUM, ALL). The CAP classification describes the default or primary behavior, not the only mode.
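Tunable consistency can be sketched in a few lines. The snippet below maps Cassandra-style consistency levels to the number of replica acknowledgments a read must wait for; the function names and the replica representation are illustrative, not a real driver API.

```python
# Sketch of per-query tunable consistency, in the style of Cassandra's
# consistency levels. All names here are illustrative assumptions.

def required_acks(level: str, replication_factor: int) -> int:
    """Map a consistency level to the number of replica acknowledgments."""
    if level == "ONE":
        return 1
    if level == "QUORUM":
        return replication_factor // 2 + 1
    if level == "ALL":
        return replication_factor
    raise ValueError(f"unknown consistency level: {level}")

def read(replicas, level="ONE"):
    """Return the newest value among the first `required_acks` replicas to answer."""
    needed = required_acks(level, len(replicas))
    responses = []
    for replica in replicas:            # in reality these calls run in parallel
        responses.append(replica())     # each replica returns (timestamp, value)
        if len(responses) >= needed:
            break
    return max(responses)[1]            # newest timestamp wins

# Three replicas; one still holds a stale value.
replicas = [lambda: (2, "new"), lambda: (1, "old"), lambda: (2, "new")]
print(read(replicas, "QUORUM"))  # waits for 2 of 3 acks
```

Raising the level from ONE to QUORUM trades latency for a stronger guarantee, which is exactly the CAP tension expressed per query.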

Further Reading

  • Gilbert, S. & Lynch, N. (2002). "Brewer's Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services."
  • Brewer, E. (2012). "CAP Twelve Years Later: How the 'Rules' Have Changed."
  • Wikipedia: PACELC Theorem
  • Abadi, D. (2012). "Consistency Tradeoffs in Modern Distributed Database System Design."
  • Wikipedia: CAP Theorem

Assignment

You are building a chat application with servers in Jakarta and Surabaya. The network link between the two cities goes down. You have two choices:

  1. Option A (CP): Reject all writes on both servers until the partition heals, ensuring both cities always see the same message history.
  2. Option B (AP): Allow both servers to continue accepting messages independently, then merge the two message histories when the network recovers.

For each option, answer:

  • What is the user experience during the outage?
  • What happens when the partition heals?
  • What data integrity risks exist?
  • Which option would you choose for a chat app, and why?

Not All Consistency Is Equal

Session 5.1 introduced the CAP theorem's definition of consistency: linearizability, where every read returns the most recent write. But linearizability is just one point on a spectrum. In practice, distributed systems offer a range of consistency models, each trading correctness for performance in different ways.

The model you choose determines what users see, when they see it, and whether the application feels broken or smooth. A banking system and a social media feed have very different consistency requirements. Picking the wrong model for your use case either wastes resources or frustrates users.

Eventual consistency doesn't mean "wrong." It means "not yet."

The Five Models

Strong Consistency (Linearizability)

Every read returns the value of the most recent completed write, globally. All nodes agree on the order of operations. This is what single-node databases provide naturally. In a distributed system, achieving it requires coordination between nodes on every operation, which increases latency.

Google Spanner achieves strong consistency across global data centers using synchronized atomic clocks (TrueTime). The engineering cost is extraordinary. Most systems do not need this, and most teams cannot afford it.

Eventual Consistency

If no new writes occur, all replicas will eventually converge to the same value. There is no bound on how long "eventually" takes, though in practice it is usually milliseconds to seconds. During the convergence window, different clients may read different values from different replicas.

DNS is the classic example. When you update a DNS record, it propagates across nameservers over minutes to hours. Every server eventually gets the new value, but during propagation, some clients see old data and others see new data.

Read-Your-Writes Consistency

A client always sees its own writes. After writing a value, that same client is guaranteed to read that value (or a newer one) on subsequent reads. Other clients may still see stale data. This model preserves individual user experience without requiring global coordination.

Monotonic Reads

If a client reads a value at time T, subsequent reads from that client will never return a value older than what was seen at time T. Time does not go backward for any individual reader. Without this guarantee, a user could refresh a page and see older data than what they saw a moment ago.
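The monotonic-reads rule can be enforced client-side with nothing more than a remembered version number. This is a minimal sketch under assumed names; real systems typically do the equivalent with session tokens or vector timestamps.

```python
# Minimal sketch of a client enforcing monotonic reads: it remembers the
# highest version it has seen and skips replicas that are behind that point.
# The replica list and integer version scheme are illustrative assumptions.

class MonotonicClient:
    def __init__(self):
        self.last_seen = 0  # highest version this client has observed

    def read(self, replicas):
        for version, value in replicas:      # try replicas in order
            if version >= self.last_seen:    # never go backward in time
                self.last_seen = version
                return value
        raise RuntimeError("no replica is caught up to this client's view")

client = MonotonicClient()
client.read([(3, "v3")])                    # sees version 3
print(client.read([(2, "v2"), (4, "v4")]))  # skips the stale replica → "v4"
```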

Causal Consistency

Operations that are causally related are seen by all nodes in the same order. If operation A causes operation B, every node sees A before B. Concurrent operations (those with no causal relationship) may be seen in different orders on different nodes.

Consider a social network where Alice posts a message and Bob replies. Causal consistency guarantees that every user who sees Bob's reply also sees Alice's original post. Without it, some users might see a reply to a message that appears not to exist.

Comparison Table

| Model | Guarantee | UX Impact | Latency Cost | Example System |
|---|---|---|---|---|
| Strong (Linearizable) | Every read returns latest write, globally | Perfect correctness. No stale reads. | High. Requires quorum or coordination. | Google Spanner, ZooKeeper |
| Eventual | Replicas converge if writes stop | Stale reads possible. May confuse users. | Low. Reads from nearest replica. | DNS, Cassandra (CL=ONE), DynamoDB (default) |
| Read-Your-Writes | Client sees its own writes | User's own actions always reflected immediately. | Medium. Requires routing or session pinning. | Facebook TAO, many web frameworks |
| Monotonic Reads | No backward time travel for a client | Data may be stale, but never goes backward. | Low to medium. Requires version tracking. | DynamoDB (consistent reads), Riak |
| Causal | Causally related ops seen in order | Related events appear in correct sequence. | Medium. Requires dependency tracking. | MongoDB (causal sessions), COPS |

Read-Your-Writes in Action

The sequence diagram below shows how read-your-writes consistency works in a system with a primary node and a replica. The user writes to the primary, then reads. Without the guarantee, the read could hit a replica that has not yet received the write. With it, the system ensures the read either goes to the primary or to a replica that has caught up.

```mermaid
sequenceDiagram
    participant U as User
    participant P as Primary Node
    participant R as Replica Node
    U->>P: Write: profile_photo = "new.jpg"
    P-->>U: Write acknowledged (timestamp T1)
    P->>R: Replicate write (async)
    Note over R: Replication in progress...
    U->>R: Read: profile_photo (without RYW)
    R-->>U: Returns "old.jpg" (stale!)
    Note over U: With Read-Your-Writes:
    U->>P: Read: profile_photo (routed to primary, or replica with T >= T1)
    P-->>U: Returns "new.jpg" (correct)
```

Implementation strategies for read-your-writes include:

  • Session pinning: Route all reads from a client to the same node that handled the write.
  • Write timestamp tracking: Attach a timestamp to the write acknowledgment. On reads, only accept responses from replicas at or past that timestamp.
  • Read from primary: After a write, read from the primary node for a short window (e.g., 5 seconds), then fall back to replicas.
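The write-timestamp strategy can be sketched concretely. The session records the timestamp of its last acknowledged write and only accepts reads from replicas that have applied at least that far; all class and method names here are illustrative, not a real driver API.

```python
# Sketch of read-your-writes via write-timestamp tracking. Names and the
# integer timestamp scheme are illustrative assumptions.

class Node:
    def __init__(self):
        self.data, self.applied_ts = {}, 0
    def apply(self, key, value):
        self.applied_ts += 1          # acknowledge the write with a timestamp
        self.data[key] = value
        return self.applied_ts

class Session:
    def __init__(self):
        self.last_write_ts = 0
    def write(self, primary, key, value):
        self.last_write_ts = primary.apply(key, value)
    def read(self, key, replicas):
        for replica in replicas:
            if replica.applied_ts >= self.last_write_ts:  # replica caught up?
                return replica.data.get(key)
        raise RuntimeError("no replica has caught up; fall back to primary")

primary, replica = Node(), Node()
session = Session()
session.write(primary, "photo", "new.jpg")   # replica has NOT replicated yet
try:
    session.read("photo", [replica])         # stale replica is rejected
except RuntimeError:
    print(session.read("photo", [primary]))  # primary always satisfies RYW
```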

The Consistency Spectrum

These models are not isolated categories. They form a spectrum from strongest to weakest guarantees.

```mermaid
graph LR
    S["Strong<br/>Linearizable<br/>Highest latency"] --> C["Causal<br/>Ordered causality<br/>Medium latency"]
    C --> RYW["Read-Your-Writes<br/>Self-consistency<br/>Medium latency"]
    RYW --> MR["Monotonic Reads<br/>No backward time<br/>Low latency"]
    MR --> E["Eventual<br/>Convergence only<br/>Lowest latency"]
    style S fill:#222221,stroke:#c8a882,color:#ede9e3
    style C fill:#222221,stroke:#c8a882,color:#ede9e3
    style RYW fill:#222221,stroke:#6b8f71,color:#ede9e3
    style MR fill:#222221,stroke:#6b8f71,color:#ede9e3
    style E fill:#222221,stroke:#8a8478,color:#ede9e3
```

Stronger models are more expensive. They require more network round trips, more coordination between nodes, and more waiting. Weaker models are cheaper but push complexity to the application layer. Your code must handle stale data, conflicts, and out-of-order events.

Choosing the Right Model

The question is not which model is best. The question is which model fits your use case.

Bank transfers: Strong consistency. A customer's balance must be correct at all times. Showing stale data leads to overdrafts, double-spending, or regulatory violations.

Social media likes counter: Eventual consistency. If a post shows 4,291 likes instead of 4,293 for a few seconds, nobody notices and nobody is harmed.

User profile edits: Read-your-writes. The user who changed their profile photo should see the new photo immediately. Other users can wait a few seconds.

Comment threads: Causal consistency. A reply must appear after the comment it responds to. Showing replies before their parent comments is confusing.

Analytics dashboards: Eventual consistency. Data arriving a few seconds late is perfectly acceptable for aggregate metrics.

Further Reading

  • Jepsen: Consistency Models — Interactive map of consistency models and their relationships.
  • Wikipedia: Consistency Model
  • Vogels, W. (2008). "Eventually Consistent." — Amazon CTO's explanation of consistency trade-offs.
  • Lloyd, W. et al. "Don't Settle for Eventual: Scalable Causal Consistency for Wide-Area Storage with COPS."

Assignment

A user uploads a new profile photo. Immediately after uploading, they navigate to their profile page to verify it. Under eventual consistency, the profile page loads from a replica that has not yet received the write. The user sees their old photo.

  1. Which consistency model fixes this problem?
  2. Describe two different implementation strategies to achieve it.
  3. What are the latency trade-offs of each strategy?
  4. Would you apply the same model to the user's followers viewing the profile? Why or why not?

Assumptions That Will Betray You

In 1994, Peter Deutsch of Sun Microsystems compiled a list of seven assumptions that developers new to distributed systems commonly make. James Gosling, also at Sun, later added an eighth. These are known as the Eight Fallacies of Distributed Computing. Every one of them is wrong, and every one of them causes real production failures when believed.

This session covers the fallacies, then goes deeper into three foundational problems of distributed systems: the Two Generals Problem, Byzantine fault tolerance, and logical clocks.

The network is not reliable, latency is not zero, bandwidth is not infinite. Every distributed system that forgets this will be reminded.

The Eight Fallacies

| # | Fallacy | Reality | Real-World Consequence |
|---|---|---|---|
| 1 | The network is reliable | Packets get dropped, connections reset, hardware fails | AWS us-east-1 outage (2017): S3 metadata service lost connectivity, cascading across dozens of services |
| 2 | Latency is zero | Cross-region calls add 50-300ms; cross-continent 100-500ms | Microservice chains with 10+ synchronous hops accumulate seconds of latency |
| 3 | Bandwidth is infinite | Network links saturate under load; mobile connections throttle | Video streaming services buffer when CDN capacity spikes during live events |
| 4 | The network is secure | Traffic can be intercepted, modified, or replayed | Man-in-the-middle attacks on unencrypted internal service communication |
| 5 | Topology doesn't change | Nodes join and leave; routes shift; DNS entries update | Auto-scaling adds new instances that the service registry has not yet discovered |
| 6 | There is one administrator | Multiple teams, vendors, and cloud providers manage different parts | Conflicting firewall rules between teams block legitimate traffic in production |
| 7 | Transport cost is zero | Serialization, encryption, compression, and network I/O all consume CPU and time | gRPC protobuf serialization overhead becomes measurable at millions of requests per second |
| 8 | The network is homogeneous | Different hardware, protocols, OS versions, and configurations coexist | IPv4/IPv6 mismatch causes connectivity failures between old and new services |

The Two Generals Problem

The Two Generals Problem, first formalized by Akkoyunlu, Ekanadham, and Huber in 1975 and given its name by Jim Gray in 1978, is the simplest impossibility result in distributed computing.

Two armies, commanded by separate generals, must agree on a time to attack a city. They can only communicate by sending messengers through enemy territory. Any messenger might be captured. The question: can the generals reach agreement?

```mermaid
sequenceDiagram
    participant G1 as General 1
    participant E as Enemy Territory
    participant G2 as General 2
    G1->>E: Messenger 1: "Attack at dawn"
    Note over E: Messenger might be captured!
    E->>G2: Messenger 1 arrives
    G2->>E: Messenger 2: "Agreed, attack at dawn"
    Note over E: This messenger might also be captured!
    E->>G1: Messenger 2 arrives
    Note over G1: But does G2 know I received the confirmation?
    G1->>E: Messenger 3: "I confirm your confirmation"
    Note over E: And so on, infinitely...
```

The answer is no. No finite number of messages can guarantee agreement when any message might be lost. General 1 sends "attack at dawn." Even if General 2 receives it and sends back a confirmation, General 1 does not know whether the confirmation arrived. General 2 knows this, so General 2 cannot be sure General 1 will attack. The problem recurses infinitely.

This is not a contrived academic puzzle. It is the fundamental reason why distributed systems use timeouts, retries, and idempotency. TCP's three-way handshake does not solve the Two Generals Problem. It works well enough for practical purposes, but it does not provide absolute certainty of agreement.
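The retries-plus-idempotency pattern can be sketched in a few lines. The flaky channel, message IDs, and handler below are illustrative assumptions; the point is that the receiver deduplicates, so the sender can retry freely without causing double effects.

```python
# Practical response to the Two Generals Problem: retry over a lossy channel,
# and make the receiver idempotent so duplicate deliveries are harmless.
# The channel, IDs, and handler here are illustrative assumptions.

import random

processed = set()  # receiver-side dedup: message IDs already handled

def receive(msg_id, payload):
    """Idempotent handler: applying the same message twice has no extra effect."""
    if msg_id in processed:
        return "duplicate ignored"
    processed.add(msg_id)
    return f"applied {payload}"

def flaky_send(msg_id, payload, loss_rate=0.5):
    """A channel that silently drops messages, like the generals' messengers."""
    if random.random() < loss_rate:
        return None                 # lost in enemy territory
    return receive(msg_id, payload)

def send_with_retries(msg_id, payload, attempts=20):
    for _ in range(attempts):
        result = flaky_send(msg_id, payload)
        if result is not None:      # in real code: wait for an ack with a timeout
            return result
    raise TimeoutError("no acknowledgment; outcome unknown")

print(send_with_retries(42, "attack at dawn"))
```

Note that the sender still cannot distinguish "message lost" from "ack lost", which is exactly the Two Generals limit; idempotency makes that ambiguity safe rather than eliminating it.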

Byzantine Fault Tolerance

The Two Generals Problem assumes honest participants with an unreliable channel. The Byzantine Generals Problem, described by Lamport, Shostak, and Pease in 1982, is harder: some participants may be actively malicious. They may send conflicting information to different nodes, or they may deliberately lie.

A system is Byzantine fault tolerant (BFT) if it can reach correct consensus even when some nodes are faulty or malicious. The classic result is that BFT requires at least 3f + 1 total nodes to tolerate f Byzantine failures. With 3 faulty nodes, you need at least 10 total nodes.

Most internal distributed systems (databases, message queues, coordination services) do not implement BFT because they trust their own nodes. Blockchain systems do, because participants are anonymous and untrusted. Bitcoin's proof-of-work and Ethereum's proof-of-stake are both solutions to the Byzantine Generals Problem for open networks.

Logical Clocks

In a single computer, events have a clear order because they share one clock. In a distributed system, each node has its own clock. Physical clocks drift. Network Time Protocol (NTP) synchronization has millisecond-level accuracy at best. You cannot rely on wall-clock time to order events across nodes.

Leslie Lamport solved this in 1978 with logical clocks (Lamport timestamps). Each process maintains a counter. On every local event, increment the counter. When sending a message, attach the counter. When receiving a message, set your counter to max(local, received) + 1.

```mermaid
sequenceDiagram
    participant A as Process A
    participant B as Process B
    participant C as Process C
    Note over A: Event a1 (LC=1)
    A->>B: Message (LC=1)
    Note over B: Event b1 (LC=2)
    Note over B: max(0,1)+1 = 2
    B->>C: Message (LC=2)
    Note over C: Event c1 (LC=3)
    Note over C: max(0,2)+1 = 3
    Note over A: Event a2 (LC=2)
    C->>A: Message (LC=3)
    Note over A: Event a3 (LC=4)
    Note over A: max(2,3)+1 = 4
```

Lamport clocks establish a happens-before relationship. If event A has a lower timestamp than event B, it is possible that A happened before B. But the converse is not guaranteed: two events with different timestamps might be concurrent (unrelated).
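The three Lamport clock rules translate directly into code. This is a minimal sketch; method names are illustrative.

```python
# Lamport clock: increment on local events, attach the counter when sending,
# and merge with max(local, received) + 1 when receiving.

class LamportClock:
    def __init__(self):
        self.time = 0

    def tick(self):
        """Local event: increment the counter."""
        self.time += 1
        return self.time

    def send(self):
        """Attach the current counter to an outgoing message."""
        return self.time

    def receive(self, msg_time):
        """Merge rule: max(local, received) + 1."""
        self.time = max(self.time, msg_time) + 1
        return self.time

a, b = LamportClock(), LamportClock()
a.tick()                  # event a1: A's clock = 1
stamp = a.send()          # message carries LC=1
b.receive(stamp)          # B's clock = max(0, 1) + 1 = 2
print(a.time, b.time)     # 1 2
```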

Vector Clocks

Vector clocks extend Lamport clocks to detect concurrency. Instead of a single counter, each process maintains a vector of counters, one per process. When process i has an event, it increments its own entry. When sending, it attaches the full vector. When receiving, it takes the element-wise maximum and increments its own entry.

If vector A is less than or equal to vector B in every position, then A happened before B. If neither is less than or equal to the other, the events are concurrent. This distinction is critical for conflict detection in systems like Amazon's Dynamo.

```mermaid
sequenceDiagram
    participant A as Process A
    participant B as Process B
    Note over A: Event a1<br/>VC=[1,0]
    A->>B: Message VC=[1,0]
    Note over B: Event b1<br/>VC=max([0,0],[1,0])+[0,1]<br/>=[1,1]
    Note over A: Event a2<br/>VC=[2,0]
    Note over B: Event b2<br/>VC=[1,2]
    Note over A,B: a2=[2,0] vs b2=[1,2]<br/>Neither dominates: CONCURRENT
```
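The dominance test is a short comparison. This sketch checks the two conditions from the paragraph above: element-wise less-or-equal means happened-before, and mutual non-dominance means concurrent.

```python
# Vector-clock comparison: happened-before vs concurrent.

def happened_before(vc_a, vc_b):
    """True if A <= B in every position and the vectors differ."""
    return all(a <= b for a, b in zip(vc_a, vc_b)) and vc_a != vc_b

def concurrent(vc_a, vc_b):
    """Neither vector dominates the other: the events are unrelated."""
    return not happened_before(vc_a, vc_b) and not happened_before(vc_b, vc_a)

a2 = [2, 0]   # Process A's second event
b2 = [1, 2]   # Process B's second event
print(concurrent(a2, b2))               # True: neither dominates
print(happened_before([1, 0], [1, 1]))  # True: a1 happened before b1
```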

Connecting the Concepts

These problems are not independent. The fallacies explain why distributed computing is hard. The Two Generals Problem proves that perfect agreement over unreliable channels is impossible. Byzantine faults show that even agreement itself is insufficient when participants lie. And logical clocks provide the ordering mechanism that physical clocks cannot.

Every distributed system you build or operate sits on top of these constraints. Consensus algorithms (Session 5.4), replication protocols (Session 3.4), and consistency models (Session 5.2) are all engineering responses to these fundamental limitations.

Further Reading

  • Wikipedia: Fallacies of Distributed Computing
  • Lamport, L. (1978). "Time, Clocks, and the Ordering of Events in a Distributed System."
  • Lamport, L., Shostak, R., & Pease, M. (1982). "The Byzantine Generals Problem."
  • Wikipedia: Two Generals' Problem
  • Wikipedia: Vector Clock

Assignment

List the Eight Fallacies of Distributed Computing. For each fallacy, give a specific, real example of a system or incident where assuming the fallacy caused a failure. Your examples should be concrete: name the system, the year if possible, and what went wrong.

Bonus: For fallacies 1, 2, and 3, describe the engineering pattern commonly used to mitigate each (e.g., retries with backoff, circuit breakers, compression, CDNs).

Getting Distributed Nodes to Agree

Session 5.3 established that distributed systems face unreliable networks and imperfect clocks. Despite these constraints, real systems need nodes to agree on things: which node is the leader, what order operations happened in, and whether a transaction committed. Consensus algorithms are the formal mechanisms for reaching such agreement.

Two algorithms dominate the field: Paxos, which Leslie Lamport first circulated in 1989 (the paper was formally published in 1998), and Raft, published by Diego Ongaro and John Ousterhout in 2014. They solve the same problem with different philosophies.

Raft was designed to be understood. Paxos was designed to be correct. In practice, understandability wins.

What Is Consensus?

A consensus algorithm allows a group of nodes to agree on a single value, even if some nodes fail. The requirements are:

  • Agreement: All non-faulty nodes decide on the same value.
  • Validity: The decided value was proposed by some node.
  • Termination: All non-faulty nodes eventually decide.
  • Integrity: Each node decides at most once.

In practice, consensus is used for leader election, log replication, distributed locks, and configuration management. Every time your Kubernetes cluster elects a controller, every time etcd agrees on a key-value write, a consensus algorithm is running.

Paxos

Lamport described Paxos using an allegory about a parliament on the Greek island of Paxos. The algorithm has three roles: Proposers suggest values, Acceptors vote on proposals, and Learners learn the decided value. A single node can play multiple roles.

Basic Paxos runs in two phases:

Phase 1 (Prepare): A proposer picks a proposal number n and sends a Prepare(n) message to a majority of acceptors. Each acceptor responds with a promise not to accept any proposal numbered less than n, along with any value it has already accepted.

Phase 2 (Accept): If the proposer receives promises from a majority, it sends an Accept(n, v) message, where v is either the value from the highest-numbered previously accepted proposal, or the proposer's own value if no prior proposals exist. If a majority of acceptors accept, the value is chosen.
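The acceptor's side of these two phases fits in a small state machine. This is a minimal single-decree sketch under assumed names; proposer retry logic, learners, and networking are omitted.

```python
# Minimal sketch of a Basic Paxos acceptor implementing the two promises
# described above. Single-decree only; names are illustrative.

class Acceptor:
    def __init__(self):
        self.promised_n = -1      # highest proposal number promised
        self.accepted_n = -1      # number of the last accepted proposal
        self.accepted_v = None    # value of the last accepted proposal

    def prepare(self, n):
        """Phase 1: promise to ignore proposals numbered below n,
        reporting any previously accepted proposal."""
        if n > self.promised_n:
            self.promised_n = n
            return ("promise", self.accepted_n, self.accepted_v)
        return ("reject", None, None)

    def accept(self, n, v):
        """Phase 2: accept unless a higher-numbered prepare was promised."""
        if n >= self.promised_n:
            self.promised_n = n
            self.accepted_n, self.accepted_v = n, v
            return "accepted"
        return "rejected"

acceptors = [Acceptor() for _ in range(3)]
promises = [a.prepare(1) for a in acceptors]             # Phase 1
if sum(p[0] == "promise" for p in promises) >= 2:        # quorum of 3 is 2
    acks = [a.accept(1, "commit X") for a in acceptors]  # Phase 2
    print(acks.count("accepted"))                        # 3: value is chosen
```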

```mermaid
sequenceDiagram
    participant P as Proposer
    participant A1 as Acceptor 1
    participant A2 as Acceptor 2
    participant A3 as Acceptor 3
    Note over P: Phase 1: Prepare
    P->>A1: Prepare(n=1)
    P->>A2: Prepare(n=1)
    P->>A3: Prepare(n=1)
    A1-->>P: Promise(n=1, no prior value)
    A2-->>P: Promise(n=1, no prior value)
    A3-->>P: Promise(n=1, no prior value)
    Note over P: Phase 2: Accept
    P->>A1: Accept(n=1, v="commit X")
    P->>A2: Accept(n=1, v="commit X")
    P->>A3: Accept(n=1, v="commit X")
    A1-->>P: Accepted
    A2-->>P: Accepted
    A3-->>P: Accepted
    Note over P: Value "commit X" is chosen
```

Paxos guarantees safety: it will never decide on two different values. But it has known liveness issues. Two proposers can repeatedly preempt each other with higher proposal numbers, creating an infinite loop where no value is ever chosen. Multi-Paxos solves this by electing a stable leader, but the optimization adds complexity.

The deeper issue is comprehensibility. Lamport's original paper was notoriously difficult to understand. Multiple follow-up papers attempted to clarify it. Implementations diverge from the specification because engineers cannot verify their code against a formalism they do not fully grasp.

Raft

Ongaro and Ousterhout designed Raft explicitly for understandability. Their 2014 paper opens with the observation that Paxos has become synonymous with consensus, yet few people understand it well enough to implement it correctly. Raft decomposes consensus into three sub-problems: leader election, log replication, and safety.

Leader Election

Every node is in one of three states: Follower, Candidate, or Leader. All nodes start as followers. If a follower does not hear from a leader within a randomized election timeout, it becomes a candidate, increments its term number, votes for itself, and requests votes from other nodes.

```mermaid
stateDiagram-v2
    [*] --> Follower
    Follower --> Candidate: Election timeout expires
    Candidate --> Candidate: Election timeout (split vote)
    Candidate --> Leader: Receives majority votes
    Candidate --> Follower: Discovers higher term
    Leader --> Follower: Discovers higher term
```

A candidate wins the election if it receives votes from a majority of nodes. If two candidates split the vote (no majority), both time out and try again with new term numbers. The randomized timeout makes split votes unlikely to repeat.

Log Replication

Once elected, the leader accepts client requests and appends them to its log. It then sends AppendEntries RPCs to all followers. When a majority of followers have written the entry to their logs, the entry is committed. The leader notifies the client that the write succeeded.

```mermaid
sequenceDiagram
    participant C as Client
    participant L as Leader
    participant F1 as Follower 1
    participant F2 as Follower 2
    C->>L: Write request
    L->>L: Append to local log
    L->>F1: AppendEntries(entry)
    L->>F2: AppendEntries(entry)
    F1-->>L: Ack
    F2-->>L: Ack
    Note over L: Majority confirmed (3/3)
    L->>L: Commit entry
    L-->>C: Write successful
    L->>F1: Commit notification
    L->>F2: Commit notification
```

Safety

Raft guarantees that if a log entry is committed, it will be present in the logs of all future leaders. A candidate cannot win an election unless its log is at least as up-to-date as a majority of nodes. This prevents a node with a stale log from becoming leader and overwriting committed entries.
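The "at least as up-to-date" check compares the last entry's term first, then log length. A minimal sketch of the voting rule (function and parameter names are illustrative):

```python
# Raft's log-comparison rule for granting votes: a voter supports a candidate
# only if the candidate's log is at least as up-to-date as its own.

def candidate_log_ok(cand_last_term, cand_len, my_last_term, my_len):
    """Compare last log terms first; break ties on log length."""
    if cand_last_term != my_last_term:
        return cand_last_term > my_last_term   # higher last term wins
    return cand_len >= my_len                  # same term: longer log wins

print(candidate_log_ok(3, 5, 2, 9))   # True: newer term beats longer log
print(candidate_log_ok(3, 4, 3, 5))   # False: same term, shorter log
```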

Paxos vs. Raft

| Dimension | Paxos | Raft |
|---|---|---|
| Published | 1989 (Lamport; formally published 1998) | 2014 (Ongaro & Ousterhout) |
| Design goal | Correctness proof | Understandability |
| Leader required? | No (basic); yes (Multi-Paxos) | Yes, always |
| Roles | Proposer, Acceptor, Learner | Leader, Follower, Candidate |
| Log ordering | Gaps allowed, filled later | Strictly sequential, no gaps |
| Failure handling | Complex reconfiguration | Leader dies, new election, simple recovery |
| Implementations | Google Chubby, Apache Mesos | etcd, Consul, CockroachDB, TiKV |
| Learning curve | Steep. Multiple papers needed. | Moderate. One paper suffices. |
| Performance | Comparable in practice | Comparable in practice |
| Correctness proof | Original paper provides proof | TLA+ specification by Ongaro |

ZooKeeper: Consensus as a Service

Apache ZooKeeper provides consensus as a reusable service. Instead of embedding Paxos or Raft into every application, you delegate coordination to ZooKeeper. It uses a protocol called ZAB (ZooKeeper Atomic Broadcast), which is similar to Raft in that it uses a leader for ordering.

ZooKeeper provides distributed primitives: locks, barriers, leader election, configuration management, and group membership. Kafka (before KRaft) used ZooKeeper for broker coordination and partition leader election. Hadoop uses it for NameNode high availability. HBase uses it for region server coordination.

The trend in recent years is away from external coordination services and toward embedded consensus. Kafka replaced ZooKeeper with its own Raft-based protocol (KRaft). CockroachDB and TiKV embed Raft directly. The operational overhead of managing a ZooKeeper cluster is significant, and teams prefer fewer moving parts.

Quorum Math

Consensus algorithms depend on quorums: a majority of nodes that must agree for an operation to proceed. For a cluster of N nodes, the quorum size is floor(N/2) + 1. The cluster can tolerate floor((N-1)/2) failures.

| Cluster Size (N) | Quorum Size | Max Failures Tolerated |
|---|---|---|
| 3 | 2 | 1 |
| 5 | 3 | 2 |
| 7 | 4 | 3 |
| 9 | 5 | 4 |

Five nodes is the most common production configuration. It tolerates two failures, which covers most realistic scenarios (one planned maintenance + one unexpected failure). Seven or nine nodes increase fault tolerance but add latency, because every write must wait for more acknowledgments.
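The quorum formulas above translate directly into code:

```python
# Quorum math for majority-based consensus.

def quorum_size(n: int) -> int:
    return n // 2 + 1              # floor(N/2) + 1

def max_failures(n: int) -> int:
    return (n - 1) // 2            # floor((N-1)/2)

for n in (3, 5, 7, 9):
    print(n, quorum_size(n), max_failures(n))
# 3 2 1 / 5 3 2 / 7 4 3 / 9 5 4, matching the table
```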

Further Reading

  • Ongaro, D. & Ousterhout, J. (2014). "In Search of an Understandable Consensus Algorithm."
  • Lamport, L. (2001). "Paxos Made Simple."
  • Raft Visualization — Interactive animation of Raft leader election and log replication.
  • Wikipedia: Paxos
  • Apache ZooKeeper Internals

Assignment

You have a 5-node Raft cluster. The current leader (Node 1) crashes.

  1. Describe the step-by-step process of a new leader being elected. Include term numbers, vote requests, and the role of election timeouts.
  2. What is the minimum number of nodes that must agree for a new leader to be elected?
  3. What happens if Node 2 and Node 3 both become candidates in the same term? Walk through the split-vote scenario and its resolution.
  4. After the new leader is elected, it discovers that Node 4 has a log entry that was never committed. What does Raft do with this entry?

Two Patterns for Asynchronous Communication

When services need to communicate without waiting for each other, they pass messages. But "pass messages" hides a critical design choice: should the message go to one recipient, or to many? Message queues and publish/subscribe (pub/sub) answer this differently, and choosing wrong creates architectural debt that compounds over time.

A queue says "someone handle this." Pub/sub says "everyone who cares, here is what happened." The first is a work assignment. The second is a broadcast.

Message Queues: Point-to-Point

A message queue holds messages until a consumer picks them up. Each message is delivered to exactly one consumer. Once consumed, the message is removed from the queue (or marked as processed). If multiple consumers are listening, they compete for messages. This is called the competing consumers pattern.

This model is ideal for work distribution. You have 10,000 image resize jobs. Five workers pull from the same queue. Each image is resized exactly once. No duplication, no missed work. If a worker crashes mid-processing, the message becomes visible again after a timeout (visibility timeout in SQS, negative acknowledgment in RabbitMQ) and another worker picks it up.

graph LR P[Producer] -->|sends message| Q[Queue] Q -->|delivers to ONE| C1[Consumer A] Q -.->|or| C2[Consumer B] Q -.->|or| C3[Consumer C] style Q fill:#222221,stroke:#c8a882,color:#ede9e3 style P fill:#191918,stroke:#6b8f71,color:#ede9e3 style C1 fill:#191918,stroke:#c8a882,color:#ede9e3 style C2 fill:#191918,stroke:#8a8478,color:#8a8478 style C3 fill:#191918,stroke:#8a8478,color:#8a8478

In the diagram above, the producer sends a message to the queue. Only one consumer receives it. The dashed lines indicate that Consumers B and C exist but do not receive this particular message. They will get the next ones.

Key properties of message queues:

  • One message, one consumer. No fan-out.
  • Guaranteed delivery. Messages persist until acknowledged.
  • Ordering. FIFO queues (like SQS FIFO) preserve message order. Standard queues offer best-effort ordering.
  • Backpressure. If consumers slow down, the queue grows. You can monitor queue depth as a scaling signal.

Common implementations: Amazon SQS, RabbitMQ, ActiveMQ, Azure Service Bus queues.
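The competing consumers pattern can be sketched in-process with Python's standard library: several workers pull from one shared queue, and each job is handled exactly once (a toy model, not a real broker — there is no persistence or visibility timeout here).

```python
import queue
import threading

jobs: queue.Queue = queue.Queue()
processed = []                      # (worker_id, job) pairs
lock = threading.Lock()

def worker(worker_id: int) -> None:
    while True:
        job = jobs.get()
        if job is None:             # sentinel: shut this worker down
            jobs.task_done()
            return
        with lock:
            processed.append((worker_id, job))
        jobs.task_done()

# Three competing consumers on one queue.
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

for job in range(10):               # e.g. ten image-resize jobs
    jobs.put(job)
for _ in threads:                   # one sentinel per worker
    jobs.put(None)

jobs.join()
for t in threads:
    t.join()

# Every job was processed exactly once, by some worker.
assert sorted(j for _, j in processed) == list(range(10))
```

The workers share the load dynamically: a slow worker simply picks up fewer jobs. A real broker adds what this sketch omits, namely redelivery when a worker crashes mid-job.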

Pub/Sub: Fan-Out

In pub/sub, a publisher sends a message to a topic. Every subscriber to that topic receives a copy. The publisher does not know (or care) how many subscribers exist. This decouples the sender from receivers entirely.

Consider an order placement event. The inventory service needs to reserve stock. The email service needs to send a confirmation. The analytics service needs to record the sale. The shipping service needs to prepare a label. With a queue, you would need four separate queues or a single consumer that routes to all four. With pub/sub, you publish once to an "order.placed" topic. All four services receive the event independently.

graph LR P[Publisher] -->|publishes event| T[Topic: order.placed] T -->|copy| S1[Inventory Service] T -->|copy| S2[Email Service] T -->|copy| S3[Analytics Service] T -->|copy| S4[Shipping Service] style T fill:#222221,stroke:#c8a882,color:#ede9e3 style P fill:#191918,stroke:#6b8f71,color:#ede9e3 style S1 fill:#191918,stroke:#c8a882,color:#ede9e3 style S2 fill:#191918,stroke:#c8a882,color:#ede9e3 style S3 fill:#191918,stroke:#c8a882,color:#ede9e3 style S4 fill:#191918,stroke:#c8a882,color:#ede9e3

Every subscriber gets its own copy of the message. Adding a fifth subscriber (say, a fraud detection service) requires zero changes to the publisher. This is the power of fan-out: extensibility without coordination.

Key properties of pub/sub:

  • One message, many consumers. Fan-out by default.
  • Fire and forget (for the publisher). The topic handles distribution.
  • No inherent ordering guarantee across subscribers. Each subscriber processes at its own pace.
  • Message durability varies. SNS does not persist messages. If a subscriber is down when the message arrives, it misses it (unless backed by a queue).

Common implementations: Amazon SNS, Google Cloud Pub/Sub, Redis Pub/Sub, Kafka topics (with consumer groups for hybrid behavior).
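The fan-out semantics fit in a few lines. A minimal in-memory sketch (the `Broker` class is illustrative, not a real library):

```python
from collections import defaultdict
from typing import Callable

class Broker:
    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, event: dict) -> None:
        # The publisher does not know who is listening, or how many.
        for handler in self._subscribers[topic]:
            handler(dict(event))    # each subscriber gets its own copy

broker = Broker()
received: dict[str, list[dict]] = {"inventory": [], "email": []}
broker.subscribe("order.placed", received["inventory"].append)
broker.subscribe("order.placed", received["email"].append)

broker.publish("order.placed", {"order_id": 42})
assert received["inventory"] == [{"order_id": 42}]
assert received["email"] == [{"order_id": 42}]
```

Adding a third subscriber is one more `subscribe` call; `publish` never changes. That is the extensibility-without-coordination property in miniature.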

The SNS + SQS Pattern

In AWS, a common architecture combines both. SNS provides fan-out. SQS provides durability and competing consumers. An event publishes to an SNS topic. Each downstream service subscribes via its own SQS queue. This gives you fan-out (pub/sub) with guaranteed delivery (queue) per subscriber.

graph LR P[Publisher] --> SNS[SNS Topic] SNS --> Q1[SQS: Inventory] SNS --> Q2[SQS: Email] SNS --> Q3[SQS: Analytics] Q1 --> C1[Inventory Worker] Q2 --> C2[Email Worker] Q3 --> C3[Analytics Worker] style SNS fill:#222221,stroke:#c8a882,color:#ede9e3 style Q1 fill:#191918,stroke:#6b8f71,color:#ede9e3 style Q2 fill:#191918,stroke:#6b8f71,color:#ede9e3 style Q3 fill:#191918,stroke:#6b8f71,color:#ede9e3

This hybrid pattern is widely used in production AWS architectures. Each queue can scale its consumers independently, retry failed messages, and use dead-letter queues for poison messages.
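The hybrid can be sketched as a topic that fans a copy of each event into one buffer per subscriber, which each consumer then drains at its own pace (a toy model of SNS+SQS, with deques standing in for durable queues):

```python
from collections import deque

class Topic:
    """Fan-out into one queue per subscriber (SNS -> per-service SQS, roughly)."""
    def __init__(self) -> None:
        self.queues: dict[str, deque] = {}

    def add_queue(self, name: str) -> deque:
        self.queues[name] = deque()
        return self.queues[name]

    def publish(self, event: dict) -> None:
        for q in self.queues.values():
            q.append(dict(event))   # fan-out: one copy per subscriber queue

topic = Topic()
inventory_q = topic.add_queue("inventory")
email_q = topic.add_queue("email")

topic.publish({"order_id": 1})
topic.publish({"order_id": 2})

# The email service is "down": its queue simply buffers the backlog.
assert len(email_q) == 2
# The inventory worker drains its own queue independently, in order.
handled = [inventory_q.popleft() for _ in range(len(inventory_q))]
assert [e["order_id"] for e in handled] == [1, 2]
assert len(email_q) == 2            # email backlog untouched
```

The key property: a slow or offline subscriber delays only its own queue, never the publisher or its sibling subscribers.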

Comparison Across Dimensions

| Dimension | Message Queue | Pub/Sub |
|-----------|---------------|---------|
| Delivery model | Point-to-point (one consumer per message) | Fan-out (all subscribers get a copy) |
| Consumer count | Competing consumers share the load | Each subscriber is independent |
| Message persistence | Stored until consumed and acknowledged | Varies: SNS does not persist, Kafka persists |
| Ordering | FIFO available (SQS FIFO, RabbitMQ) | Per-partition ordering (Kafka), none (SNS) |
| Replay | Not possible (message deleted after ack) | Possible in Kafka (offset-based), not in SNS |
| Backpressure | Queue depth grows, a natural backpressure signal | Subscribers must keep up or buffer independently |
| Coupling | Producer knows the queue, loosely coupled | Producer knows only the topic, fully decoupled |
| Use case | Task distribution, job processing, serial workflows | Event notification, broadcasting, multi-service fan-out |
| Dead-letter handling | Built-in DLQ support (SQS, RabbitMQ) | Requires a per-subscriber queue (SNS+SQS pattern) |
| Scaling consumers | Add more competing consumers to one queue | Each subscriber scales independently |

When to Use Which

The decision is usually straightforward once you ask one question: does this message need to reach one handler or many?

Use a queue when:

  • Work must be processed exactly once by one worker (payment processing, image resizing, PDF generation).
  • You need backpressure and rate control (queue depth as a scaling metric).
  • Order matters (FIFO processing of sequential events).

Use pub/sub when:

  • Multiple independent services need to react to the same event (order placed, user registered).
  • You want to add new subscribers without modifying the publisher.
  • Events are informational rather than actionable tasks (audit logs, analytics events).

Use both (SNS+SQS or Kafka consumer groups) when:

  • You need fan-out AND guaranteed delivery per subscriber.
  • Different subscribers process at different speeds.
  • You want dead-letter queues per subscriber for independent failure handling.

Systems Thinking Lens

Queues create a balancing loop. As work arrives faster than consumers process it, queue depth grows. Queue depth triggers autoscaling (more consumers), which drains the queue. The system self-regulates. Pub/sub creates a reinforcing loop of extensibility: each new event type attracts more subscribers, which increases the value of the event system, which encourages publishing more events. Left unchecked, this becomes an "event soup" where hundreds of event types flow through the system with no clear ownership. The systems thinker sets boundaries: event catalogs, ownership per topic, and explicit contracts between publishers and subscribers.

Further Reading

  • AWS Documentation, Amazon SQS Developer Guide. Official reference for SQS queue types, visibility timeout, dead-letter queues, and FIFO ordering.
  • Ably, Apache Kafka vs RabbitMQ vs AWS SNS/SQS. Comprehensive comparison of messaging systems with architecture diagrams and use-case recommendations.
  • Gregor Hohpe and Bobby Woolf, Enterprise Integration Patterns (Addison-Wesley, 2003). The canonical reference for messaging patterns including point-to-point channels, publish-subscribe channels, and message routing.
  • Encore, Message Queues vs Pub/Sub. Practical comparison with code examples and decision framework.

Assignment

For each scenario below, decide whether you would use a message queue, pub/sub, or a combination. Name a specific technology (SQS, SNS, RabbitMQ, Kafka) and explain your reasoning in 2-3 sentences.

  1. Send a welcome email after user signup. Only one email service should send the email. Duplicate emails are unacceptable.
  2. Notify 5 services when an order is placed. Inventory, email, analytics, shipping, and fraud detection all need the event. They process at different speeds.
  3. Process credit card payments one at a time. Payments must be processed in the exact order they were submitted. Each payment is handled by one worker.

Bonus: For scenario (b), draw the architecture. Would you use SNS alone, SQS alone, or SNS+SQS? What happens if the analytics service goes down for 2 hours?

More Than a Message Queue

Apache Kafka is not a traditional message broker. It is a distributed, append-only commit log. Messages are not deleted after consumption. They are retained for a configurable period (or indefinitely with log compaction). This single design decision changes everything about how you build event-driven systems.

Kafka was originally built at LinkedIn to handle the firehose of activity data: page views, searches, profile updates. The engineering team published benchmarks showing 2 million writes per second on three commodity machines. Today, Kafka powers event streaming at Netflix, Uber, Airbnb, and thousands of other organizations processing trillions of messages per day.

Core Architecture

Kafka's storage model is a partitioned, replicated, append-only log. Producers write to the end of a partition. Consumers read from any offset. Messages are never modified in place. This sequential I/O pattern is why Kafka saturates disk throughput far more efficiently than random-access message brokers.

Four concepts define Kafka's architecture:

Brokers are the servers that store data and serve client requests. A Kafka cluster typically runs 3 or more brokers for fault tolerance. Each broker stores a subset of the total data.

Topics are named categories of messages. An "orders" topic holds order events. A "payments" topic holds payment events. Topics are logical groupings. The physical storage is handled by partitions.

Partitions are the unit of parallelism. Each topic is split into one or more partitions. Each partition is an ordered, immutable sequence of messages. Messages within a single partition are strictly ordered. Messages across partitions have no ordering guarantee. When a producer sends a message, it goes to one partition (determined by a partition key, round-robin, or custom logic).

Consumer groups enable parallel consumption. Each consumer in a group is assigned one or more partitions. A partition is assigned to exactly one consumer within a group. If a topic has 6 partitions and a consumer group has 3 consumers, each consumer reads from 2 partitions. If you add a 7th consumer, it sits idle because there are only 6 partitions.

graph TB subgraph Topic: orders P0[Partition 0] P1[Partition 1] P2[Partition 2] end subgraph Consumer Group A CA1[Consumer A1] -.-> P0 CA2[Consumer A2] -.-> P1 CA3[Consumer A3] -.-> P2 end subgraph Consumer Group B CB1[Consumer B1] -.-> P0 CB1 -.-> P1 CB2[Consumer B2] -.-> P2 end style P0 fill:#222221,stroke:#c8a882,color:#ede9e3 style P1 fill:#222221,stroke:#c8a882,color:#ede9e3 style P2 fill:#222221,stroke:#c8a882,color:#ede9e3 style CA1 fill:#191918,stroke:#6b8f71,color:#ede9e3 style CA2 fill:#191918,stroke:#6b8f71,color:#ede9e3 style CA3 fill:#191918,stroke:#6b8f71,color:#ede9e3 style CB1 fill:#191918,stroke:#c47a5a,color:#ede9e3 style CB2 fill:#191918,stroke:#c47a5a,color:#ede9e3

In this diagram, Consumer Group A has 3 consumers, each reading one partition. Consumer Group B has 2 consumers, so one consumer handles two partitions. Both groups read the same data independently. This is how Kafka combines pub/sub (multiple consumer groups) with queue semantics (competing consumers within a group).
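The partition-to-consumer mapping within a group can be sketched directly. This uses a simple round-robin strategy (similar in spirit to Kafka's round-robin assignor; the names are illustrative):

```python
def assign(partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Distribute partition indices round-robin across a consumer group."""
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for p in range(partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment

# 6 partitions, 3 consumers: each consumer reads 2 partitions.
print(assign(6, ["A1", "A2", "A3"]))

# 6 partitions, 7 consumers: one partition each, and the 7th sits idle.
seven = assign(6, [f"C{i}" for i in range(7)])
idle = [c for c, parts in seven.items() if not parts]
assert idle == ["C6"]
```

This makes the scaling ceiling concrete: consumer parallelism within a group is capped by the partition count, which is why partition count is chosen up front with growth in mind.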

Partition Count and Throughput

More partitions means more parallelism. A topic with 1 partition can only be read by 1 consumer per group. A topic with 64 partitions can be read by up to 64 consumers per group. But more partitions also means more open file handles, longer leader election times during broker failures, and higher end-to-end latency for ordered processing.

The following chart shows approximate write throughput scaling with partition count on a 3-broker cluster. These numbers are derived from LinkedIn's published benchmarks and Confluent's performance testing guides. Actual results depend on message size, replication factor, acknowledgment settings, and hardware.

Notice that throughput does not scale linearly. Going from 1 to 4 partitions gives roughly 3.5x improvement. Going from 16 to 64 gives only 1.5x. The bottleneck shifts from partition parallelism to network I/O, disk bandwidth, and replication overhead. Choosing the right partition count is about matching your expected consumer parallelism, not maximizing the number.

Ordering Guarantees

Kafka guarantees ordering within a partition but not across partitions. If you need all events for a specific order to be processed in sequence, use the order ID as the partition key. All events for that order will land in the same partition and be read in order. Events for different orders can be processed in parallel across partitions.

This is a common source of bugs. If you use a random partition key (or round-robin), two events for the same entity can end up in different partitions and be processed out of order by different consumers.
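Keyed routing is just a stable hash of the key modulo the partition count. Kafka's default partitioner uses murmur2; any stable hash gives the same per-entity ordering property, as this sketch shows:

```python
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    """Stable key -> partition mapping (sha256 here; Kafka uses murmur2)."""
    digest = hashlib.sha256(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

events = ["order-42:created", "order-7:created", "order-42:paid", "order-42:shipped"]
routed = [(e, partition_for(e.split(":")[0], 8)) for e in events]

# All order-42 events land on ONE partition, so a single consumer sees
# them in order; order-7 may route elsewhere and be processed in parallel.
p42 = {p for e, p in routed if e.startswith("order-42")}
assert len(p42) == 1
```

With a random or round-robin key instead, the three `order-42` events could land on three partitions and reach three consumers in any order — exactly the bug described above.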

Log Compaction

Standard retention deletes messages after a time period (7 days by default). Log compaction is different: it keeps the latest value for each key and removes older values. This turns a Kafka topic into a key-value snapshot.

Example: a "user-profiles" topic. Every time a user updates their profile, a new message is produced with the user ID as the key. Log compaction ensures the topic always contains the latest profile for every user, even if the original message was written months ago. New consumers can read the compacted topic to rebuild the full current state.
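What compaction computes is simple to state in code: replay the log in offset order and keep only the latest value per key. A minimal sketch:

```python
def compact(log: list[tuple[str, dict]]) -> dict[str, dict]:
    """Latest value per key, as log compaction would retain it."""
    snapshot: dict[str, dict] = {}
    for key, value in log:           # replay in offset order
        snapshot[key] = value        # later writes overwrite earlier ones
    return snapshot

log = [
    ("user-1", {"name": "Ada"}),
    ("user-2", {"name": "Grace"}),
    ("user-1", {"name": "Ada Lovelace"}),   # profile update months later
]
assert compact(log) == {
    "user-1": {"name": "Ada Lovelace"},
    "user-2": {"name": "Grace"},
}
```

A new consumer reading the compacted topic effectively receives this snapshot, which is why compacted topics work as rebuildable key-value state.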

Delivery Guarantees

| Guarantee | How It Works | When to Use | Risk |
|-----------|--------------|-------------|------|
| At-most-once | Consumer commits offset before processing. If it crashes mid-processing, the message is skipped. | Metrics, logging where losing a few messages is acceptable | Message loss |
| At-least-once | Consumer commits offset after processing. If it crashes after processing but before committing, the message is reprocessed. | Most workloads. Make consumers idempotent to handle duplicates. | Duplicate processing |
| Exactly-once | Idempotent producer (sequence numbers per partition) + transactional APIs. Consume-transform-produce as an atomic unit. | Financial transactions, inventory updates, any workflow where duplicates cause real harm | Higher latency, more complexity |
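The advice "make consumers idempotent" for at-least-once delivery amounts to deduplicating by message ID before applying side effects. A minimal sketch (in production the dedup store must be durable and updated atomically with the state change):

```python
processed_ids: set[str] = set()
balance = 0

def handle(message: dict) -> None:
    """Apply a payment message at most once, even if redelivered."""
    global balance
    if message["id"] in processed_ids:      # duplicate from a redelivery
        return
    balance += message["amount"]
    processed_ids.add(message["id"])

handle({"id": "m1", "amount": 100})
handle({"id": "m2", "amount": 50})
handle({"id": "m1", "amount": 100})         # redelivered after a crash
assert balance == 150                        # applied once, not twice
```

With this guard, at-least-once delivery plus an idempotent consumer behaves like exactly-once from the application's point of view.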

Exactly-Once Semantics: How It Works

Kafka's exactly-once support (introduced in KIP-98, Kafka 0.11) has two components:

Idempotent producers. Each producer gets a unique producer ID (PID). Each message to a partition carries a sequence number. The broker rejects any message whose sequence number is not exactly one greater than the last committed sequence for that PID and partition. Duplicates from retries are silently discarded.

Transactions. A producer can start a transaction, write to multiple partitions, commit consumer offsets, and then commit or abort the entire transaction atomically. Consumers configured with isolation.level=read_committed only see messages from committed transactions. This enables consume-transform-produce pipelines where the input offset commit and the output writes are atomic.
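The broker-side sequence check behind idempotent producers can be sketched as follows (a simplified model of the mechanism, not Kafka's actual broker code):

```python
last_seq: dict[tuple[int, int], int] = {}   # (producer_id, partition) -> last seq
log: list[str] = []

def append(producer_id: int, partition: int, seq: int, payload: str) -> None:
    """Accept a message only if seq is exactly last + 1 for this PID/partition."""
    expected = last_seq.get((producer_id, partition), -1) + 1
    if seq < expected:
        return                               # duplicate retry: ack, don't re-append
    if seq > expected:
        raise ValueError("sequence gap: a message was lost in between")
    log.append(payload)
    last_seq[(producer_id, partition)] = seq

append(7, 0, 0, "order created")
append(7, 0, 1, "order paid")
append(7, 0, 1, "order paid")                # producer retry after a timeout
assert log == ["order created", "order paid"]   # duplicate silently discarded
```

The producer can therefore retry aggressively on timeouts without risking duplicate appends, which is the foundation the transactional APIs build on.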

Kafka as Event Store

Because Kafka retains messages (unlike traditional queues that delete after consumption), it can serve as an event store. The full history of state changes is preserved. Any new service can replay the topic from the beginning to rebuild its own view of the data. This is the foundation of event sourcing architectures built on Kafka.

graph LR subgraph Kafka Cluster T1[orders topic] T2[payments topic] T3[inventory topic] end OS[Order Service] -->|produce| T1 PS[Payment Service] -->|produce| T2 IS[Inventory Service] -->|produce| T3 T1 -->|consume| Analytics[Analytics Service] T2 -->|consume| Analytics T3 -->|consume| Analytics T1 -->|replay from offset 0| NewService[New Reporting Service] style T1 fill:#222221,stroke:#c8a882,color:#ede9e3 style T2 fill:#222221,stroke:#c8a882,color:#ede9e3 style T3 fill:#222221,stroke:#c8a882,color:#ede9e3 style NewService fill:#191918,stroke:#6b8f71,color:#ede9e3

The new reporting service joins months after launch. It resets its consumer offset to zero and replays the entire order history to build its reports. No backfill scripts needed. The data is already in Kafka.

Systems Thinking Lens

Kafka's partition model creates a reinforcing loop: more partitions enable more consumers, which enables higher throughput, which attracts more use cases, which demands more partitions. Without governance, this leads to thousands of topics with unclear ownership. The balancing force is operational cost. Each partition consumes memory, file handles, and replication bandwidth. The systems thinker asks: what is the minimum partition count that serves our current parallelism needs, with room to grow but not room to waste?

Further Reading

  • Jay Kreps, Neha Narkhede, Jun Rao, Benchmarking Apache Kafka: 2 Million Writes Per Second (LinkedIn Engineering, 2014). The original benchmark that demonstrated Kafka's throughput on commodity hardware.
  • Confluent, Exactly-Once Semantics Are Possible: Here's How Apache Kafka Does It. Detailed explanation of idempotent producers, transactional APIs, and KIP-98.
  • Apache Kafka, KIP-98: Exactly Once Delivery and Transactional Messaging. The original design proposal with protocol-level details.
  • Confluent, Apache Kafka Performance. Official performance documentation with benchmarking methodology and tuning guidance.
  • Martin Kleppmann, Designing Data-Intensive Applications (O'Reilly, 2017), Chapter 11. Covers stream processing, log-based messaging, and Kafka's role as an event store.

Assignment

You are designing the Kafka infrastructure for an e-commerce platform. Define topics, partition counts, and partition keys for the following event streams. Justify each decision.

  1. Order events (order.created, order.updated, order.cancelled). The platform processes 50,000 orders per day. Three downstream services consume order events: fulfillment, analytics, and notifications.
  2. Payment events (payment.initiated, payment.completed, payment.failed). Payments must be processed in order per transaction. The system handles 100,000 payment events per day.
  3. Inventory events (stock.updated, stock.reserved, stock.released). 200,000 events per day across 50,000 SKUs. The warehouse service must see all events for a given SKU in order.

For each topic, specify: (a) the partition key, (b) the number of partitions and why, (c) the delivery guarantee (at-least-once or exactly-once) and why, (d) the retention policy (time-based or log compaction) and why.

Two Ways to Wire Services Together

Every distributed system faces the same question: when Service A needs something from Service B, should A wait for the answer or fire a message and move on? This is the fundamental split between synchronous and event-driven architecture. Neither is universally better. Each creates a different failure profile, a different debugging experience, and a different coupling structure.

Synchronous: Request-Response

In synchronous communication, the caller sends a request and blocks until it receives a response. HTTP REST calls, gRPC, and GraphQL queries are all synchronous by default. The caller knows immediately whether the operation succeeded or failed.

This simplicity has a cost. If Service B is slow, Service A is slow. If Service B is down, Service A fails. Chain five synchronous calls together (A calls B calls C calls D calls E) and your latency is the sum of all five, and your availability is the product of all five. Five services at 99.9% availability each give you roughly 99.5% end-to-end availability: about 44 hours of downtime per year, compared with under 9 hours for a single 99.9% service.
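The compounding is easy to compute. A quick sketch:

```python
def chain_availability(per_service: float, n: int) -> float:
    """End-to-end availability of n synchronous calls in series."""
    return per_service ** n

def downtime_hours_per_year(availability: float) -> float:
    """Unavailable fraction of a (365-day) year, in hours."""
    return (1 - availability) * 365 * 24

a = chain_availability(0.999, 5)
print(f"end-to-end availability: {a:.4%}")                      # ~99.50%
print(f"chain downtime: {downtime_hours_per_year(a):.1f} h/yr")
print(f"single service: {downtime_hours_per_year(0.999):.1f} h/yr")
```

Each extra hop in the chain multiplies in another factor below 1.0, so availability only ever gets worse as the chain grows.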

sequenceDiagram participant Client participant OrderSvc as Order Service participant InvSvc as Inventory Service participant PaySvc as Payment Service Client->>OrderSvc: POST /orders OrderSvc->>InvSvc: Reserve inventory InvSvc-->>OrderSvc: Reserved OK OrderSvc->>PaySvc: Charge payment PaySvc-->>OrderSvc: Payment OK OrderSvc-->>Client: Order confirmed Note over Client,PaySvc: Total latency = sum of all calls
If any service fails, the whole chain fails

Synchronous architectures fail together. Event-driven architectures fail independently. This is the core tradeoff. Tight coupling gives you simplicity and immediate feedback. Loose coupling gives you resilience and independent scaling.

Event-Driven: Fire and React

In event-driven architecture, services communicate by publishing and consuming events. The Order Service does not call the Inventory Service directly. It publishes an "order.created" event. The Inventory Service subscribes to that event and reacts. The Order Service does not wait, does not know, and does not care what the Inventory Service does with the event.

This decoupling changes the failure profile. If the Inventory Service goes down for 10 minutes, events accumulate in the message broker. When the service comes back, it processes the backlog. The Order Service never noticed the outage. No cascading failures. No retry storms.

The cost is complexity. With synchronous calls, you can trace a request through a single HTTP call chain. With events, the flow is scattered across multiple services reacting to multiple event types. Debugging "why did this order not ship?" requires reconstructing the event timeline across services.

| Dimension | Synchronous | Event-Driven |
|-----------|-------------|--------------|
| Coupling | Tight: caller depends on callee's availability and interface | Loose: publisher and subscriber share only the event schema |
| Latency | Sum of all downstream calls | Immediate response to client; processing happens asynchronously |
| Failure handling | Cascading: one failure breaks the chain | Isolated: a failed service catches up from its backlog |
| Traceability | Simple: follow the call chain | Hard: reconstruct event flows across services |
| Consistency | Immediate (within a transaction or call) | Eventual: state converges over time |
| Scaling | Scale every service in the call chain equally | Scale each service independently based on its own load |
| Testing | Straightforward: mock downstream calls | Requires event replay and integration testing across services |
| Data ownership | Often shared databases or direct queries across services | Each service owns its data and reacts to events to stay in sync |

The Saga Pattern

In a monolith, you wrap a multi-step operation in a database transaction. Deduct inventory, charge payment, send confirmation. If any step fails, the whole transaction rolls back. In microservices, there is no single database. Each service owns its data. You cannot use a distributed transaction (two-phase commit) at scale because it blocks all participants and does not tolerate network partitions well.

The saga pattern replaces a single transaction with a sequence of local transactions, each in its own service. If a step fails, the saga executes compensating transactions to undo the previous steps. There are two ways to coordinate a saga: choreography and orchestration.

Choreography: Decentralized Coordination

In choreography, each service listens for events and decides what to do next. There is no central coordinator. The Order Service publishes "order.created." The Inventory Service hears it, reserves stock, and publishes "inventory.reserved." The Payment Service hears that, charges the card, and publishes "payment.completed." The Notification Service hears that and sends the confirmation email.

If payment fails, the Payment Service publishes "payment.failed." The Inventory Service hears it and releases the reserved stock (compensating transaction).

sequenceDiagram participant OS as Order Service participant IS as Inventory Service participant PS as Payment Service participant NS as Notification Service OS->>IS: Event: order.created IS->>IS: Reserve stock IS->>PS: Event: inventory.reserved PS->>PS: Charge card alt Payment succeeds PS->>NS: Event: payment.completed NS->>NS: Send confirmation else Payment fails PS->>IS: Event: payment.failed IS->>IS: Release stock (compensate) PS->>OS: Event: payment.failed OS->>OS: Mark order failed end

Choreography is simple for short sagas (2-3 steps). It becomes unmanageable for longer flows because the logic is spread across every participating service. Adding a step means modifying multiple services. Debugging requires tracing events across all participants.

Orchestration: Central Coordinator

In orchestration, a dedicated saga orchestrator controls the flow. It tells each service what to do and waits for the result. If a step fails, the orchestrator triggers compensating transactions in reverse order.

sequenceDiagram participant Orch as Saga Orchestrator participant IS as Inventory Service participant PS as Payment Service participant NS as Notification Service Orch->>IS: Reserve inventory IS-->>Orch: Reserved OK Orch->>PS: Charge payment alt Payment succeeds PS-->>Orch: Payment OK Orch->>NS: Send confirmation NS-->>Orch: Sent OK Orch->>Orch: Mark saga complete else Payment fails PS-->>Orch: Payment FAILED Orch->>IS: Release inventory (compensate) IS-->>Orch: Released OK Orch->>Orch: Mark saga failed end

The orchestrator is the single place where you can see the entire saga flow. It handles retries, timeouts, and compensation in one location. The downside is that the orchestrator becomes a single point of failure and a potential bottleneck. It also creates coupling: the orchestrator must know about every service in the flow.
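The orchestrator's core loop — run each local transaction, and on failure compensate the completed steps in reverse order — fits in a short sketch (a toy orchestrator with illustrative step names, not a real saga framework):

```python
def run_saga(steps):
    """steps: list of (name, action, compensation) triples."""
    completed = []
    history = []
    for name, action, compensate in steps:
        try:
            action()
            history.append(f"{name}: ok")
            completed.append((name, compensate))
        except Exception:
            history.append(f"{name}: FAILED")
            # Undo completed steps in reverse order.
            for done_name, undo in reversed(completed):
                undo()
                history.append(f"{done_name}: compensated")
            return False, history
    return True, history

stock = {"sku-1": 5}

def reserve():  stock["sku-1"] -= 1
def release():  stock["sku-1"] += 1
def charge():   raise RuntimeError("card declined")
def refund():   pass

ok, history = run_saga([
    ("reserve inventory", reserve, release),
    ("charge payment", charge, refund),
])
assert not ok
assert stock["sku-1"] == 5           # the reservation was compensated
```

Real orchestrators add the hard parts this sketch skips: persisting saga state so a crashed orchestrator can resume, timeouts, and retries of compensations themselves.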

The Transactional Outbox Pattern

Event-driven systems have a dual-write problem. When the Order Service creates an order, it must (1) write to its database and (2) publish an event. If the database write succeeds but the event publish fails, downstream services never learn about the order. If the event publishes but the database write fails, downstream services act on an order that does not exist.

The outbox pattern solves this. Instead of publishing the event directly, the service writes the event to an "outbox" table in the same database transaction as the business data. A separate process (a poller or a CDC connector like Debezium) reads the outbox table and publishes the events to the message broker. Because the business data and the outbox entry are in the same transaction, they succeed or fail together.

graph LR subgraph Order Service API[API Handler] -->|single transaction| DB[(Database)] API -->|same transaction| OB[Outbox Table] end OB -->|CDC / Poller| Kafka[Message Broker] Kafka --> IS[Inventory Service] Kafka --> PS[Payment Service] style DB fill:#222221,stroke:#c8a882,color:#ede9e3 style OB fill:#222221,stroke:#6b8f71,color:#ede9e3 style Kafka fill:#191918,stroke:#c47a5a,color:#ede9e3

Debezium is the most widely used open-source CDC tool for the outbox pattern. It reads the database's transaction log (PostgreSQL WAL, MySQL binlog) and streams changes to Kafka topics. No polling, no missed events, no dual-write risk.
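The pattern's essence — business row and outbox row in one local transaction, published later by a relay — can be sketched with SQLite (table and column names are illustrative; a real relay would be a poller or a CDC connector):

```python
import sqlite3
import json

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
db.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, topic TEXT, "
           "payload TEXT, published INTEGER DEFAULT 0)")

def create_order(order_id: int) -> None:
    with db:                                         # ONE transaction for both rows
        db.execute("INSERT INTO orders VALUES (?, 'created')", (order_id,))
        db.execute("INSERT INTO outbox (topic, payload) VALUES (?, ?)",
                   ("order.created", json.dumps({"order_id": order_id})))

def relay(publish) -> None:
    """Publish unpublished outbox rows, then mark them published."""
    rows = db.execute("SELECT id, topic, payload FROM outbox "
                      "WHERE published = 0 ORDER BY id").fetchall()
    for row_id, topic, payload in rows:
        publish(topic, json.loads(payload))          # e.g. send to the broker
        with db:
            db.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))

sent = []
create_order(42)
relay(lambda topic, event: sent.append((topic, event)))
assert sent == [("order.created", {"order_id": 42})]
```

Note the relay is at-least-once: if it crashes between publishing and marking the row, the event is republished on the next run. Downstream consumers must therefore be idempotent.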

Systems Thinking Lens

Synchronous architectures have a tight feedback loop: you call, you get a response, you know immediately. This is a reinforcing loop for developer productivity in the short term. But it creates a balancing loop at scale: more services in the chain means more latency and more failure modes, which pushes teams toward event-driven patterns.

Event-driven architectures introduce delay in the feedback loop. Events are processed asynchronously, so the cause (order created) and the effect (email sent) are separated in time. This delay makes debugging harder but makes the system more resilient to individual component failures. The saga pattern is a conscious design of compensating feedback loops: every forward action has a defined reverse action, creating a system that can self-correct when failures occur.

Further Reading

  • Chris Richardson, Saga Pattern (microservices.io). The canonical reference for saga design in microservices, with choreography and orchestration examples.
  • Microsoft Azure, Saga Design Pattern (Azure Architecture Center). Detailed guidance with decision flowcharts and implementation considerations.
  • Debezium, Reliable Microservices Data Exchange with the Outbox Pattern. Step-by-step implementation guide using Debezium CDC with PostgreSQL and Kafka.
  • Chris Richardson, Transactional Outbox Pattern (microservices.io). Pattern definition with polling and CDC-based implementations.
  • ByteByteGo, Saga Pattern Demystified: Orchestration vs Choreography. Visual comparison with practical decision criteria.

Assignment

Design a saga for the following order flow. Specify whether you would use choreography or orchestration, and justify your choice.

Flow:

  1. Step 1: Deduct inventory for the ordered items.
  2. Step 2: Charge the customer's credit card.
  3. Step 3: Send a confirmation email.

Questions:

  1. What happens if Step 2 (payment) fails after Step 1 (inventory deduction) has already succeeded? Write the compensating transaction.
  2. What happens if Step 3 (email) fails after Steps 1 and 2 succeed? Should you compensate Steps 1 and 2? Why or why not?
  3. How would you use the outbox pattern to ensure the "order.created" event is reliably published after the order is saved to the database?
  4. Draw a sequence diagram (choreography or orchestration) for the happy path and the failure path from question (a).

The Observability Gap in Distributed Systems

In a monolith, debugging is straightforward. A request enters the application, moves through functions, and you can trace the entire flow in a single stack trace. In a microservices architecture, a single user request might touch 6, 10, or 20 services. Each service has its own logs. If the response is slow, which service caused the delay? If there is an error, where did it originate? Without distributed tracing, you are left searching through logs from a dozen services, trying to correlate timestamps manually.

A trace is a story. Each span is a chapter. Without the story, you are debugging in the dark. Distributed tracing connects the dots across service boundaries, giving you a single timeline for a request that touches many systems.

Traces, Spans, and Context Propagation

Distributed tracing relies on three concepts:

Trace. A trace represents the complete journey of a single request through the system. It has a unique trace ID (typically a 128-bit random value) that is propagated across every service the request touches. All work related to that request shares this trace ID.

Span. A span represents a single unit of work within a trace. Each service creates one or more spans. A span has a start time, duration, operation name, status, and optional metadata (tags/attributes). Spans have parent-child relationships, forming a tree. The root span is created by the first service. Each downstream call creates a child span.

Context propagation. When Service A calls Service B, the trace ID and the current span ID must travel with the request. This is context propagation. For HTTP calls, the trace context is passed in headers (the W3C traceparent header is the standard). For Kafka messages, it is embedded in message headers. Without context propagation, spans from different services cannot be linked into a single trace.
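As a concrete illustration, the W3C traceparent header packs four hyphen-separated fields: version, trace ID, parent span ID, and trace flags. A minimal sketch of building and parsing one in plain Python (the helper names are illustrative, not part of any SDK):

```python
import secrets

def make_traceparent(trace_id=None, span_id=None, sampled=True):
    """Build a W3C traceparent header: version-traceid-spanid-flags."""
    trace_id = trace_id or secrets.token_hex(16)   # 128-bit trace ID
    span_id = span_id or secrets.token_hex(8)      # 64-bit span ID
    flags = "01" if sampled else "00"              # bit 0 = sampled
    return f"00-{trace_id}-{span_id}-{flags}"

def parse_traceparent(header):
    """Split an incoming traceparent header into its four fields."""
    version, trace_id, span_id, flags = header.split("-")
    assert len(trace_id) == 32 and len(span_id) == 16
    return {"version": version, "trace_id": trace_id,
            "span_id": span_id, "sampled": flags == "01"}

# Service A builds the header for an outgoing call;
# Service B extracts the context from the incoming request.
header = make_traceparent()
ctx = parse_traceparent(header)
```

In a real service you would not hand-roll this: the OpenTelemetry SDK injects and extracts the header for you. The sketch only shows what travels on the wire.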

graph TB
  subgraph "Trace: abc-123"
    R["Root Span: API Gateway<br/>0ms - 450ms"] --> S1["Span: Order Service<br/>10ms - 400ms"]
    S1 --> S2["Span: Inventory Service<br/>20ms - 120ms"]
    S1 --> S3["Span: Payment Service<br/>130ms - 380ms"]
    S3 --> S4["Span: Fraud Check<br/>140ms - 250ms"]
  end
  style R fill:#222221,stroke:#c8a882,color:#ede9e3
  style S1 fill:#222221,stroke:#6b8f71,color:#ede9e3
  style S2 fill:#191918,stroke:#c8a882,color:#ede9e3
  style S3 fill:#191918,stroke:#c47a5a,color:#ede9e3
  style S4 fill:#191918,stroke:#8a8478,color:#ede9e3

In this trace, the API Gateway receives the request (root span, 450ms total). It calls the Order Service (child span, 390ms). The Order Service calls Inventory (100ms) and then Payment (250ms). Payment calls Fraud Check (110ms). The waterfall view shows that Payment is the bottleneck: it accounts for more than half the total latency. Without tracing, you would only know the overall response was 450ms.

The Span Hierarchy as a Timeline

Tracing UIs display spans as a waterfall diagram. Each span is a horizontal bar. Nested spans are indented under their parent. The width of the bar represents duration. This immediately reveals where time is spent.

gantt
  title Request Trace: abc-123 (450ms total)
  dateFormat X
  axisFormat %Lms
  section API Gateway
  Root span :0, 450
  section Order Service
  Process order :10, 400
  section Inventory
  Check stock :20, 120
  section Payment
  Charge card :130, 380
  section Fraud Check
  Verify transaction :140, 250

This Gantt-style view shows the same trace as a timeline. The Inventory check completes quickly (100ms). Payment takes 250ms, with the Fraud Check accounting for 110ms of it. If you needed to reduce overall latency, you would focus on the Payment/Fraud Check path.
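The bottleneck reasoning above can be automated: for each span, compute its self time (its duration minus the time covered by its direct children) and rank. A small sketch over the span timings from the example trace, assuming non-overlapping children (the data structure is hypothetical, not a tracing-backend API):

```python
# Spans from the example trace: (name, parent, start_ms, end_ms)
spans = [
    ("api-gateway",   None,            0, 450),
    ("order-service", "api-gateway",  10, 400),
    ("inventory",     "order-service", 20, 120),
    ("payment",       "order-service", 130, 380),
    ("fraud-check",   "payment",      140, 250),
]

def self_time(name):
    """Span duration minus time spent in direct child spans.
    Assumes children of a span do not overlap each other."""
    start, end = next((s, e) for n, _, s, e in spans if n == name)
    child_time = sum(e - s for n, p, s, e in spans if p == name)
    return (end - start) - child_time

# Rank spans by unexplained (self) time, largest first.
ranked = sorted(((self_time(n), n) for n, *_ in spans), reverse=True)
print(ranked[0])  # (140, 'payment')
```

Payment's 250ms span only delegates 110ms to Fraud Check, so 140ms of work happens inside Payment itself, which makes it the first place to look.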

OpenTelemetry: The Standard

OpenTelemetry (OTel) is the industry-standard framework for distributed tracing, metrics, and logs. It was formed by merging two earlier projects: OpenTracing and OpenCensus. The value proposition is simple: instrument your code once with OpenTelemetry, then send the data to any compatible backend (Jaeger, Zipkin, AWS X-Ray, Datadog, Grafana Tempo).

OpenTelemetry provides:

  • SDKs for Java, Python, Go, JavaScript, .NET, and more. Auto-instrumentation libraries hook into popular frameworks (Spring Boot, Express, Django) and create spans automatically for HTTP calls, database queries, and message broker interactions.
  • The OpenTelemetry Collector, a proxy that receives telemetry data, processes it (filtering, sampling, batching), and exports it to one or more backends. This decouples your application from the tracing backend.
  • W3C Trace Context propagation out of the box, ensuring interoperability across services written in different languages.

Tracing Tools Compared

  • Jaeger (Uber, CNCF graduated). Storage: Cassandra, Elasticsearch, Kafka, Badger. Strengths: adaptive sampling, dependency graph, strong Kubernetes integration. Limitations: requires self-hosting and storage management.
  • Zipkin (Twitter, open source). Storage: Cassandra, Elasticsearch, MySQL, in-memory. Strengths: lightweight, simple UI, broad language support. Limitations: fewer advanced features than Jaeger, simpler sampling.
  • AWS X-Ray (Amazon Web Services). Storage: managed by AWS. Strengths: native AWS integration (Lambda, ECS, API Gateway), no infrastructure to manage. Limitations: vendor lock-in, limited customization, AWS-only.
  • Grafana Tempo (Grafana Labs). Storage: object storage (S3, GCS). Strengths: no indexing required, cost-effective at scale, Grafana dashboard integration. Limitations: search requires TraceQL or trace ID lookup, newer ecosystem.
  • Datadog APM (Datadog, commercial). Storage: managed by Datadog. Strengths: unified metrics, logs, and traces in one platform; powerful search and alerting. Limitations: expensive at scale, proprietary.

Sampling Strategies

In a system processing millions of requests per second, storing a trace for every request is impractical. Storage costs would be enormous and most traces are uninteresting (successful requests with normal latency). Sampling decides which traces to keep.

  • Head-based (probabilistic). Decision made at the start of the trace, e.g. sample 1% of all requests; the decision propagates to all downstream services. Advantage: simple, low overhead; all spans in a sampled trace are captured. Disadvantage: interesting traces (errors, slow requests) are missed at the same rate as boring ones.
  • Tail-based. Decision made after the trace is complete: the collector buffers all spans and keeps or drops based on the full trace (e.g. keep all traces with errors or latency > 2s). Advantage: captures all interesting traces; no important data is lost. Disadvantage: requires buffering all spans until the trace completes, with higher memory and compute cost at the collector.
  • Rate-limiting. Cap at N traces per second, regardless of traffic volume; useful for controlling costs. Advantage: predictable storage costs. Disadvantage: under-samples during traffic spikes and over-samples during low traffic.
  • Adaptive (Jaeger). Automatically adjusts sampling rates per operation based on traffic volume: high-traffic endpoints get lower rates, low-traffic endpoints get higher rates. Advantage: balanced representation across all endpoints without manual tuning. Disadvantage: more complex configuration; requires Jaeger's collector infrastructure.

In practice, many organizations use a combination: head-based sampling at 1-5% for general visibility, plus tail-based sampling to always capture errors and high-latency traces. This balances cost against the need to debug production issues.
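A sketch of how that combination might look as decision logic: a deterministic head-based verdict derived from the random trace ID (so every service in the trace reaches the same decision without coordination), plus a tail-stage override that always keeps errors and slow traces. The rate and latency threshold are illustrative values, not recommendations:

```python
HEAD_RATE = 0.05          # keep ~5% of traces up front (illustrative)
LATENCY_BUDGET_MS = 2000  # tail stage keeps anything slower (illustrative)

def head_sampled(trace_id_hex, rate=HEAD_RATE):
    """Deterministic head-based decision: map the first 32 bits of the
    random trace ID to [0, 1] and compare against the sampling rate.
    Every service computes the same verdict from the same trace ID."""
    bucket = int(trace_id_hex[:8], 16) / 0xFFFFFFFF
    return bucket < rate

def tail_keep(trace):
    """Tail-based override, applied at the collector once the trace is
    complete: always keep errors and traces over the latency budget."""
    return trace["error"] or trace["duration_ms"] > LATENCY_BUDGET_MS

def keep(trace):
    return head_sampled(trace["trace_id"]) or tail_keep(trace)
```

In a real pipeline the tail decision runs inside the collector, which must buffer spans until the trace completes; the sketch shows only the decision logic, not the buffering.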

Designing a Tracing Setup

A production tracing pipeline typically looks like this:

graph LR
  A[Service A] -->|traceparent header| B[Service B]
  B -->|traceparent header| C[Service C]
  C -->|traceparent header| D[Service D]
  A -->|OTLP| Coll[OTel Collector]
  B -->|OTLP| Coll
  C -->|OTLP| Coll
  D -->|OTLP| Coll
  Coll -->|sampling + export| Backend[Jaeger / Tempo / X-Ray]
  Backend --> UI[Tracing UI]
  style Coll fill:#222221,stroke:#c8a882,color:#ede9e3
  style Backend fill:#191918,stroke:#6b8f71,color:#ede9e3
  style UI fill:#191918,stroke:#c47a5a,color:#ede9e3

Each service instruments its code with the OpenTelemetry SDK. On every incoming request, the SDK extracts the trace context from headers (or creates a new trace if none exists). On every outgoing call, the SDK injects the trace context into headers. Spans are exported via OTLP (OpenTelemetry Protocol) to the OTel Collector. The collector applies sampling, batching, and filtering, then exports to the tracing backend.

What to Capture in Spans

Not all spans are equally useful. At minimum, capture:

  • Operation name: GET /api/orders/{id}, db.query, kafka.produce
  • Duration: Start and end timestamps.
  • Status: OK, ERROR, with error message if applicable.
  • Attributes: http.method, http.status_code, db.system, db.statement (sanitized), messaging.system, user.id (if relevant).
  • Events: Log entries attached to the span (e.g., "retry attempt 2", "cache miss").

Avoid capturing sensitive data in span attributes: passwords, credit card numbers, PII. Sanitize database queries to remove parameter values. Set clear policies for what is and is not included in trace data.
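Statement sanitization can happen before the attribute is ever set on the span. A minimal sketch that strips literal values from a SQL string with regular expressions (illustrative only; OpenTelemetry SDKs and collector processors offer comparable redaction features):

```python
import re

def sanitize_sql(statement: str) -> str:
    """Replace string and numeric literals with '?' so parameter
    values (emails, card numbers, IDs) never reach trace storage."""
    statement = re.sub(r"'(?:[^']|'')*'", "?", statement)     # 'string' literals
    statement = re.sub(r"\b\d+(?:\.\d+)?\b", "?", statement)  # numeric literals
    return statement

query = "SELECT * FROM orders WHERE user_email = 'a@b.com' AND total > 100"
print(sanitize_sql(query))
# SELECT * FROM orders WHERE user_email = ? AND total > ?
```

The sanitized form is what you would record as the db.statement attribute; the same idea applies to scrubbing PII from URLs or message payloads before they become span data.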

Systems Thinking Lens

Distributed tracing introduces a meta-feedback loop. Without tracing, teams lack visibility into cross-service behavior, so they make local optimizations that may not improve overall system performance. Tracing closes this loop by making the global behavior visible. A delay in Service D is now traceable to a slow database query, which is traceable to a missing index. The systems thinker recognizes tracing as a leverage point: it does not fix problems directly, but it makes problems visible, which accelerates every other improvement.

However, tracing itself is a system with its own feedback dynamics. More services generate more spans. More spans require more collector capacity and storage. Costs grow with adoption. Sampling is the balancing loop that keeps the system sustainable. Without it, the observability infrastructure becomes a scaling problem of its own.

Further Reading

  • OpenTelemetry, Traces (official documentation). Definitive reference for trace concepts, span structure, context propagation, and the OpenTelemetry data model.
  • Jaeger, Architecture Overview (official documentation). How Jaeger collectors, agents, and storage backends work together.
  • Charity Majors, Liz Fong-Jones, George Miranda, Observability Engineering (O'Reilly, 2022). Comprehensive treatment of tracing, metrics, and logs in production systems, with practical guidance on sampling and cost management.
  • W3C, Trace Context Specification. The standard for the traceparent and tracestate HTTP headers used for context propagation.
  • Ajit Singh, Distributed Tracing: Jaeger vs Tempo vs Zipkin. Side-by-side comparison of tracing backends with architecture diagrams and feature matrices.

Assignment

A user reports that a page load takes 2 seconds. The request touches 6 microservices: API Gateway, User Service, Product Service, Recommendation Service, Cart Service, and Pricing Service.

  1. Without tracing: Describe your debugging process. Which logs do you check first? How do you correlate events across 6 services? How long might this investigation take?
  2. Design a tracing setup:
    • What ID propagates across all 6 services? How is it passed (which header)?
    • Where are spans created? Name at least 8 spans you would expect in this trace.
    • Which sampling strategy would you use if the system handles 50,000 requests per second? Justify your choice.
  3. With tracing in place: You see that the Recommendation Service span takes 1.4 seconds out of the 2-second total. What are your next steps? What span attributes would help you narrow down the cause?
© Ibrahim Anwar · Bogor, West Java
This work is licensed under CC BY 4.0