The Plumber's Approach to Distributed Systems

How to structure distributed systems for low latency at scale

Distributed systems are often structured as request-driven stateless microservices. However, that approach has its limits and, at scale, invites complexity and tends to sacrifice both correctness and latency unwittingly.

In a stateless design, all service instances operate identically, and a request is entirely handled at whatever instance it happens to hit. Any request contention must be resolved via database transactions, leaving little room to maneuver if latency is too high and worsens with scale. Previous posts on the tradeoffs and pitfalls of microservices, statelessness, and caching explored these issues in depth.

But what's the alternative?

One approach -- let's call it the plumber's approach -- is for any specific request to be processed on a single chosen instance, with internal low-latency routing, aka plumbing, designed to forward that request to the right destination.

The chosen instance is selected based on a service-relevant business domain (such as "customers," "cities," or "gizmos"), which means that a single instance exclusively handles all requests for a given domain unit. Concurrent requests for a domain unit can then be processed by a single-threaded work handler that has complete control over state, caching, and everything else, with plenty of room to maneuver.

The approach's effectiveness hinges on choosing a suitable work domain to partition, balancing the scope of exclusivity with how directly incoming requests can be routed. The approach aims to simplify request processing by solving latency and scale concerns with more sophisticated plumbing. Hence the name.

Work distribution

When designing a system this way, the main question is how to partition (or shard) the business domain to distribute the work. Some systems involve multiple entities, with either no obvious candidate or several. The answer is highly problem-dependent and therefore less fruitful to discuss in the abstract, so let's consider a familiar example:

"The Usual Pizza" is an online pizzeria that caters to repeat customers, introduced in The Overuse Of Microservices. Their signature feature is the 1-click reorder, where customers can reorder "the usual" in a single request, optionally without specifying any further information. To place the order, the system must fetch the current delivery address and the "usual pizza" selection with its ingredients. But as outlined in The Caching Trap, correctness and low latency are hard to achieve simultaneously.

A single order request involves multiple entities: customers, pizza ingredients, and -- if available in multiple locations -- pizzerias. Which of them (if any) is a suitable business domain? It depends. In this case, the main concern has been customer configuration correctness and integrity during order placement, so the best option is likely to partition by customer. Moreover, each request includes customer identification, and requests from different customers do not directly interact and can be processed concurrently. Partitioning by pizzeria could also be reasonable (and probably a better choice in other settings), but we'll stick with customer.

This choice means that if the system runs on four instances, each instance is responsible for roughly 25% of the customer base. All orders for a specific customer will be processed on the same instance. Customers could be distributed by name, say, with instance 1 owning names starting with A-F, instance 2 owning G-N, and so on.

Here, Bob's orders will be processed by instance 1 (because B is in the A-F partition), and Joe's orders by instance 2. In practice, human names are stubbornly non-uniform, changeable, and nothing but trouble (e.g., which is 乔's instance?). The system would introduce an immutable customer id, such as a random UUID, to help ensure customers are spread out evenly across instances.

Even so, an even spread of customers does not mean that each instance handles 25% of the orders placed, unlike in a request-driven stateless service where any instance can take any request. Uneven load can be a risk.
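As a minimal sketch of the two partitioning schemes described above, the following compares routing by name with routing by an immutable customer id. The name ranges for instances 3 and 4, the function names, and the use of a hash modulo the instance count are assumptions made for illustration only; they are not taken from the original system.

```python
import hashlib
import uuid

NUM_INSTANCES = 4  # assumption: matches the four-instance example


def instance_by_name(name: str) -> int:
    """Naive scheme: partition on the first letter of the customer name."""
    # The A-F and G-N ranges follow the example; O-T and U-Z are assumed.
    ranges = [("a", "f", 1), ("g", "n", 2), ("o", "t", 3), ("u", "z", 4)]
    first = name[0].lower()
    for low, high, instance in ranges:
        if low <= first <= high:
            return instance
    raise ValueError(f"no partition for {name!r}")


def instance_by_id(customer_id: uuid.UUID) -> int:
    """Partition on an immutable id; hashing spreads customers evenly."""
    digest = hashlib.sha256(customer_id.bytes).digest()
    return int.from_bytes(digest[:8], "big") % NUM_INSTANCES + 1
```

Here `instance_by_name("Bob")` yields instance 1 and `instance_by_name("Joe")` yields instance 2, while a name such as 乔 falls outside every range and raises an error. Note that a plain modulo reassigns most customers whenever the number of instances changes, so a real system would likely prefer ranges over the hashed id space.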

Request routing

Each service instance must forward incoming requests to the correct instance, i.e., the instance that owns the work domain partition for the request. How does the receiving instance know which instance is the right destination?

There are many possibilities.

The simplest option is to make the ownership information part of the service config or even to hardcode it. A slightly more flexible option is to use a DNS-naming scheme: for example, make a.theusual.pizza, b.theusual.pizza, ..., z.theusual.pizza resolve to the owning instance. An incoming request for Joe would be redirected to j.theusual.pizza. These options are fine for small static services.

However, most services are hosted in a public cloud and need some elasticity: instances are restarted or fail, instances come and go from auto-scaling, etc. The work distribution scheme and matching request routing logic must handle that reality. It is hardly useful if a new instance joins due to system load but is not responsible for any customers.

A better approach is to use a distributed coordination service, such as Etcd or ZooKeeper, to mediate the registration of service instances and their current responsibility. Whatever the mechanism, each instance must know who owns what and how to connect (i.e., the routing table) to route and process requests correctly.

For example, a new order from Joe would arrive at a random instance (here, instance 3) and be forwarded to instance 2 by the frontend after consulting the routing table.

Some additional requirements:

  • The size of the routing table must be small. Each instance should be able to hold it in memory to forward requests quickly. An interval map representation for the routing table {[a;f] -> 1, [g;n] -> 2, ..} is compact and efficient. A direct map {a -> 1, b -> 1, c -> 1, ..} quickly becomes impractical.

  • Ownership must be exclusive. The exclusivity of ownership must be enforced via a lease or distributed lock, if dynamic. If two instances handle requests for the same customer, even briefly, we re-introduce the pitfalls of the stateless design. Also, updates to the routing table create race conditions: instances may receive requests for customers they no longer handle. A special error will let the plumbing retry.

  • Ownership should be long-lived. A change in ownership involves a slight but non-zero disruption, so frequent changes are counter-productive. This is easier said than done: load balancing with stability involves tradeoffs. In general, slow is better. Work handlers typically hold state in memory and lose it on ownership changes.
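The first two requirements can be sketched as follows: a compact interval map for the routing table, and a retry when an instance reports that it no longer owns a key. The names `RoutingTable`, `NotOwner`, and `route`, as well as the `get_table` and `send` callbacks, are hypothetical and only stand in for whatever the real plumbing provides.

```python
import bisect
from dataclasses import dataclass


@dataclass(frozen=True)
class RoutingTable:
    """Interval map: starts[i] is the first key owned by owners[i]."""
    starts: tuple  # sorted lower bounds, e.g. ("a", "g", "o", "u")
    owners: tuple  # owning instance per interval, e.g. (1, 2, 3, 4)

    def owner(self, key: str) -> int:
        # Find the last interval whose lower bound is <= key.
        i = bisect.bisect_right(self.starts, key) - 1
        if i < 0:
            raise KeyError(key)
        return self.owners[i]


class NotOwner(Exception):
    """Hypothetical error: the contacted instance does not own the key."""


def route(key, get_table, send, max_attempts=3):
    """Forward a request, re-reading the routing table if it was stale."""
    for _ in range(max_attempts):
        target = get_table().owner(key)
        try:
            return send(target, key)
        except NotOwner:
            continue  # stale table: fetch a fresh one and try again
    raise RuntimeError(f"no owner found for {key!r}")
```

The table size grows with the number of partitions rather than the number of customers, which is what keeps it small enough to hold in memory on every instance.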

Dynamic work distribution is a deep and interesting problem in its own right, but fortunately, a simple scheme will usually do.

Local request dispatching

Each instance is responsible for part of the business domain, and the plumbing ensures all requests are routed to the owning instance and processed nowhere else. This guarantee is compelling because we have thereby funneled concurrent requests for a customer to a single instance (with some caveats).

It's a bit like the 90s again: single-machine multi-threaded programming.

But at 2020s scale.

Moreover, modern languages have far better concurrency support. We don't have to completely re-live the unforgiving world of threads, locks, and void* callbacks. Lightweight threads (fibers) and mature concurrency libraries are available in many mainstream languages. We can run a lightweight thread per customer as a semi-permanent request handling loop -- a Handler -- with exclusive access to any in-memory customer state.

The structure would be as follows (in pseudo-code):

struct Handler:
   // in-memory state

method Process:
   // initialize state

   loop:
      wait for:
         * new request from queue:
              // process next request w/ exclusive state access
         * halt signal:
              // lost ownership
              return

Each Handler runs a single lightweight Process thread, which waits for either new requests or a signal to halt. No manual locks are needed.
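One possible rendering of this pseudo-code, assuming Python's asyncio tasks as the lightweight threads. The queue size, the reply futures, and the order list used as in-memory state are illustrative assumptions, not part of the original design.

```python
import asyncio


class Handler:
    """One per customer; owns that customer's in-memory state exclusively."""

    def __init__(self, customer_id: str):
        self.customer_id = customer_id
        self.queue = asyncio.Queue(maxsize=100)  # assumed bound
        self.halt = asyncio.Event()
        self.orders = []  # in-memory state, touched only by process()

    async def process(self) -> None:
        halted = asyncio.create_task(self.halt.wait())
        while True:
            next_request = asyncio.create_task(self.queue.get())
            done, _ = await asyncio.wait(
                {next_request, halted}, return_when=asyncio.FIRST_COMPLETED
            )
            if next_request in done:
                # Exclusive state access: no locks are needed here.
                request, reply = next_request.result()
                self.orders.append(request)
                reply.set_result(len(self.orders))
            if halted in done:
                # Lost ownership: stop waiting for requests and exit.
                next_request.cancel()
                return


async def demo():
    handler = Handler("joe")
    worker = asyncio.create_task(handler.process())
    loop = asyncio.get_running_loop()
    replies = []
    for pizza in ("the usual", "the usual"):
        reply = loop.create_future()
        await handler.queue.put((pizza, reply))
        replies.append(await reply)
    handler.halt.set()
    await worker
    return replies
```

Running `asyncio.run(demo())` processes two orders in sequence and then stops the handler through the halt signal.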

That's almost like the 60s again: single-threaded programming.

Well, almost.

First, the instance owning a partition must dispatch requests for any given customer to its handler without delaying requests for other customers. The dispatcher will use a suitable mechanism, such as concurrent queues, to pass requests to the handler. The exact mechanism depends on the language and frameworks used. If a work handler is overwhelmed, the dispatcher may need to reject its requests.

Continuing the example, the redirected request from instance 3 is received by the dispatcher on instance 2 and sent to the work handler for Joe.

Dispatching completes the routing of the request to a unique work handler. For new customers, the dispatcher must create a work handler.
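A minimal dispatcher sketch under the same asyncio assumption: one bounded queue per customer, handlers created on first use, and a rejection when a handler's queue is full. The `Dispatcher` and `Overloaded` names, the queue size, and the trivial handler body are hypothetical.

```python
import asyncio


class Overloaded(Exception):
    """Hypothetical error raised when a handler's queue is full."""


class Dispatcher:
    def __init__(self, queue_size: int = 2):
        self.queue_size = queue_size
        self.queues = {}  # customer id -> request queue of its handler
        self.tasks = {}   # customer id -> running handler task

    async def _handle(self, customer_id, queue):
        count = 0  # stand-in for per-customer in-memory state
        while True:
            request, reply = await queue.get()
            count += 1
            reply.set_result(f"{customer_id} order #{count}: {request}")

    def dispatch(self, customer_id, request) -> asyncio.Future:
        """Never blocks, so one busy customer cannot delay the others."""
        queue = self.queues.get(customer_id)
        if queue is None:
            # New customer: create the work handler on demand.
            queue = asyncio.Queue(maxsize=self.queue_size)
            self.queues[customer_id] = queue
            self.tasks[customer_id] = asyncio.create_task(
                self._handle(customer_id, queue)
            )
        reply = asyncio.get_running_loop().create_future()
        try:
            queue.put_nowait((request, reply))
        except asyncio.QueueFull:
            raise Overloaded(customer_id) from None
        return reply

    async def close(self):
        for task in self.tasks.values():
            task.cancel()
        await asyncio.gather(*self.tasks.values(), return_exceptions=True)


async def demo():
    dispatcher = Dispatcher(queue_size=2)
    first = dispatcher.dispatch("joe", "the usual")
    second = dispatcher.dispatch("joe", "the usual")
    try:
        dispatcher.dispatch("joe", "the usual")
        rejected = False
    except Overloaded:
        rejected = True
    other = dispatcher.dispatch("bob", "the usual")  # unaffected by Joe
    results = [await first, await second, await other]
    await dispatcher.close()
    return rejected, results
```

In the demo, Joe's third request is rejected because his queue is full, while Bob's request is accepted and handled independently.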

Exclusive processing

Exclusivity is the key to the approach. All requests for Joe must go through a single work handler, so nothing happens elsewhere.

In particular, we can avoid the fundamental problems that plague the stateless microservice approach:

  1. Cache validity. Persistent state for Joe does not change unless the work handler changes it. Therefore, cached data does not become invalid unless the work handler invalidates it. If the work handler reads the address, "the usual" selection, and ingredient list on startup, it is already ready in memory for low-latency order processing.

  2. Atomicity control. There are no race conditions if we process each of Joe's orders sequentially. The work handler can even prioritize actions. And if an operation fails, we can undo any partial progress before moving to the next to avoid state inconsistency. Easy as pie.
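The undo step mentioned under atomicity control could look like the sketch below. It assumes that each order is a sequence of steps and that each step comes with a compensating action; the `place_order` function and the step shape are hypothetical, not the system's actual API.

```python
class OrderFailed(Exception):
    """Raised after partial progress has been rolled back."""


def place_order(state: dict, order: dict, steps) -> None:
    """Run (do, undo) steps in order; on failure, undo completed steps."""
    completed = []
    try:
        for do, undo in steps:
            do(state, order)
            completed.append(undo)
    except Exception as exc:
        # Roll back in reverse order so the state is consistent again
        # before the handler moves on to the next request.
        for undo in reversed(completed):
            undo(state, order)
        raise OrderFailed(str(exc)) from exc
```

Because the work handler processes one request at a time, no other request can observe the state between a failed step and its rollback.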

There is more:

  1. State observability. The work handlers collectively hold in-memory state about all active customers. That makes it practical to report real-time gauge metrics (such as "how many pizzas are being prepared right now?") about that state in contrast to a stateless system where counters prevail. And with each handler being addressable, this approach supports requests for in-memory-only state. A stateless system would either have to rely on underlying storage or, less practically, collect information from all instances.

  2. Initiative. This is chronically overlooked in the reactive stateless world: a work handler does not need to be purely reactive. It can periodically check local state and decide to take action. Or it can use local timers to schedule backup actions, such as ensuring pizza preparation progresses as expected.

Many of these possibilities neatly fall into additional wait for conditions, for example:

method Process:
   ...
   timer = Timer.every(min)
   check = Signal.new()

   loop:
      wait for:
         * new request from queue:
              if new order:
                 check.signal(expected completion)
              ...
         * halt signal:
              ...
         * timer fired:
              // emit gauge metrics
         * check signal:
              // check if backup action should be taken
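A runnable approximation of the extended loop, again assuming asyncio. The timer and the check signal are modeled as deadlines that bound how long the loop waits; the intervals, the `events` list standing in for metrics and backup actions, and the `in_progress` counter (which is never decremented here because order completion is omitted) are all assumptions for illustration.

```python
import asyncio
import time


async def process(queue, halt, events, gauge_every=0.05, check_after=0.02):
    """Sketch: requests, halt, a periodic gauge timer, per-order checks."""
    halted = asyncio.create_task(halt.wait())
    next_request = asyncio.create_task(queue.get())
    next_gauge = time.monotonic() + gauge_every
    checks = []      # (deadline, order) pairs, appended in deadline order
    in_progress = 0  # in-memory state reported by the gauge
    while True:
        # Sleep until a request, the halt signal, or the next deadline.
        deadline = min([next_gauge] + [d for d, _ in checks])
        timeout = max(0.0, deadline - time.monotonic())
        done, _ = await asyncio.wait(
            {next_request, halted},
            timeout=timeout,
            return_when=asyncio.FIRST_COMPLETED,
        )
        now = time.monotonic()
        if next_request in done:
            order = next_request.result()
            in_progress += 1
            checks.append((now + check_after, order))  # schedule a check
            next_request = asyncio.create_task(queue.get())
        if halted in done:
            next_request.cancel()
            return
        if now >= next_gauge:
            events.append(("gauge", in_progress))  # emit gauge metrics
            next_gauge = now + gauge_every
        while checks and checks[0][0] <= now:
            _, order = checks.pop(0)
            events.append(("check", order))  # decide on a backup action
```

The pending `queue.get()` task is kept across iterations rather than cancelled on every timeout, so a request that arrives just as a timer fires is not lost.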

Exclusive processing offers extensive control and many opportunities but also introduces some sharp edges.

Tradeoffs and pitfalls

The strict stateless design relies on database transactions for correctness, which takes a toll on latency and scale. The plumber's approach moves much of that responsibility to the work handler. A single-threaded work handler may look a lot like a database transaction -- but without the guarantees and restrictions of that environment. That freedom is a double-edged sword.

Some considerations:

  1. Source of truth. The in-memory state is authoritative, like a write-through cache, but if the instance crashes or ownership moves, that state is lost. Any database state needed to restore the state of a work handler is, therefore, the real source of truth. Pizza orders, address changes, and so on must still be written to the database. A silver lining is that transaction contention can be minimized because the work handler naturally avoids concurrent updates.

    Work handlers may skip persisting optional or frequently-changing information for performance. If they lose ownership, they may want to drain such state for a more graceful transfer. If they defer too much, the disruption window on ownership change may be too long.

  2. Imperfect business domains. Not every domain partitions well. If mutable global state is needed across work handlers, updates must be resolved via database transactions or distributed locks, and we re-introduce the challenges with stateless caching. But often, a partitioning exists that works well enough: partitioning by customer, for example, leaves concurrent order placements at an individual pizzeria to be serialized by the database. That's ok. Real-world problems are rarely perfectly partitionable, and they rarely have only one solution.

    If we had partitioned by pizzeria, we could introduce customer ownership via a "primary" pizzeria for each customer. Most customers order from one pizzeria only anyway. Only orders for a non-primary pizzeria must contact the primary pizzeria handler for the correct customer state and take the latency/complexity hit. It's a tradeoff.

  3. Handler scope and async I/O. The scope of the work handler must match what it can do in the single-threaded processing loop. Blocking or slow I/O may not be practical if the handler cannot keep up. Blocking I/O is okay for a small handler scope (like customer), but for a larger scope (like pizzeria), it may not be. A customer typically does not place multiple orders concurrently, but concurrent orders are not uncommon for a busy pizzeria. Async I/O is a solution where the handler acts as a single-threaded coordinator. However, that complicates the logic because the work handler keeps acting while orders are in flight, which re-introduces concurrency concerns and failure modes.
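The source-of-truth consideration above can be sketched as a write-through update: persist first, then update memory, and restore from the store on startup. The `CustomerState` class and the dict-like `store` are hypothetical stand-ins for a work handler's state and its database.

```python
class CustomerState:
    """In-memory state of a work handler backed by a durable store."""

    def __init__(self, customer_id, store):
        self.customer_id = customer_id
        self.store = store                     # assumed dict-like database
        self.address = store.get(customer_id)  # restore on startup
        self.last_seen = None                  # optional, memory-only

    def change_address(self, address):
        # Durable write first: if it fails, memory stays unchanged and
        # never runs ahead of the real source of truth.
        self.store[self.customer_id] = address
        self.address = address
```

After a crash or an ownership change, a new handler built from the same store sees the persisted address, while memory-only fields such as `last_seen` start empty.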

The plumber's approach is particularly suitable for low-latency distributed systems with non-trivial in-memory state or concurrency control requirements. The simplicity of single-threaded active work handlers is also beneficial. Of course, if the standard passive stateless design works without contortions, there is no need to complicate matters.

However, some systems use distributed in-memory caches or have frequently-changing real-time state that cannot be eagerly persisted. This is a territory that the stateless design cannot reach.