Crisp Reading Notes on Latest Technology Trends and Basics

Archive for the ‘Distributed and Concurrent’ Category

Using Zookeeper for Synchronization in Hadoop

Overview

Centralized Name Server

Both Google (with Chubby) and Yahoo (with ZooKeeper) have implemented service-level coordination of their services and jobs on top of a “Centralized Name Server”.

These centralized name servers allow shared state to be exchanged at a coarse granularity between the co-operating services in a distributed environment.

The following are some of the features where this may help

  • Configuration – Each server / service instance in the system may read a centralized configuration. This could be used to push a centralized configuration.
  • Status – Each server / service could periodically update its status. This could be used for fault detection and recovery.
  • Synchronization – Each server could share its current status. This could be used to help several processes plan and synchronize activities.

Synchronization

It is not very obvious that a Name Server could help with Synchronization, but it does. In this article, we discuss how Zookeeper – a Centralized Name Server helps nodes in a distributed system to achieve synchronization.

We discuss two kinds of synchronization primitives

  • Barrier – when “N” workers become available, tasks can be scheduled
  • Task Queue – when job-units become available, they can be picked up by the consuming entities


Distributed Barrier

How does it work

A distributed barrier is created and defined as follows

  1. A barrier is instantiated with a name and a size “N”
  2. A thread enters the barrier by executing an enter function
    • The function blocks until a critical mass of “N” threads has entered
    • Then it continues processing
  3. The thread picks up its unit-of-work.
  4. A thread may time out and leave the barrier without picking up work. In that case, we may need to find an alternate thread.

How does Zookeeper help

Zookeeper is the common place where the centralized state of the barrier may be kept

  • The synchronization classes on each of the servers/services may use this distributed state to see each others’ state, and decide when to block or unblock.

The steps in the function are

  1. Each thread, when it enters the barrier, creates a node corresponding to itself
  2. Then it waits for the count of the children to reach “N”
  3. Every time a new child node is added, all waiting participants are notified through a watch callback.
  4. In each of those callbacks, the Zookeeper state is checked again, and if the critical mass is there, the function will return allowing the thread to continue

Implementation

SyncPrimitive

  • Both Barriers and Locks use the following class definition
    • Watcher is a logical interface that specifies the function – process
    • The constructor instantiates a new Zookeeper instance, and instantiates a mutex
public class SyncPrimitive implements Watcher {

    static ZooKeeper zk = null;
    static Integer mutex;

    String root;

    SyncPrimitive(String address) throws KeeperException, IOException {
        if (zk == null) {
            System.out.println("Starting ZK:");
            zk = new ZooKeeper(address, 3000, this);
            mutex = new Integer(-1);
            System.out.println("Finished starting ZK: " + zk);
        }
    }

    @Override
    synchronized public void process(WatcherEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }
}

Barrier

  • The Barrier constructor works as follows
    • The address of the ZooKeeper server, the path of the barrier node, and the barrier size are passed in
    • It creates a root node for the barrier, if no such node existed.
/**
  * Barrier constructor
  *
  * @param address
  * @param name
  * @param size
  */
Barrier(String address, String name, int size)
  throws KeeperException, InterruptedException, UnknownHostException 
{
  super(address);
  this.root = name;
  this.size = size;

  // Create barrier node
  if (zk != null) {
    Stat s = zk.exists(root, false);
    if (s == null) {
      zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
    }
  }

  // My node name (stored in the Barrier's name field, used by enter()/leave())
  this.name = InetAddress.getLocalHost().getCanonicalHostName();
}
  • To enter a barrier, each thread needs to execute the enter() method
    • Create an ephemeral child node under the barrier node, named after this host
    • Next, retrieve the number of children under the Zookeeper node
    • If the number of children under a specified node is equal to the barrier size, then we return and the computation can start
    • Else, we block on the mutex.
  • The notify for the mutex comes when there is a Watcher event in the SyncPrimitive.process function specified above
/**
  * Join barrier
  *
  * @return
  * @throws KeeperException
  * @throws InterruptedException
  */

  boolean enter() throws KeeperException, InterruptedException{
    zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateFlags.EPHEMERAL);

    while (true) {
      synchronized (mutex) {
        ArrayList<String> list = zk.getChildren(root, true);

        if (list.size() < size) {
          mutex.wait();
        } else {
          return true;
        }
      }
    }
  }
  • Once the computation finishes, the thread leaves the barrier by removing itself from the ZooKeeper tree
    • The routine first removes the thread's own node from the ZooKeeper tree
    • Then, while there are still nodes left under the barrier, it waits for events signalling that children have been added or removed.
/**
  * Leave barrier: wait until all processes have left
  *
  * @return
  * @throws KeeperException
  * @throws InterruptedException
  */

  boolean leave() throws KeeperException, InterruptedException{

    zk.delete(root + "/" + name, 0);

    while (true) {
      synchronized (mutex) {
        ArrayList<String> list = zk.getChildren(root, true);
        if (list.size() > 0) {
           mutex.wait();
        } else {
          return true;
        }
      }
    }
  }

Producer-Consumer

Producer-Consumer primitives have two operations – produce and consume

The constructor operation is coded as follows

  • It creates the root node of the queue, if no such node existed
/**
  * Constructor of producer-consumer queue
  *
  * @param address
  * @param name
  */

  Queue(String address, String name)
        throws KeeperException, InterruptedException 
  {
    super(address);
    this.root = name;

    // Create ZK node name
    if (zk != null) {
      Stat s = zk.exists(root, false);
      if (s == null) {
        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
      }
    }
  }

The produce operation is specified as follows

  • As a first step, the producer adds a node under the root. This node is named “element”, with a suffix that is automatically and atomically incremented
  • This is done using the flag – CreateFlags.SEQUENCE
  • Note that the work-unit itself is stored as the value of the node
  • Since the queue is unbounded, the producer never blocks and no locks are needed
/**
  * Add element to the queue.
  *
  * @param i
  * @return
  */

boolean produce(int i) throws KeeperException, InterruptedException{
  ByteBuffer b = ByteBuffer.allocate(4);
  byte[] value;

  // Add child with value i
  b.putInt(i);
  value = b.array();
  zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
            CreateFlags.SEQUENCE);

  return true;
}

For the consume operation,

  • Fetch the children, and try to delete the node with the smallest sequence number
  • If there are no children, block on the mutex until a watch event signals that an element has been added
/**
  * Remove first element from the queue.
  *
  * @return
  * @throws KeeperException
  * @throws InterruptedException
  */
  int consume() throws KeeperException, InterruptedException{

    int retvalue = -1;
    Stat stat = null;

    // Get the first element available
    while (true) {
      synchronized (mutex) {
        ArrayList<String> list = zk.getChildren(root, true);

        if (list.size() == 0) {
          System.out.println("Going to wait");
          mutex.wait();
        } else {

          Integer min = new Integer(list.get(0).substring(7));
          for(String s : list){
            Integer tempValue = new Integer(s.substring(7));
            if(tempValue < min) min = tempValue;
          }

          System.out.println("Temporary value: " + root + "/element" + min);
          byte[] b = zk.getData(root + "/element" + min, false, stat);
          zk.delete(root + "/element" + min, 0);
          ByteBuffer buffer = ByteBuffer.wrap(b);
          retvalue = buffer.getInt();

          return retvalue;
        }
      }
    }
  }

UNIX Signals – Understanding them, and coding for them

Signals are one of the most complex features to code for in the Linux environment. It is difficult to get them fully right unless one understands a little of the internals. In this write-up, I summarize a few points from a couple of good presentations on the topic.

Enjoy the perusal …

MS-Word version of this write-up

Overview


What is a signal

A signal is a short message sent by the kernel to a process or a group of processes

  • The signal number is carried as part of the message
  • POSIX.4 allows additional parameters to accompany the message

Details

Signal Transmission

  • Signal sending
    • Kernel updates data structures in the Process Control Block (PCB)
  • Signal receiving
    • Kernel forces the process to handle the signal
  • Pending signals
    • Signals that have been sent, but not yet received
    • Processes/threads can block signals – prevent them from being received (see the sketch below)
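
For example, a process can block a signal so that it merely stays pending until it is unblocked. A minimal sketch using sigprocmask(2) (the five-second window is illustrative):

#include <signal.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
    sigset_t set, old;

    /* Block SIGINT: if it is sent now, it stays pending instead of being received */
    sigemptyset(&set);
    sigaddset(&set, SIGINT);
    sigprocmask(SIG_BLOCK, &set, &old);

    printf("SIGINT is blocked for 5 seconds; Ctrl-C will be held pending\n");
    sleep(5);

    /* Restoring the old mask delivers any pending SIGINT at this point */
    sigprocmask(SIG_SETMASK, &old, NULL);
    return 0;
}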

Hints to handle signals

  • The user process declares how it wants to handle a signal using the sigaction data structure shown below (a registration sketch follows it)
    • The action may be SIG_IGN to ignore the signal, or SIG_DFL for the default action
    • If the signal is neither ignored nor defaulted, the specified handler is executed
struct sigaction {
    void (*sa_handler)(int); /* handler address, or SIG_IGN, or SIG_DFL */
    sigset_t sa_mask;        /* blocked signal list */
    int sa_flags;            /* options e.g., SA_RESTART */
};
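
A minimal sketch of registering a handler through this structure might look as follows (the handler name and the choice of SIGINT are illustrative, not from the original slides):

#include <signal.h>
#include <stdio.h>
#include <unistd.h>

/* Handler: only async-signal-safe calls belong here; write(2) is one of them. */
static void on_sigint(int signo)
{
    const char msg[] = "caught SIGINT\n";
    write(STDOUT_FILENO, msg, sizeof(msg) - 1);
}

int main(void)
{
    struct sigaction act;

    act.sa_handler = on_sigint;     /* handler address (not SIG_IGN / SIG_DFL) */
    sigemptyset(&act.sa_mask);      /* block no extra signals while handling   */
    act.sa_flags = SA_RESTART;      /* restart slow system calls if interrupted */

    if (sigaction(SIGINT, &act, NULL) < 0) {
        perror("sigaction");
        return 1;
    }

    pause();                        /* wait for the signal to arrive */
    return 0;
}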


Sending signals

  • Signals are sent using one of the following system calls
    • info is 0 if the sender is a user process
    • info is 1 if the sender is the kernel
send_sig_info(int sig, struct siginfo *info, struct task_struct *t);

kill_proc_info(int sig, struct siginfo *info, pid_t pid);

Receiving Signals

  • Signal handling is done in entry.S, after the system call or interrupt has been handled; it is called from ret_from_intr()
  • The function do_signal() calls dequeue_signal() repeatedly, till there are no more pending signals
  • Each signal that is not ignored or defaulted is then delivered by handle_signal()

Receiving signals during a System call

  • handle_signal() runs in kernel mode, while the handlers themselves live in user mode. This can create problems.
  • A process blocked in a system call is placed in the TASK_INTERRUPTIBLE state.
  • When a signal arrives, it is woken up into the TASK_RUNNING state to execute the signal handler.
  • After this, the process/task has to be placed back in the old state, and the interrupted system call restarted.
  • This is done only if the SA_RESTART flag was set in sa_flags when the signal handler was registered.

Real-time Signals

  • Real-time signals are sent as part of a signal queue, and are dequeued similarly.
  • To send a real-time signal

int sigqueue(pid_t pid, int sig, const union sigval value);

  • When the signal is dequeued, the value attached to it is delivered to the handler in the following union (a usage sketch follows the definition).
typedef union sigval {
    int sival_int;
    void *sival_ptr;
} sigval_t;
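
As an illustration, the following sketch queues a real-time signal to the current process and reads the attached value back in an SA_SIGINFO-style handler (the handler name and the payload value are made up):

#include <signal.h>
#include <stdio.h>
#include <unistd.h>

/* SA_SIGINFO delivers the queued value in info->si_value. */
static void on_rt_signal(int signo, siginfo_t *info, void *ctx)
{
    /* printf is not async-signal-safe; used here only to keep the sketch short. */
    printf("got signal %d with value %d\n", signo, info->si_value.sival_int);
}

int main(void)
{
    struct sigaction act;
    union sigval v;

    act.sa_sigaction = on_rt_signal;   /* three-argument handler */
    act.sa_flags = SA_SIGINFO;         /* ask for siginfo_t delivery */
    sigemptyset(&act.sa_mask);
    sigaction(SIGRTMIN, &act, NULL);

    v.sival_int = 42;                  /* payload carried with the signal */
    if (sigqueue(getpid(), SIGRTMIN, v) != 0) {
        perror("sigqueue");
        return 1;
    }

    /* The handler runs before or shortly after sigqueue() returns. */
    sleep(1);
    return 0;
}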

Usage of alarm

alarm(2) is a system call in Linux that arranges for a SIGALRM signal to be sent to the process after the specified number of seconds. Let us discuss various aspects of integrating the alarm system call, as an example.

Usage

In the example below, the alarm is used to time out a slow system call, such as read

if (signal(SIGALRM, sig_alrm) == SIG_ERR) {
    printf("signal(SIGALRM) error\n");
    exit(1);
}

alarm(10);
n = read(0, line, MAXLINE);

Premature alarm expiry

Even though the alarm is set for 10 seconds, which should be adequate time to read the data,

  • there may be cases, because of extreme scheduling delays, where the read has not even started when the alarm expires.
  • In that case, the alarm would already have fired, and the read would then execute unprotected by the alarm, potentially blocking forever.
  • To avoid this, use setjmp and longjmp to ensure restartability of the application code.
    • setjmp is used to save the context of the program before the alarm is set.
    • When the alarm signal fires, longjmp restores the saved context.
    • Control therefore returns to the point before the read, instead of leaving the process stuck in an unprotected read.
#include <setjmp.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define MAXLINE 4096

static jmp_buf env_alrm;
static char line[MAXLINE];

/* Interrupt the read() and jump back to the setjmp() call with value 1 */
void sig_alrm(int signo)
{
    longjmp(env_alrm, 1);
}

int main(int argc, char *argv[])
{
    int n;

    if (signal(SIGALRM, sig_alrm) == SIG_ERR) {
        printf("signal(SIGALRM) error\n");
        exit(1);
    }

    if (setjmp(env_alrm) != 0) {
        fprintf(stderr, "\nread() too slow\n");
        exit(2);
    }

    alarm(10);
    n = read(0, line, MAXLINE);

    return 0;
}

Signal handling within a signal handler

  • One more problem remains with the above scenario.
  • If the program has several signal handlers, and the program is inside one of the signal handlers, when the alarm goes off,
    • The longjmp inside the alarm handler will jump to the setjmp location.
    • This may abort the other handler, or corrupt its data
  • To avoid this, sigsetjmp and siglongjmp need to be used, which will work correctly with signals also.

Restartability within a slow system call

  • As mentioned before, we may be in the middle of a system call such as the read, when this signal handler triggers.
  • This system call then has to be restarted.
  • To allow this, the handler should be registered with the SA_RESTART flag

The modified code

#include <setjmp.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define MAXLINE 4096

typedef void Sigfunc(int);

static sigjmp_buf env_alrm;
static char line[MAXLINE];

/* Interrupt the read() and jump back to the sigsetjmp() call with value 1 */
void sig_alrm(int signo)
{
    siglongjmp(env_alrm, 1);
}

void catch_int(int signo)
{
    /* SIGINT / SIGQUIT handler; intentionally empty */
}

Sigfunc *signal(int signo, Sigfunc *func)
{
    struct sigaction act, oact;

    act.sa_handler = func;
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;

    act.sa_flags |= SA_INTERRUPT;

    // Fill the full set of signals in the mask
    sigfillset(&(act.sa_mask));

    if (signo != SIGALRM)
        act.sa_flags |= SA_RESTART;
        /* any system call interrupted by a signal
         * other than alarm is restarted */

    if (sigaction(signo, &act, &oact) < 0)
        return (SIG_ERR);
    return (oact.sa_handler);
}

int main(int argc, char *argv[])
{
    int n;
    struct sigaction act;

    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    act.sa_handler = catch_int;

    /* now, SIGINT will cause catch_int to be executed */
    sigaction(SIGINT, &act, NULL);
    sigaction(SIGQUIT, &act, NULL);

    act.sa_handler = sig_alrm;
    if (sigaction(SIGALRM, &act, NULL) < 0) {
        printf("signal(SIGALRM) error\n");
        exit(1);
    }

    if (sigsetjmp(env_alrm, 1) != 0) {
        fprintf(stderr, "\nread() too slow\n");
        exit(2);
    }

    alarm(10);
    n = read(0, line, MAXLINE);

    return 0;
}

Re-entrancy

The re-entrancy problem

  • Sometimes the signal handler is called when the main program is in the middle of a function.
  • The same function could get called inside the signal handler as well.
  • In that case, these functions have to be re-entrant.
  • Not all functions are re-entrant

Some re-entrant functions

  • read(), write(), fork()

Some non-reentrant functions

  • malloc() /* The worst offender */
  • scanf(), printf(), strtok() etc

Setting of errno

  • A signal handler may call functions that set the value of errno.
  • The handler should save errno on entry and restore it before it returns (as sketched below).
  • Otherwise, code interrupted by the handler could see errno change unexpectedly underneath it.
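
A minimal sketch of this save-and-restore pattern (the handler body and the use of SIGUSR1 are illustrative):

#include <errno.h>
#include <signal.h>
#include <unistd.h>

/* A handler that preserves errno around calls that may modify it. */
static void handler(int signo)
{
    int saved_errno = errno;                      /* save on entry */

    const char msg[] = "signal received\n";
    write(STDERR_FILENO, msg, sizeof(msg) - 1);   /* write() may set errno */

    errno = saved_errno;                          /* restore before returning */
}

int main(void)
{
    signal(SIGUSR1, handler);
    raise(SIGUSR1);                               /* deliver the signal to ourselves */
    return 0;
}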


Spin-Locks on Multi-CPU – A short note

MS-Word version of this posting


Multi-CPU Environment

The standard memory organization in a multi-CPU environment is

  1. Each CPU is connected to its own local cache, where recently used variables are kept
  2. The caches are connected to main memory

Spin-locks

The code

To acquire a lock, the following is the code

  1. A variable “x” is declared, that is shared among the threads (and hence the CPUs)
  2. The following is the code executed for each “Spin-Lock”

int x;    /* shared among the threads */

while (1) {
    if (test_and_set(&x)) {
        // Lock is acquired
    }
}

The Steps for Test and Set

The “Test and Set” instruction bounces the Cache line.

To understand why, let us look at the typical operation of a “Test and Set”

  1. Transfers the variable to its local cache in Exclusive (EX) mode
  2. Disables interrupts
  3. Verifies if the variable contains the value 0 = Unlocked
  4. If so, changes the value to 1 = Locked
  5. Enables the interrupts
  6. Changes the cache status to Modified (M) mode or to (S) mode

This involves at least one cache-coherency cycle, and up to two memory accesses (depending on the cache-coherency protocol).

Spin-Locks slow down the system

Note that, in the above,

  • The spin-lock loop is executed by every CPU that wants the lock.
  • So, every CPU will periodically try to acquire the variable in Exclusive (EX) mode, and there will be continuous cache-coherency messages on the bus
  • This slows down lock acquisition and release, as well as the throughput of the system in general.

Test and Spin

Modified Code

Suppose we used a “Modified Code” that worked as follows

  1. Verify if the variable is unlocked.
  2. If the variable is unlocked, then acquire the same.
int x;    /* shared lock variable */

while (1) {
    if (x == Unlocked) {
        if (test_and_set(&x)) {
            // Lock is acquired
        }
    }
}

The Modified Steps

The steps in this case are

  1. Transfer the variable to its local cache in Shared (S) mode. (From the second time onwards, the variable is already present in the cache.)
  2. If the variable is in the “Unlocked” state
    1. Acquire the variable in Exclusive (EX) mode
    2. Disable the interrupts
    3. Verify that the variable value is still Unlocked
    4. If so, change the value to Locked
    5. Enable interrupts
    6. Change the cache status to Modified (M)

Test and Spin is a little better

The Test and Spin approach performs a little better (a compilable sketch follows this list), as

  • There is no Cache contention, while the item is Locked. This is a big saving on traffic
  • The only contention is immediately after a lock is released.
  • But, even this is enough to create problems, if the number of waiting threads is huge.
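
The same test-and-spin idea, written against GCC's __sync atomic builtins rather than the abstract test_and_set above, might look like this (a sketch, not part of the original note):

#include <stdio.h>

typedef struct { volatile int locked; } spinlock_t;

static void spin_lock(spinlock_t *l)
{
    for (;;) {
        /* Spin on the locally cached copy: no bus traffic while the lock is held */
        while (l->locked)
            ;
        /* Attempt the atomic acquire only when the lock looks free */
        if (__sync_lock_test_and_set(&l->locked, 1) == 0)
            return;   /* old value was 0: we now own the lock */
    }
}

static void spin_unlock(spinlock_t *l)
{
    __sync_lock_release(&l->locked);   /* atomically store 0 (release semantics) */
}

int main(void)
{
    spinlock_t lock = { 0 };

    spin_lock(&lock);
    printf("in the critical section\n");
    spin_unlock(&lock);
    return 0;
}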

Array-Lock

The Approach

The “Test and Spin” approach has “Cache Contention”, when the lock is released, and when there are other threads waiting to acquire the Lock.

The following approach will avoid “Cache Contention”

  • Each thread spins on a separate variable.
  • The spinning variables are allocated from a queue (implemented as an array)
  • Whenever a lock is released, it will “Unlock” the variable from the head of the queue.

The Code

The code for the sections is defined as follows

int tail, head;
Lock l[NL];

int acquire_lock() {
    // Increment the tail of the queue atomically
    while (!fetch_and_increment(&tail))
        ;

    l[tail] = WAIT;
    Lock *p = &l[tail];

    while (*p != LOCKED)
        ;
}

int release_lock() {
    // Increment the head of the queue atomically
    l[head] = FREE;
    head++;
    l[head] = LOCKED;
}
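
A compilable sketch of the same array-lock idea, using GCC's atomic builtins and an explicit slot index returned to the caller (the names and the queue size are illustrative):

#include <stdio.h>

#define NL 64   /* must be >= the number of threads (a power of two here) */

enum { WAIT = 0, LOCKED = 1 };

static volatile int l[NL] = { LOCKED };   /* slot 0 starts in the grantable state */
static volatile unsigned int tail = 0;

static unsigned int acquire_lock(void)
{
    /* Claim a private slot by atomically incrementing the tail */
    unsigned int my = __sync_fetch_and_add(&tail, 1) % NL;

    while (l[my] != LOCKED)    /* spin only on our own slot */
        ;
    return my;                 /* caller passes this to release_lock() */
}

static void release_lock(unsigned int my)
{
    l[my] = WAIT;              /* reset our slot for later reuse */
    l[(my + 1) % NL] = LOCKED; /* hand the lock to the next waiter in the queue */
}

int main(void)
{
    unsigned int slot = acquire_lock();
    printf("holding the lock in slot %u\n", slot);
    release_lock(slot);
    return 0;
}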

MCS Locks

MCS locks work similarly to the array locks above, except that the queue of waiters is implemented as a linked list of per-thread nodes, rather than as an array.
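
A sketch of an MCS lock along these lines, where each thread spins on its own list node (GCC __sync builtins; the type and function names are mine):

#include <stddef.h>
#include <stdio.h>

typedef struct mcs_node {
    struct mcs_node *volatile next;
    volatile int locked;
} mcs_node_t;

typedef struct { mcs_node_t *tail; } mcs_lock_t;

static void mcs_acquire(mcs_lock_t *lock, mcs_node_t *me)
{
    me->next = NULL;
    me->locked = 1;

    /* Atomically append our node at the tail of the queue */
    mcs_node_t *prev = __sync_lock_test_and_set(&lock->tail, me);
    if (prev == NULL)
        return;                    /* queue was empty: lock acquired at once */

    prev->next = me;               /* link ourselves behind the previous waiter */
    while (me->locked)             /* spin on our own node only */
        ;
}

static void mcs_release(mcs_lock_t *lock, mcs_node_t *me)
{
    if (me->next == NULL) {
        /* No known successor: try to swing the tail from us back to NULL */
        if (__sync_bool_compare_and_swap(&lock->tail, me, NULL))
            return;
        while (me->next == NULL)   /* a successor is in the middle of linking in */
            ;
    }
    me->next->locked = 0;          /* hand the lock to the successor */
}

int main(void)
{
    mcs_lock_t lock = { NULL };
    mcs_node_t me;

    mcs_acquire(&lock, &me);
    printf("holding the MCS lock\n");
    mcs_release(&lock, &me);
    return 0;
}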

Paxos Algorithm – The basics (First Cut)

MS-Word notes on the topic

References

  • http://en.wikipedia.org/wiki/Paxos_%28computer_science%29 – A good high-level picture
  • http://the-paper-trail.org/blog/consensus-protocols-paxos/ – Informal reasoning on the protocol steps
  • http://research.microsoft.com/en-us/um/people/lamport/pubs/paxos-simple.pdf – A slightly more formal paper with the proofs
  • http://www.scs.stanford.edu/~dm/home/papers/paxos.pdf – A potential algorithm implementation
  • http://www.slideshare.net/paolos84/the-paxos-commit-algorithm – Some details of its use in a transaction system
  • http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en/us/archive/paxos_made_live.pdf – Paper on the Chubby system
  • http://static.usenix.org/event/nsdi11/tech/full_papers/Bolosky.pdf – A file system implementation with Paxos


Overview

Definition

Paxos is a distributed Consensus algorithm, that

  • Works with a quorum of 2F+1 nodes, and can tolerate F failures

Usage

  • SAN nodes – Establishing consensus among the various replica nodes; used for high-throughput storage servers
  • Coordinators (Zookeeper, Chubby) – Establishing locks and configuration servers (http://www.read.seas.harvard.edu/~kohler/class/08w-dsi/chandra07paxos.pdf)
  • OLTP – The basic mechanism to ensure consistency of transaction ordering (http://www.ist-selfman.org/wiki/images/0/0e/ZIBpaperOnPaxos.pdf)
  • Network multicast – High-throughput atomic broadcast (http://www.cs.cornell.edu/~ns672/publications/2010DSN.pdf)

Paxos Structure

Roles

There are five roles in the original Paxos description, and they can be played by different nodes

  • Client – the end-user who makes the coordination (or write) request
  • Proposer – one of the nodes, which coordinates a request on behalf of a client
  • Acceptors – a set of nodes that must reach a consensus before the request is accepted
  • Learners – the set of nodes that learn the value of a distributed consensus, so that they can execute some operation based on it
  • Leader – a distinguished proposer that most implementations elect, so that proposals do not conflict

In reality, these roles are “logical roles”, and a node may play multiple roles.

  • In many implementations, each node plays the three roles of “Proposer”, “Acceptor” and “Learner”.

Quorum

A Quorum has its usual definition

  • A subset of Acceptors, such that any two subsets share at least one member
  • For 2F+1 nodes, any quorum consists of a majority of nodes – (F+1) nodes

Basic Paxos Algorithm

Goal

The following is a high-level working of the PAXOS algorithm

  1. Multiple clients may send values to one proposer each.
  2. Multiple proposers will each propose a value
    • The value could typically be a version number, or an operation
  3. After a round of message exchange, all Acceptors must accept a Single value
  4. The single Accepted value is sent to all the Learners
  5. The computation is executed at the learners
  6. The proposers who lost their proposals can retry the consensus algorithm.

Single Proposer Multiple Acceptors

Under this scenario, a single Proposer, proposes to multiple Acceptors

  1. The Proposer sends a message PROPOSE(N,v) to the Acceptors
  2. Each Acceptor, on receiving the message
    1. If it has not already promised its vote to a proposal numbered higher than N, it sends back a PROMISE(N,v) message
    2. If not, it sends back a Reject message
  3. The Proposer, on receiving the PROMISE messages from the Acceptors,
    1. If a majority of Acceptors sent the message back, the Proposer goes on to the next step and sends ACCEPT(N,v) to the quorum
    2. Otherwise the proposal is abandoned, and the Proposer starts the process again.
  4. Each Acceptor, on receiving the ACCEPT message from the Proposer,
    1. If it has not promised a higher value of N, it sends an ACCEPT acknowledgement to the listener / coordinator
    2. Else it sends a Reject message
  5. The Proposer, on receiving the ACCEPT acknowledgements,
    1. Sends an INFORM message to the Client
    2. Sends an INFORM message to the Learners

Accounting for Stability

The above protocol has a flaw:

If another Proposer comes in after a stable state has been reached, and it happens to use a higher round number,

then all the Acceptors, as part of Step 2a above, would switch to the new value, since it carries a higher round number.

To avoid this, we need to make sure that once the system has chosen a value, that value will not change subsequently.

A value is “chosen” once a majority (a quorum) of the Acceptors have accepted it.

The value that was propagated with the highest round accepted so far is the “chosen value”.

Modified Stability Algorithm

The modified algorithm proceeds in the following steps (an acceptor-side sketch follows the list)

  1. The proposer sends to a quorum of Acceptors a proposal – PREPARE(N,V)
    • where N is a unique, increasing round number, and V is a value
  2. Each Acceptor does the following
    • If the Acceptor has received a higher numbered PREPARE request,
      • then the Acceptor rejects the proposal.
    • Else, it responds with PROMISE(M,v),
      • which is a Promise not to accept any proposal numbered less than N
      • and returns M – the highest round #, that has been accepted by the node, and its value v
  3. When the Proposer receives enough messages (required for a quorum),
    • it issues a PROPOSE(N,v), where
      • N is the round number of this proposer
      • v is the value corresponding to the highest value of M, received in Step-2a
  4. When each Acceptor receives the value – PROPOSE(N,v)
    • If the Acceptor has already promised a proposal with a round number larger than N,
      • it sends a Reject to the coordinator
    • Else, it sends an Accept message
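
To make the acceptor's rules concrete, here is a small sketch of the acceptor-side state machine for the two phases above (types, field names and return conventions are illustrative):

#include <stdbool.h>
#include <stdio.h>

typedef struct {
    long promised_n;   /* highest round number promised so far          */
    long accepted_n;   /* highest round number whose value was accepted */
    int  accepted_v;   /* value accepted in round accepted_n            */
    bool has_accepted;
} acceptor_t;

/* Phase 1: PREPARE(N). Returns true (a PROMISE) and reports the highest
 * accepted round and value; returns false to reject. */
bool on_prepare(acceptor_t *a, long n, long *out_m, int *out_v, bool *out_has)
{
    if (n <= a->promised_n)
        return false;              /* already promised a higher round: reject */
    a->promised_n = n;             /* promise not to accept anything below n  */
    *out_m   = a->accepted_n;
    *out_v   = a->accepted_v;
    *out_has = a->has_accepted;
    return true;
}

/* Phase 2: PROPOSE/ACCEPT(N, v). Returns true if the value is accepted. */
bool on_propose(acceptor_t *a, long n, int v)
{
    if (n < a->promised_n)
        return false;              /* a higher-numbered promise exists: reject */
    a->promised_n   = n;
    a->accepted_n   = n;
    a->accepted_v   = v;
    a->has_accepted = true;
    return true;
}

int main(void)
{
    acceptor_t a = { 0, 0, 0, false };
    long m; int v; bool has;

    /* Round 1 is promised and its value accepted; a stale round 0 is rejected. */
    printf("prepare(1): %d\n", on_prepare(&a, 1, &m, &v, &has));   /* 1 */
    printf("propose(1, 42): %d\n", on_propose(&a, 1, 42));         /* 1 */
    printf("prepare(0): %d\n", on_prepare(&a, 0, &m, &v, &has));   /* 0 */
    return 0;
}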

Leader Election in Distributed Systems – First cut


Overview

Starting Leader elections

  1. Every process periodically communicates with its coordinator to ensure the coordinator is alive.
  2. When the coordinator does not respond, the node starts a Leader Election.
  3. Note that multiple nodes could detect this, and start the Leader Election concurrently.
  4. It may also be initiated when a node joins the group.
  5. Leader Election algorithms exchange multiple rounds of messages.

Ending Leader Elections

  1. If there are no more failures – that is, the system state is stable – then the leaders are stable, and there is only one leader per connected group of nodes.
  2. Under a stable state, the system may partition, and each partition may then have its own unique leader.
  3. Once there is more instability in the system, leader election starts again.

Bully Algorithm

Applicability

The Bully Algorithm applies under two assumptions

  1. Each node is able to compute the winner of a comparison using the same node invariant – e.g., the node id
  2. There are only server failures in the system, and no communication failures
    • These failures can be detected using timeouts

The Algorithm

  1. Each node in the cluster, when it detects that its leader is down, does the following
  2. It sends a broadcast <ELECT,Id> message to each of the nodes
  3. Each node, when it receives the message,
    1. Responds with an <OK,Id> message
    2. Starts the Leader Election step itself, if it has not already done so
  4. The sender of the message in Step (2), when it receives the replies, will know whether it is the leader
    1. If it has the lowest Id, then it broadcasts a <LEADER,Id> message to each of the cohorts
    2. If it does not have the lowest Id, then it withdraws from the election silently.

(If there are no more failures, the node with the lowest id wins the election. A toy sketch of this decision rule follows.)
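
A toy sketch of just this decision rule – the node with the lowest id among the responders wins – might look as follows (the function and its arguments are illustrative; a real implementation would exchange the <ELECT,Id> and <OK,Id> messages over the network):

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Returns true if 'self_id' should declare itself leader, i.e. no responder
 * has a lower id. */
bool wins_election(int self_id, const int *responder_ids, size_t n)
{
    for (size_t i = 0; i < n; i++)
        if (responder_ids[i] < self_id)
            return false;   /* a lower-id node exists: withdraw silently */
    return true;            /* lowest id: broadcast <LEADER,Id> */
}

int main(void)
{
    int responders[] = { 4, 7, 9 };
    printf("node 3 wins: %d\n", wins_election(3, responders, 3));  /* 1 */
    printf("node 5 wins: %d\n", wins_election(5, responders, 3));  /* 0 */
    return 0;
}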

An example of Bully Algorithm

Invite Algorithm

Overview

The Invite Algorithm is an enhancement of the “Bully Algorithm” that also handles communication failures. Because of these failures, nodes may become partitioned into separate groups, and may later have to rejoin.

The Algorithm

The algorithm has the following components

Find Coordinators

As part of this operation,

  1. Each node sends a CHECK message
  2. On receiving this message, every other node responds with <id,is_coord>

Trigger Election

If the node is not a coordinator, and there are no other coordinators

  1. The node promotes itself to a coordinator, and executes INVITE(neighbors)

Merge Groups

If the node is already a coordinator, and other coordinators are discovered

  1. The node executes INVITE(coordinators)

Invite at non Coordinators

At the non-coordinator nodes,

  1. The node verifies whether it can join the specified group (the invariants of group leadership need to be obeyed – the leader has the lowest id)
  2. If so, the node responds with an ACCEPT(id)

Invite at Coordinators

At the coordinator nodes,

  1. The coordinator verifies whether it can join the existing group
  2. If so, it sends a message to its group indicating that the group membership should change
