zookeeper应用

    一端不停的更新配置,另一端监听这个配置的变化。

    需要注意的是:监听端不一定读取到所有的变化。在zk服务器发送通知到客户端,客户端读取数据注册监听之间可能发生了多次数据变化,这些数据变化是得不到通知的。但可以保证的是每次通知得到的数据都是比之前的数据要新的。
 
ZKUtils.java
package config;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class ZKUtils {
	/**
	 * 构建zookeeper客户端对象
	 * @param hosts
	 * @return
	 * @throws Exception
	 */
	public static ZooKeeper open(String hosts) throws Exception {
		final CountDownLatch singal = new CountDownLatch(1);
		
		ZooKeeper zk = new ZooKeeper(hosts, 2000, new Watcher() {
			@Override
			public void process(WatchedEvent event) {
				singal.countDown();
			}
		});
		
		singal.await();
		
		return zk;
	}
}

  

ConfigUpdater.java

package config;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
 * 不停的更新/test节点上的数据, 模拟配置更新 
 * 
 */
public class ConfigUpdater {
	public static final String HOSTS = "hadoop1:2181";
	public static final String PATH = "/test";
	
	public static void main(String[] args) throws Exception {
		ZooKeeper zk = ZKUtils.open(HOSTS);
		
		while(true) {
			String data = UUID.randomUUID().toString();
			
			Stat stat = zk.exists(PATH, false);
			if(stat == null) {
				zk.create(PATH, data.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
			} else {
				zk.setData(PATH, data.getBytes("UTF-8"), -1);
			}
			
			TimeUnit.SECONDS.sleep(5);
		}
	}
}

  

ConfigUpdateWatcher .java

package config;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;
/**
 * 注册监听/test节点上的数据变化
 *
 */
public class ConfigUpdateWatcher implements Watcher {
	private ZooKeeper zk = null;
	
	public ConfigUpdateWatcher() {
		try {
			zk = ZKUtils.open(ConfigUpdater.HOSTS);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	@Override
	public void process(WatchedEvent event) {
		System.out.println(event);
		
		if(event.getType().equals(EventType.NodeDataChanged)) {
			try {
				//读取事件后, 再次注册数据监听事件
				byte[] data = zk.getData(ConfigUpdater.PATH, this, null);
				System.out.printf("接收到了事件%s, 新的数据是:%s", EventType.NodeDataChanged, new String(data, "UTF-8"));
				System.out.println();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
	
	private void run() {
		try {
			//注册数据变化监听
			zk.getData(ConfigUpdater.PATH, this, null);
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		try {
			TimeUnit.SECONDS.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		new ConfigUpdateWatcher().run();
	}
}

  

完善

在操作zookeeper的时候,如果是幂等操作(多次操作不影响结果),在失败时可以多次重试以增加可靠性,比如ConfigUpdater的写操作可以进行多次重试,修改后的代码如下:
package config;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
 * 不停的更新/test节点上的数据, 模拟配置更新 
 * 
 */
public class ConfigUpdater {
	public static final String HOSTS = "hadoop1:2181";
	public static final String PATH = "/test";
	public static final int RETRIES = 3; //重试次数
	public static final int RETRY_PERIOD = 500; //重试间隔
	
	public static void main(String[] args) throws Exception {
		ZooKeeper zk = ZKUtils.open(HOSTS);
		
		//不停的模拟数据更新操作
		while(true) {
			
			String data = UUID.randomUUID().toString();
			
			/*
			 * 多次重试,增加可靠性 
			 */
			int retied = 0;
			while(true) {
				try {
					Stat stat = zk.exists(PATH, false);
					if(stat == null) {
						zk.create(PATH, data.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
					} else {
						zk.setData(PATH, data.getBytes("UTF-8"), -1);
					}
					
					//执行成功则跳出循环,否则继续(重试)
					break;
				} catch (KeeperException e) {
					retied++;
					
					//如果会话过期了则重新创建一个ZooKeeper客户端对象
					if(e.code().equals(KeeperException.Code.SESSIONEXPIRED)) {
						zk = ZKUtils.open(HOSTS);
					} else {
						//其他KeeperException Code的处理
						
						//KeeperException.Code.CONNECTIONLOSS异常可以不用处理:ZooKeeper客户端对象会自动进行重新连接
					}
					
					//可以重试3次,每次间隔500毫秒
					if(retied == RETRIES) {
						throw e;
					} else {
						TimeUnit.MICROSECONDS.sleep(RETRY_PERIOD);
					}
				}
			}
			
			//模拟其他操作占用的时间
			TimeUnit.SECONDS.sleep(5);
		}
	}
}

  

原文地址:https://www.cnblogs.com/lishouguang/p/4558987.html