You don't need to use the Zab protocol. Instead, you can follow the steps below:
Create a znode, say /bigvalue, on ZooKeeper. Then:
1. When a mapper starts, it reads the value stored in the znode and sets a watch for data changes on it.
2. Whenever a mapper finds a better value, it updates the znode with that value.
3. All the mappers are notified of the data change event. Each one reads the new best value and re-establishes its watch, because a ZooKeeper watch fires only once.
That way every mapper stays in sync with the latest best value and can update it whenever it finds a better one.
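One thing to be careful about in the update step: two mappers may find a better value at about the same time, and the later write could overwrite the better one. ZooKeeper's versioned write (the `writeData(path, data, expectedVersion)` call that also appears in the zkclient example in this answer) rejects a write if the znode changed since you read it. Here is a minimal sketch of that read-compare-write loop. To keep it runnable without a ZooKeeper server, `InMemoryNode` is a hypothetical in-memory stand-in for the znode, and I am assuming "better" means a larger `long`. None of these names come from zkclient.

```java
import java.util.concurrent.atomic.AtomicReference;

public class BestValueSketch {

    /** Immutable snapshot of the node: its value plus the version it was read at. */
    static final class Versioned {
        final long value;
        final int version;

        Versioned(long value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    /** Hypothetical in-memory stand-in for the /bigvalue znode. */
    static final class InMemoryNode {
        private final AtomicReference<Versioned> state;

        InMemoryNode(long initial) {
            state = new AtomicReference<>(new Versioned(initial, 0));
        }

        Versioned read() {
            return state.get();
        }

        /** Succeeds only if nobody has written since expectedVersion was read. */
        boolean writeIfVersion(long newValue, int expectedVersion) {
            Versioned current = state.get();
            if (current.version != expectedVersion) {
                return false;
            }
            return state.compareAndSet(current, new Versioned(newValue, expectedVersion + 1));
        }
    }

    /** Publishes candidate if it beats the stored value; returns true if it was written. */
    static boolean publishIfBetter(InMemoryNode node, long candidate) {
        while (true) {
            Versioned seen = node.read();
            if (candidate <= seen.value) {
                return false; // an equal or better value is already stored
            }
            if (node.writeIfVersion(candidate, seen.version)) {
                return true;
            }
            // Lost a race with another writer: re-read and compare again.
        }
    }

    public static void main(String[] args) {
        InMemoryNode node = new InMemoryNode(10);
        System.out.println(publishIfBetter(node, 42)); // better value, gets written
        System.out.println(publishIfBetter(node, 7));  // worse value, gets skipped
        System.out.println(node.read().value + " @ version " + node.read().version);
    }
}
```

With a real znode, `read()` would correspond to `readData(znode, stat)` and `writeIfVersion` to `writeData(znode, value, stat.getVersion())`, where a version mismatch is reported as an exception rather than a `false` return.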
The zkclient library ( https://github.com/sgroschupf/zkclient ) is a very good way to work with ZooKeeper, and it hides a lot of the complexity. Below is an example that demonstrates how you can watch the znode "/bigvalue" for any data change.
package geet.org;

import java.nio.charset.StandardCharsets;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.data.Stat;

// Acts as both the data listener and the String serializer for the client.
public class ZkExample implements IZkDataListener, ZkSerializer {

    public static void main(String[] args) {
        String znode = "/bigvalue";
        ZkExample ins = new ZkExample();
        // Arguments: server address, session timeout (ms), connection timeout (ms), serializer.
        ZkClient cl = new ZkClient("127.0.0.1", 30000, 30000, ins);
        try {
            cl.createPersistent(znode);
        } catch (ZkNodeExistsException e) {
            // The znode already exists, which is fine for this example.
            System.out.println(e.getMessage());
        }

        // Subscribe before writing so that the write below also reaches the listener.
        cl.subscribeDataChanges(znode, ins);

        // Change the data for fun. Passing the version we just read makes the
        // write fail if someone else modified the znode in between.
        Stat stat = new Stat();
        String data = cl.readData(znode, stat);
        System.out.println("Current data " + data + ", version = " + stat.getVersion());
        cl.writeData(znode, "My new data", stat.getVersion());

        // Keep the process alive for a while so notifications can arrive.
        try {
            Thread.sleep(36000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            cl.close();
        }
    }

    @Override
    public void handleDataChange(String dataPath, Object data) throws Exception {
        System.out.println("Detected data change");
        System.out.println("New data for " + dataPath + " " + (String) data);
    }

    @Override
    public void handleDataDeleted(String dataPath) throws Exception {
        System.out.println("Data deleted " + dataPath);
    }

    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        if (data instanceof String) {
            return ((String) data).getBytes(StandardCharsets.UTF_8);
        }
        // Anything that is not a String (including null) is stored as no data.
        return null;
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        // A znode created without data has a null payload.
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
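
On the receiving side, each mapper also needs a local copy of the best value to compare its own results against. A minimal sketch of such a cache follows. `LocalBestCache` and its methods are hypothetical names, and the sketch assumes the znode holds the value as a decimal string (which fits a String serializer) and that larger means better. You could call `onData((String) data)` from `handleDataChange`.

```java
import java.util.concurrent.atomic.AtomicLong;

public class LocalBestCache {
    // Best value seen so far; starts at the lowest possible long.
    private final AtomicLong best = new AtomicLong(Long.MIN_VALUE);

    /** Feed in the znode content; null or malformed content is ignored. */
    public void onData(String data) {
        if (data == null) {
            return; // a freshly created znode may carry no data
        }
        try {
            long parsed = Long.parseLong(data.trim());
            // Keep the maximum, so an older notification never lowers the cache.
            best.accumulateAndGet(parsed, Math::max);
        } catch (NumberFormatException e) {
            // Assumption: non-numeric content is not a valid best value, so skip it.
        }
    }

    /** True if candidate beats everything this mapper has seen so far. */
    public boolean isImprovement(long candidate) {
        return candidate > best.get();
    }

    public long current() {
        return best.get();
    }

    public static void main(String[] args) {
        LocalBestCache cache = new LocalBestCache();
        cache.onData("15");
        cache.onData("9");
        cache.onData("not a number");
        System.out.println(cache.current());
        System.out.println(cache.isImprovement(20));
    }
}
```

A mapper would check `isImprovement` before attempting a write, which avoids a round trip to ZooKeeper for values that are already known to be worse.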