Crisp Reading Notes on Latest Technology Trends and Basics

Overview

Centralized Name Server

Both Google (with Chubby) and Yahoo (with ZooKeeper) have built the service-level coordination of their services and jobs on top of “Centralized Name Servers”.

These Centralized Name Servers let the co-operating services in a distributed environment exchange shared state at a coarse granularity.

The following are some of the areas where this helps:

Configuration: Each server / service instance in the system reads a centralized configuration.

This can be used to push configuration changes to every instance from one place.

Status: Each server / service periodically updates its status.

This can be used for fault detection and recovery.

Synchronization: Each server shares its current state.

This can be used to help several processes plan and synchronize their activities.
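
As a rough illustration of the status use case, here is a minimal in-memory sketch in Java. The StatusBoard class and its methods are hypothetical stand-ins for the shared state a name server would hold; they are not part of ZooKeeper. Timestamps are passed in explicitly to keep the fault-detection rule easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory status board; a real system would keep this state in the name server. */
final class StatusBoard {
    // Server name -> time (in milliseconds) of its most recent status update
    private final Map<String, Long> lastSeen = new TreeMap<>();

    /** A server reports that it is alive at the given time. */
    void report(String server, long nowMillis) {
        lastSeen.put(server, nowMillis);
    }

    /** Returns, in name order, the servers whose last update is older than the timeout. */
    List<String> suspects(long nowMillis, long timeoutMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        StatusBoard board = new StatusBoard();
        board.report("server-a", 0L);
        board.report("server-b", 900L);
        System.out.println("suspected failures: " + board.suspects(1000L, 500L));
    }
}
```

A recovery process could poll suspects() and reassign the work of any server it returns.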

Synchronization

It is not obvious that a Name Server could help with Synchronization, but it does. In this article, we discuss how ZooKeeper, a Centralized Name Server, helps the nodes of a distributed system synchronize.

We discuss two kinds of synchronization primitives:

Barrier: Work starts only once “n” workers have become available
Task Queue: As job-units become available, they are picked up by waiting workers

Distributed Barrier

How does it work

A distributed barrier is created and used as follows

  1. A barrier is instantiated with a name and a size “N”
  2. A thread enters the barrier by executing an enter function
    • The function blocks until a critical mass of “N” threads has entered
    • Then it returns, and the thread continues processing
  3. The thread picks up its unit-of-work.
  4. A thread may time out and leave the barrier without picking up any work. In that case, an alternate thread may have to be found.
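
The four steps above can be tried out inside a single JVM before bringing in a name server. The sketch below uses the standard java.util.concurrent.CyclicBarrier as a local stand-in for the distributed barrier; the class LocalBarrierSketch and its method names are made up for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/** Single-JVM stand-in for the distributed barrier; names are illustrative only. */
final class LocalBarrierSketch {

    /** One thread enters the barrier; returns true if it got through before the timeout. */
    static boolean enter(CyclicBarrier barrier, long timeoutMillis) {
        try {
            // Step 2: block until N threads have entered
            barrier.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | BrokenBarrierException e) {
            // Step 4: timed out (or the barrier was broken), leave without work
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Starts n threads against a barrier of size n and counts how many pick up work. */
    static int runWorkers(int n, long timeoutMillis) {
        CyclicBarrier barrier = new CyclicBarrier(n);   // Step 1: barrier of size N
        AtomicInteger workDone = new AtomicInteger();
        Thread[] workers = new Thread[n];
        for (int i = 0; i < n; i++) {
            workers[i] = new Thread(() -> {
                if (enter(barrier, timeoutMillis)) {
                    workDone.incrementAndGet();         // Step 3: pick up the unit-of-work
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return workDone.get();
    }

    public static void main(String[] args) {
        System.out.println("workers through the barrier: " + runWorkers(3, 5000));
        // A lone thread at a barrier of size 2 gives up after 50 ms
        System.out.println("lone thread got through: " + enter(new CyclicBarrier(2), 50));
    }
}
```

The distributed version has to achieve the same effect without shared memory, which is where the name server comes in.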

How does ZooKeeper help

ZooKeeper is the common place where the centralized state of the barrier is kept

  • The synchronization classes on each of the servers/services use this shared state to see each other's state, and decide when to block or unblock.

The steps in the enter function are

  1. Each thread, when it enters the barrier, creates a node corresponding to itself under the barrier's node
  2. Then it waits for the count of the children to reach “N”
  3. Every time a child node is added, each thread that has set a watch on the children is informed through a callback. A watch fires only once, so it has to be set again after every notification.
  4. In each of those callbacks, the ZooKeeper state is checked again, and if the critical mass is there, the function returns, allowing the thread to continue
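
To make the role of the shared state and the callback concrete, here is a small in-memory model of those four steps. ToyBarrierNode is a hypothetical stand-in for the barrier's parent node and does not use the ZooKeeper API: the set of children plays the part of the child nodes, and notifyAll() plays the part of the watch callback.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical in-memory stand-in for the barrier's parent node; not the ZooKeeper API. */
final class ToyBarrierNode {
    private final Set<String> children = new HashSet<>();
    private final int size;

    ToyBarrierNode(int size) {
        this.size = size;
    }

    /** Registers the caller and blocks until `size` children exist. */
    synchronized void enter(String name) throws InterruptedException {
        children.add(name);               // Step 1: create a node for this thread
        notifyAll();                      // Step 3: tell every waiter that the children changed
        while (children.size() < size) {  // Steps 2 and 4: re-check the count after each wake-up
            wait();
        }
    }

    /** Runs n threads through a barrier of size n and returns how many got past it. */
    static int runAll(int n) {
        ToyBarrierNode node = new ToyBarrierNode(n);
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            String name = "worker-" + i;
            threads[i] = new Thread(() -> {
                try {
                    node.enter(name);
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("threads past the barrier: " + runAll(4));
    }
}
```

The important detail is the loop around wait(): a wake-up only says that something changed, so the count is always read again before the thread proceeds.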

Implementation

SyncPrimitive

  • Both Barriers and Queues build on the following class definition
    • Watcher is an interface that declares a single callback method, process
    • The constructor creates a new ZooKeeper instance and a mutex, unless an earlier primitive has already done so
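import java.io.IOException;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class SyncPrimitive implements Watcher {

  // One ZooKeeper session and one monitor object, shared by all primitives
  static ZooKeeper zk = null;
  static Object mutex;

  String root;

  SyncPrimitive(String address) throws KeeperException, IOException {
    if (zk == null) {
      System.out.println("Starting ZK:");
      // Create the mutex first: process() may be called as soon as the client starts
      mutex = new Object();
      // 3000 ms session timeout; this object receives the watch events
      zk = new ZooKeeper(address, 3000, this);
      System.out.println("Finished starting ZK: " + zk);
    }
  }

  // Called by the ZooKeeper client for every watch event (WatchedEvent in the
  // 3.x API): wake up a thread that is blocked on the mutex
  @Override
  synchronized public void process(WatchedEvent event) {
    synchronized (mutex) {
      mutex.notify();
    }
  }
}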

Barrier

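  • The Barrier constructor works as follows
    • It receives the address of the ZooKeeper server, the path of the Barrier node, and the size of the barrier
    • It creates a root node for the barrier if no such node exists yet
    • It records the local host name, which later serves as the name of this process's own node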
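/**
  * Barrier constructor
  *
  * @param address address of the ZooKeeper server
  * @param name    path of the barrier's root node
  * @param size    number of participants the barrier waits for
  */
Barrier(String address, String name, int size)
  throws KeeperException, InterruptedException, IOException
{
  super(address);
  this.root = name;
  this.size = size;

  // Create barrier node
  if (zk != null) {
    Stat s = zk.exists(root, false);
    if (s == null) {
      // Persistent node (CreateMode replaces the numeric flag of older releases)
      zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
  }

  // My node name: assign the field explicitly, because the parameter
  // "name" shadows it inside this constructor
  this.name = InetAddress.getLocalHost().getCanonicalHostName();
}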
  • To enter a barrier, each thread executes the enter() method
    • Create an ephemeral node in ZooKeeper under the barrier's root node
    • Next, retrieve the children of the root node, leaving a watch on them
    • If the number of children has reached the barrier size, return so that the computation can start
    • Otherwise, block on the mutex and check again after waking up
  • The notify for the mutex comes from the SyncPrimitive.process function shown above, which runs whenever a Watcher event arrives
/**
  * Join barrier
  *
  * @return true once the barrier has filled up
  * @throws KeeperException
  * @throws InterruptedException
  */

  boolean enter() throws KeeperException, InterruptedException{
    // Ephemeral: the node disappears if this client's session ends
    zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL);

    while (true) {
      synchronized (mutex) {
        // "true" sets the watch again on every pass; a watch fires only once
        List<String> list = zk.getChildren(root, true);

        if (list.size() < size) {
          mutex.wait();
        } else {
          return true;
        }
      }
    }
  }
  • Once the computation finishes, the thread leaves the barrier by removing itself from the ZooKeeper tree
    • The routine first deletes the thread's own node
    • Then, as long as other nodes remain under the root, it waits for a child to be added or removed and checks again
/**
  * Leave barrier: wait until all participants have left
  *
  * @return true once no children remain under the root
  * @throws KeeperException
  * @throws InterruptedException
  */

  boolean leave() throws KeeperException, InterruptedException{

    // Version 0 matches because the node's data was never modified after creation
    zk.delete(root + "/" + name, 0);

    while (true) {
      synchronized (mutex) {
        List<String> list = zk.getChildren(root, true);
        if (list.size() > 0) {
           mutex.wait();
        } else {
          return true;
        }
      }
    }
  }

Producer-Consumer

Producer-Consumer primitives have two operations: produce and consume

The constructor is coded as follows

  • It creates the root node of the queue if that node does not exist yet
/**
  * Constructor of producer-consumer queue
  *
  * @param address address of the ZooKeeper server
  * @param name    path of the queue's root node
  */

  Queue(String address, String name)
        throws KeeperException, InterruptedException, IOException
  {
    super(address);
    this.root = name;

    // Create ZK node name
    if (zk != null) {
      Stat s = zk.exists(root, false);
      if (s == null) {
        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      }
    }
  }

The produce operation is specified as follows

  • The producer adds a node under the root. The node name starts with /element and ends with a suffix that ZooKeeper increments automatically and atomically
  • This is done using the sequence flag (CreateFlags.SEQUENCE in older releases, CreateMode.PERSISTENT_SEQUENTIAL in current ones)
  • Note that the Work-Unit itself is stored as the value of the node
  • Since the queue is unbounded, a producer never has to wait, so no locking is needed
/**
  * Add element to the queue.
  *
  * @param i value to enqueue
  * @return true once the node has been created
  */

boolean produce(int i) throws KeeperException, InterruptedException{
  ByteBuffer b = ByteBuffer.allocate(4);
  byte[] value;

  // Add child with value i
  b.putInt(i);
  value = b.array();
  // Sequential node: ZooKeeper appends an increasing counter to "element"
  zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT_SEQUENTIAL);

  return true;
}
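
To see what a producer leaves behind in the tree, the following sketch reproduces the two formats involved without talking to a server: the 4-byte encoding of the value, and the zero-padded, ten-digit counter that ZooKeeper appends to sequential node names. The class QueueNodeFormat and its helper names are made up for this illustration.

```java
import java.nio.ByteBuffer;

/** Illustrative helpers for the queue's node names and payloads; not part of ZooKeeper. */
final class QueueNodeFormat {

    /** Name of a sequential child with prefix "element" and the given counter value. */
    static String nodeName(int sequence) {
        // ZooKeeper formats the counter as ten digits with leading zeros
        return String.format("element%010d", sequence);
    }

    /** Encodes an int as 4 bytes, big-endian (ByteBuffer's default order). */
    static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    /** Decodes the 4 bytes written by encode(). */
    static int decode(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        System.out.println(nodeName(7));
        System.out.println(decode(encode(42)));
    }
}
```

Because of the padding, a consumer has to keep the full node name around rather than rebuild it from the parsed number.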

For the consume operation,

  • Fetch the children, then read and delete the one with the lowest sequence number
  • If there are no children, wait for a watch event signalling that the children have changed, then try again
/**
  * Remove first element from the queue.
  *
  * @return the value stored in the removed element
  * @throws KeeperException
  * @throws InterruptedException
  */
  int consume() throws KeeperException, InterruptedException{

    int retvalue = -1;
    Stat stat = null;

    // Get the first element available
    while (true) {
      synchronized (mutex) {
        List<String> list = zk.getChildren(root, true);

        if (list.size() == 0) {
          System.out.println("Going to wait");
          mutex.wait();
        } else {
          // Find the child with the smallest sequence number. Keep its full
          // name: the suffix is zero-padded, so the path cannot be rebuilt
          // from the parsed integer. substring(7) skips the "element" prefix.
          String minNode = list.get(0);
          int min = Integer.parseInt(minNode.substring(7));
          for(String s : list){
            int tempValue = Integer.parseInt(s.substring(7));
            if(tempValue < min) {
              min = tempValue;
              minNode = s;
            }
          }

          System.out.println("Temporary value: " + root + "/" + minNode);
          byte[] b = zk.getData(root + "/" + minNode, false, stat);
          zk.delete(root + "/" + minNode, 0);
          ByteBuffer buffer = ByteBuffer.wrap(b);
          retvalue = buffer.getInt();

          return retvalue;
        }
      }
    }
  }
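
The selection step of consume can be looked at in isolation. getChildren makes no promise about the order of the names it returns, so the consumer has to search for the smallest sequence suffix itself. The sketch below does this on a plain list of names; FirstElementSketch and firstChild are hypothetical names, and the child names are assumed to use the "element" prefix from the produce operation.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative selection of the oldest queue element from a list of child names. */
final class FirstElementSketch {
    private static final String PREFIX = "element";

    /** Returns the child name with the smallest sequence suffix, or null for an empty list. */
    static String firstChild(List<String> children) {
        String minNode = null;
        int min = Integer.MAX_VALUE;
        for (String child : children) {
            // Parse the digits after the prefix; leading zeros are accepted by parseInt
            int sequence = Integer.parseInt(child.substring(PREFIX.length()));
            if (minNode == null || sequence < min) {
                min = sequence;
                minNode = child;
            }
        }
        return minNode;
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
            "element0000000012", "element0000000003", "element0000000007");
        System.out.println("next element: " + firstChild(children));
    }
}
```

Returning the full name, rather than the parsed number, keeps the zero padding intact for the getData and delete calls that follow.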