For in-depth information on various Big Data technologies, check out my free e-book “Introduction to Big Data”.

In YARN, the Resource Manager is a single point of failure (SPOF). Multiple Resource Manager instances can be brought up for fault tolerance, but only one instance is Active. When the Active instance goes down or becomes unresponsive, another Resource Manager has to be elected as the new Active. Such a leader election problem is common to distributed systems with an active/standby design, and YARN relies on ZooKeeper to elect the new Active. Distributed systems also face other common problems, such as naming service, configuration management, synchronization, and group membership. ZooKeeper is a highly reliable distributed coordination service for all these use cases. Higher-order constructs, e.g. barriers, message queues, locks, two-phase commit, and leader election, can also be implemented with ZooKeeper. In the rest of the book, we will find that many distributed services depend on ZooKeeper, which is exactly ZooKeeper's goal: implement the coordination service once, implement it well, and share it among many distributed applications.

Essentially, ZooKeeper is a distributed in-memory CP data store that has the following guarantees:

  • Sequential Consistency

Updates from a client will be applied in the order that they were sent.

  • Atomicity

Updates either succeed or fail. No partial results.

  • Single System Image

A client will see the same view of the service regardless of the server that it connects to.

  • Reliability

Once an update has been applied, it will persist from that time forward until a client overwrites the update.

  • Timeliness

The client's view of the system is guaranteed to be up-to-date within a certain time bound.

Data Model

ZooKeeper has a hierarchical namespace, much like a file system. The major difference is that every node in the namespace (called a znode), internal nodes and leaves alike, can have associated data. The data stored at each znode is accessed atomically: a read gets all the data bytes associated with a znode, and a write replaces them. To achieve high throughput and low latency, ZooKeeper keeps all the data in main memory. For recoverability, updates are logged to disk, and the whole data tree is also periodically snapshotted in a fuzzy way: a snapshot is taken while updates keep being applied, so it need not reflect any single point in time (replaying the logged updates on top of it restores a consistent state), and the snapshot frequency is not fixed either. So ZooKeeper is like an in-memory key-value store whose key namespace is organized as a tree. However, ZooKeeper is not intended to be used as a general database or large object store. In fact, the ZooKeeper client and server implementations have sanity checks to ensure that znodes have less than 1 MB of data. In practice, the data should be on the order of kilobytes on average, as ZooKeeper is designed to manage coordination data such as configuration, status information, and rendezvous.
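To make the data model concrete, here is a minimal Python sketch of a znode tree held in memory, assuming paths are plain strings such as `/app/config`. The class `ZNodeTree` and the constant `MAX_DATA_BYTES` are hypothetical illustrations, not the ZooKeeper client API; the sketch only shows that internal and leaf znodes can both carry data, that a read returns the whole value, that a write replaces it, and that oversized values are rejected.

```python
# Hypothetical in-memory model of ZooKeeper's data tree (illustration only;
# ZNodeTree and MAX_DATA_BYTES are made-up names, not the ZooKeeper API).
MAX_DATA_BYTES = 1024 * 1024  # assumed ~1 MB per-znode limit


class ZNodeTree:
    def __init__(self) -> None:
        # Maps a full path to the bytes stored at that znode; "/" is the root.
        self.nodes = {"/": b""}

    def create(self, path: str, data: bytes = b"") -> None:
        if path in self.nodes:
            raise KeyError(f"node exists: {path}")
        parent = path.rsplit("/", 1)[0] or "/"
        if parent not in self.nodes:
            raise KeyError(f"no parent node: {parent}")
        self._check_size(data)
        self.nodes[path] = data

    def get(self, path: str) -> bytes:
        # A read returns all the bytes associated with the znode.
        return self.nodes[path]

    def set(self, path: str, data: bytes) -> None:
        # A write replaces the whole value; there are no partial writes.
        if path not in self.nodes:
            raise KeyError(f"no node: {path}")
        self._check_size(data)
        self.nodes[path] = data

    def children(self, path: str) -> list:
        prefix = "/" if path == "/" else path + "/"
        result = []
        for p in self.nodes:
            rest = p[len(prefix):]
            if p.startswith(prefix) and rest and "/" not in rest:
                result.append(rest)
        return sorted(result)

    @staticmethod
    def _check_size(data: bytes) -> None:
        if len(data) >= MAX_DATA_BYTES:
            raise ValueError("znode data must stay below ~1 MB")


tree = ZNodeTree()
tree.create("/app", b"v1")              # an internal znode with its own data
tree.create("/app/config", b"timeout=30")
print(tree.children("/app"))            # ['config']
print(tree.get("/app/config"))          # b'timeout=30'
```

Note that `/app` holds data of its own even though it has a child, which a plain file system directory would not.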

Each znode has an Access Control List (ACL) and a stat structure that includes timestamps and version numbers for data changes and ACL changes. ZooKeeper stamps each update with a zxid (ZooKeeper Transaction Id), which exposes the total ordering of all changes to ZooKeeper. When a znode's data or ACL changes, the corresponding version number increases. Every read also returns the version of the data, and every update or delete must supply the version the client expects. If the supplied version doesn't match the current version of the data, the update fails. This gives clients optimistic concurrency control: a client that read stale data cannot silently overwrite a newer update.
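The version check enables a classic read-modify-write loop. The Python sketch below models it with hypothetical names (`VersionedZNode`, `BadVersionError`, `increment`) that are not part of the ZooKeeper API. It assumes the counter is stored as ASCII digits. In the real client API, passing -1 as the version skips the check; this sketch leaves that out.

```python
# Hypothetical model of version-checked updates (illustration only; not the
# ZooKeeper API). Real setData/delete also accept -1 to skip the check.
from dataclasses import dataclass


class BadVersionError(Exception):
    """Raised when the caller's expected version is stale."""


@dataclass
class VersionedZNode:
    data: bytes
    version: int = 0

    def read(self) -> tuple:
        # Every read returns the data together with its current version.
        return self.data, self.version

    def write(self, data: bytes, expected_version: int) -> int:
        # Conditional update: succeeds only if nobody changed the znode since
        # the caller read `expected_version`.
        if expected_version != self.version:
            raise BadVersionError(
                f"expected version {expected_version}, current {self.version}")
        self.data = data
        self.version += 1
        return self.version


def increment(node: VersionedZNode) -> int:
    """Increment a counter stored as ASCII digits, retrying on conflict."""
    while True:
        data, version = node.read()
        new_value = int(data) + 1
        try:
            node.write(str(new_value).encode(), version)
            return new_value
        except BadVersionError:
            continue  # another client won the race: re-read and retry


counter = VersionedZNode(b"41")
print(increment(counter))  # 42
```

A client holding a stale version gets an error instead of clobbering the newer value, so it re-reads and retries.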

Clients can also set watches on znodes. A watch is a one-time trigger: a change to a znode triggers the watch associated with it and then clears the watch. When a watch triggers, the client receives a notification from ZooKeeper. Watch notifications are sent to watchers asynchronously, but ZooKeeper guarantees that a client will see the watch event for a znode it is watching before it sees the new data for that znode. Moreover, the order of watch events from ZooKeeper corresponds to the order of the updates as seen by the ZooKeeper service.
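The one-time semantics can be illustrated with a small registry. This Python sketch is a hypothetical model (`WatchRegistry` and its methods are made up, and the event name is just a string); it only shows that a watch is removed as soon as it fires.

```python
# Hypothetical model of one-time watches (illustration only; WatchRegistry is
# not part of the ZooKeeper client API).
from collections import defaultdict
from typing import Callable, Dict, List

Callback = Callable[[str, str], None]


class WatchRegistry:
    def __init__(self) -> None:
        self._watches: Dict[str, List[Callback]] = defaultdict(list)

    def watch(self, path: str, callback: Callback) -> None:
        self._watches[path].append(callback)

    def fire(self, path: str, event: str) -> int:
        # Pop the watches first, so each one triggers at most once.
        callbacks = self._watches.pop(path, [])
        for cb in callbacks:
            cb(path, event)
        return len(callbacks)


events = []
registry = WatchRegistry()
registry.watch("/config", lambda p, e: events.append((p, e)))
print(registry.fire("/config", "NodeDataChanged"))  # 1
print(registry.fire("/config", "NodeDataChanged"))  # 0: the watch was cleared
```

A client that wants continuous notifications has to register a new watch after each trigger. Changes that happen between the trigger and the re-registration are not reported individually, so the client should re-read the current data rather than assume it saw every update.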

In addition, ZooKeeper has ephemeral nodes, which exist only as long as the session that created them is active. When the session ends, the znode is deleted. Ephemeral nodes make it easy to implement group membership in distributed systems. The group is represented by a znode, and each group member creates an ephemeral node under the group node. If a member leaves or fails abnormally, its znode is deleted automatically once ZooKeeper detects the failure.
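The group membership pattern boils down to tying each child znode to the session that created it. The Python sketch below is a hypothetical model (`EphemeralGroup`, integer session ids, and the member names are all made up); it shows that ending a session removes exactly that session's ephemeral children.

```python
# Hypothetical sketch of group membership with ephemeral znodes (illustration
# only; EphemeralGroup and session ids are made up, not the ZooKeeper API).
class EphemeralGroup:
    def __init__(self, group_path: str) -> None:
        self.group_path = group_path
        self.owner = {}  # child znode name -> id of the session that created it

    def join(self, session_id: int, member: str) -> str:
        if member in self.owner:
            raise KeyError(f"node exists: {member}")
        self.owner[member] = session_id
        return f"{self.group_path}/{member}"

    def end_session(self, session_id: int) -> list:
        # Ending a session (explicit close or expiry after a crash) deletes
        # every ephemeral znode that session created.
        removed = sorted(m for m, s in self.owner.items() if s == session_id)
        for m in removed:
            del self.owner[m]
        return removed

    def members(self) -> list:
        # The current group membership is just the list of children.
        return sorted(self.owner)


group = EphemeralGroup("/workers")
group.join(1, "worker-a")
group.join(2, "worker-b")
group.end_session(1)        # worker-a crashed; its session expires
print(group.members())      # ['worker-b']
```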

Another special kind of znode is the sequential node, to whose name ZooKeeper automatically appends a monotonically increasing counter. The counter is maintained per parent znode. A simple way to implement leader election with ZooKeeper is to create sequential, ephemeral nodes under a group node: the process that created the znode with the smallest sequence number is the leader. If the group is not very large, all processes can watch the current smallest znode. If the leader goes offline, its ephemeral node is removed, and all other processes are notified and can determine the new leader. If the group is very large, however, this design may cause a burst of operations that ZooKeeper has to process, referred to as the “herd effect”. An alternative is for each process to watch only the znode with the largest sequence number smaller than its own, i.e., its immediate predecessor. When a process is notified that this predecessor has been removed, it checks whether its own znode is now the smallest; if so, it executes the leader procedure, and otherwise it watches its new predecessor. This avoids the herd effect because each deletion notifies only one process.
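The decision each process makes in the predecessor-watch scheme can be sketched as a pure function. This Python sketch assumes sequential znode names end with ZooKeeper's 10-digit zero-padded counter (e.g. `n_0000000003`); `election_step` and the `n_` prefix are hypothetical, and the actual watch registration is left out.

```python
# Hypothetical sketch of the predecessor-watch leader election decision
# (illustration only). Assumes sequential znode names end with a 10-digit
# zero-padded counter, e.g. "n_0000000003".
from typing import List, Optional, Tuple


def sequence_number(name: str) -> int:
    return int(name[-10:])


def election_step(children: List[str], my_node: str) -> Tuple[str, Optional[str]]:
    """Return ("leader", None) if my_node is smallest, else ("watch", predecessor)."""
    ordered = sorted(children, key=sequence_number)
    idx = ordered.index(my_node)
    if idx == 0:
        return ("leader", None)
    # Watch only the immediate predecessor, so each deletion wakes one process.
    return ("watch", ordered[idx - 1])


children = ["n_0000000005", "n_0000000001", "n_0000000003"]
print(election_step(children, "n_0000000001"))  # ('leader', None)
print(election_step(children, "n_0000000005"))  # ('watch', 'n_0000000003')
```

When a watched predecessor disappears, the notified process simply calls `election_step` again with the refreshed list of children. If the deleted node was not the leader, the process ends up watching a new predecessor instead of becoming leader.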

With watches and sequential nodes, one could also implement message queues with ZooKeeper. However, just as ZooKeeper should not be used as a general database, it is not recommended as a replacement for a dedicated message queue. The design of ZooKeeper does not fit the typical use cases of message queues: ZooKeeper performs poorly when many znodes have thousands of children, and the 1 MB size limit on znode data rules out large messages.

Atomic Broadcast

To be fault tolerant, ZooKeeper should be replicated over a set of hosts called an ensemble. The servers that make up the ZooKeeper service must all know about each other. As long as a majority of the servers are available, the ZooKeeper service will be available. More specifically, the service requires at least 2f+1 servers to tolerate up to f crash failures. In practice, a ZooKeeper service usually consists of three to seven machines. Because ZooKeeper requires a majority, it is best to use an odd number of machines: for example, a four-server ensemble needs three servers for a majority and thus still tolerates only one failure, the same as a three-server ensemble.
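The 2f+1 rule is easy to check with a few lines of arithmetic. This Python sketch only encodes the majority rule stated above.

```python
# Majority-quorum arithmetic for an ensemble of n servers.
def quorum_size(n: int) -> int:
    return n // 2 + 1


def tolerated_failures(n: int) -> int:
    # With 2f + 1 <= n, the ensemble survives f crashes.
    return (n - 1) // 2


for n in range(3, 8):
    print(n, quorum_size(n), tolerated_failures(n))
```

For n = 3 to 7 this prints quorums 2, 3, 3, 4, 4 and tolerated failures 1, 1, 2, 2, 3. Adding a fourth or sixth server raises the quorum without tolerating any extra failure, which is why odd ensemble sizes are preferred.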

ZooKeeper Components

Every ZooKeeper server serves clients, and each client connects to exactly one server at a time. To create a client session, the application code must provide a connection string containing a comma-separated list of host:port pairs, each corresponding to a ZooKeeper server. The ZooKeeper client library picks an arbitrary server and tries to connect to it. If the client becomes disconnected from the server, it automatically tries the next server in the list until a connection is re-established.
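The client-side failover described above can be sketched in Python. This is a hypothetical model, not the real client library's host-selection logic: `parse_connect_string` and `connect` are made-up helpers, `try_connect` stands in for a real network attempt, and the sketch gives up after one pass over the list, whereas a real client keeps retrying. Optional chroot suffixes such as `host:2181/app` are not handled.

```python
# Hypothetical sketch of client-side server selection (illustration only; the
# real client library has its own host-selection logic). Chroot suffixes such
# as "host:2181/app" are not handled here.
import random
from typing import Callable, List, Tuple

Server = Tuple[str, int]


def parse_connect_string(connect_string: str) -> List[Server]:
    """Split "zk1:2181,zk2:2181" into [("zk1", 2181), ("zk2", 2181)]."""
    servers = []
    for entry in connect_string.split(","):
        host, port = entry.strip().rsplit(":", 1)
        servers.append((host, int(port)))
    return servers


def connect(servers: List[Server], try_connect: Callable[[str, int], bool],
            rng: random.Random) -> Server:
    # Start at an arbitrary server, then move on to the next one in the list,
    # wrapping around, until a connection attempt succeeds.
    start = rng.randrange(len(servers))
    for host, port in servers[start:] + servers[:start]:
        if try_connect(host, port):
            return (host, port)
    raise ConnectionError("no ZooKeeper server reachable")


servers = parse_connect_string("zk1:2181, zk2:2181,zk3:2181")
# Pretend only zk3 is up; the lambda stands in for a real network attempt.
print(connect(servers, lambda host, port: host == "zk3", random.Random(7)))  # ('zk3', 2181)
```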

To provide high read throughput, ZooKeeper serves read requests from the local replica of the state at each server. In contrast, all write requests are forwarded to a single server, referred to as the leader. The leader uses an atomic broadcast protocol called Zab to keep all the servers in sync. The leader is elected through a leader election algorithm and synchronizes with a quorum of other servers, called followers. By sending all updates through the leader, non-idempotent requests are transformed into idempotent transactions. To guarantee the correct transformation, ZooKeeper enforces that there is only one leader in Zab. The Zab protocol meets the following requirements:

  • Reliable delivery

If a message is delivered by one server, then it will be eventually delivered by all correct servers.

  • Total order

If message a is delivered before message b by one server, then every server that delivers a and b delivers a before b.

  • Causal order

If message a causally precedes message b and both messages are delivered, then a must be ordered before b.

  • Prefix property

If m is the last message delivered for a leader L, any message proposed before m by L must also be delivered.
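The total order property can be checked mechanically against recorded delivery logs. The Python sketch below is an illustration with hypothetical log data; it checks only total order, not the other three properties, so a lagging server whose log is a prefix of the others does not violate it.

```python
# Checks the Zab "total order" property on recorded delivery logs
# (illustration only; the logs below are hypothetical test data).
from typing import Dict, List


def satisfies_total_order(deliveries: Dict[str, List[str]]) -> bool:
    """deliveries maps a server id to the messages it delivered, in order."""
    logs = list(deliveries.values())
    for i, log_a in enumerate(logs):
        for log_b in logs[i + 1:]:
            pos_b = {m: k for k, m in enumerate(log_b)}
            # Messages delivered by both servers, in server A's order...
            common = [m for m in log_a if m in pos_b]
            # ...must also appear in that order in server B's log.
            if common != sorted(common, key=pos_b.__getitem__):
                return False
    return True


print(satisfies_total_order({"s1": ["a", "b", "c"], "s2": ["a", "b"], "s3": ["a"]}))  # True
print(satisfies_total_order({"s1": ["a", "b"], "s2": ["b", "a"]}))  # False
```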

At a high level, Zab is a leader-based protocol similar to Paxos. Compared to Paxos, however, Zab is primarily designed for primary-backup systems rather than for state machine replication. The Zab protocol has two modes: recovery (leader activation) and broadcast (active messaging). When the service starts or after a leader failure, Zab transitions to recovery mode. Recovery mode ends when a leader emerges and a quorum of servers has synchronized its state with the leader, i.e., the leader and each of these servers are guaranteed to have the same state. Once a leader has a quorum of synchronized followers, it accepts messages to propose and coordinates message delivery. The broadcast resembles two-phase commit without the need to handle aborts, and all communication channels are assumed to be FIFO:

  • The leader sends proposals to all followers in the order that requests have been received. Before proposing a message, the leader assigns it a monotonically increasing unique zxid.
  • Followers process messages in the order they are received.
  • The leader will issue a COMMIT to all followers as soon as a quorum of followers have ACKed a message.
  • Followers deliver the message when they receive the COMMIT from the leader.
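The four broadcast steps can be simulated in a single process. This Python sketch is a hypothetical illustration: `Follower`, `Leader`, and the message strings are made up, method calls stand in for FIFO channels, and recovery mode, disk logging, and retransmission are omitted. It assumes the leader's own ACK counts toward a majority of the ensemble, and it packs the epoch into the high 32 bits of the zxid and a counter into the low 32 bits, which is the zxid layout ZooKeeper uses.

```python
# Hypothetical single-process simulation of Zab's broadcast phase (illustration
# only). Method calls stand in for FIFO channels; recovery mode, disk logging
# and retransmission are omitted.
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class Follower:
    name: str
    up: bool = True
    pending: List[Tuple[int, str]] = field(default_factory=list)  # proposals, FIFO
    delivered: List[str] = field(default_factory=list)            # committed messages

    def on_propose(self, zxid: int, msg: str) -> bool:
        if not self.up:
            return False  # a crashed follower sends no ACK
        self.pending.append((zxid, msg))  # process proposals in arrival order
        return True  # ACK

    def on_commit(self, zxid: int) -> None:
        # With FIFO channels, a COMMIT matches the oldest pending proposal.
        if self.up and self.pending and self.pending[0][0] == zxid:
            self.delivered.append(self.pending.pop(0)[1])


class Leader:
    def __init__(self, followers: List[Follower], epoch: int = 1) -> None:
        self.followers = followers
        self.epoch = epoch
        self.counter = 0
        self.delivered: List[str] = []

    def broadcast(self, msg: str) -> bool:
        self.counter += 1
        zxid = (self.epoch << 32) | self.counter  # epoch high, counter low
        acks = 1 + sum(f.on_propose(zxid, msg) for f in self.followers)  # +1: leader
        if acks < (len(self.followers) + 1) // 2 + 1:
            return False  # no majority yet; a real leader would keep waiting
        self.delivered.append(msg)
        for f in self.followers:
            f.on_commit(zxid)
        return True


followers = [Follower(f"f{i}") for i in range(4)]  # ensemble of 5 with the leader
followers[3].up = False
leader = Leader(followers)
print(leader.broadcast("create /a"))  # True: 4 of 5 ACKs
print(followers[0].delivered)         # ['create /a']
```

The crashed follower misses the commit, but the message is still committed because a majority acknowledged it; that follower would catch up during a later recovery phase, which this sketch does not model.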