Question

Good morning,

I am new to ZooKeeper and its protocols and I am interested in its broadcast protocol Zab.

Could you provide me with a simple java code that uses the Zab protocol of Zookeeper? I have been searching about that but I did not succeed to find a code that shows how can I use Zab.

In fact what I need is simple, I have a MapReduce code and I want all the mappers to update a variable (let's say X) whenever they succeed to find a better value of X (i.e. a bigger value). In this case, the leader has to compare the old value and the new value and then to broadcast the actual best value to all mappers. How can I do such a thing in Java?

Thanks in advance, Regards

Was it helpful?

Solution

You don't need to use the Zab protocol. Instead you may follow the below steps:

You have a Znode say /bigvalue on Zookeeper. All the mappers when starts reads the value stored in it. They also put an watch for data change on the Znode. Whenever a mapper gets a better value, it updates the Znode with the better value. All the mappers will get notification for the data change event and they read the new best value and they re-establish the watch for data changes again. That way they are in sync with the latest best value and may update the latest best value whenever there is a better value.

Actually zkclient is a very good library to work with Zookeeper and it hides a lot of complexities ( https://github.com/sgroschupf/zkclient ). Below is an example that demonstrates how you may watch a Znode "/bigvalue" for any data change.

package geet.org;

import java.io.UnsupportedEncodingException;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.data.Stat;

public class ZkExample implements IZkDataListener, ZkSerializer {
    public static void main(String[] args) {
        String znode = "/bigvalue";
        ZkExample ins = new ZkExample();
        ZkClient cl = new ZkClient("127.0.0.1", 30000, 30000,
                ins);
        try {
            cl.createPersistent(znode);
        } catch (ZkNodeExistsException e) {
            System.out.println(e.getMessage());
        }
        // Change the data for fun
        Stat stat = new Stat();
        String data =  cl.readData(znode, stat);
        System.out.println("Current data " + data + "version = " + stat.getVersion());
        cl.writeData(znode, "My new data ", stat.getVersion());

        cl.subscribeDataChanges(znode, ins);
        try {
            Thread.sleep(36000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void handleDataChange(String dataPath, Object data) throws Exception {
        System.out.println("Detected data change");
        System.out.println("New data for " + dataPath + " " + (String)data);
    }

    @Override
    public void handleDataDeleted(String dataPath) throws Exception {
        System.out.println("Data deleted " + dataPath);
    }

    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        if (data instanceof String){
            try {
                return ((String) data).getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        return null;
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        try {
            return new String(bytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top