zookeeper使用问题合集

CONNECTIONLOSS(连接断开)和SESSIONEXPIRED(Session过期) 【原文

连接断开(Connection Loss)
  连接断开(CONNECTIONLOSS)一般发生在网络的闪断或是客户端所连接的服务器挂机的时候。这种情况下,ZooKeeper客户端首先会捕获“连接断开”异常 ——> 获取一个新的ZooKeeper地址 ——> 尝试连接
  整个过程依赖于ZooKeeper客户端自己进行并且使用同一个会话ID,因此发生CONNECTION LOSS时,应用不需要处理,等待ZooKeeper客户端建立新的连接即可。

会话超时(Session Expired)
  Session Expired发生在ZooKeeper客户端与服务器的连接断了,试图连接上新的ZooKeeper机器,但由于耗时过长,超过了SESSION_TIMEOUT 后还没有成功连接上服务器,那么服务器认为这个Session已经结束了(服务器无法确认是因为其它异常原因还是客户端主动结束会话)。由于在ZooKeeper中,很多数据和状态都是和会话绑定的,一旦会话失效,那么ZooKeeper就开始清除和这个会话有关的信息,包括这个会话创建的临时节点和注册的所有Watcher。在这之后,由于网络恢复后,客户端可能会重新连接上服务器,但是服务器会告诉客户端一个异常:Session Expired(会话过期)。此时客户端的状态变成 CLOSED状态,要重新实例ZooKeeper对象,重新操作所有临时数据(包括临时节点和注册Watcher)。
  因此一旦发生Session Expired,存储在ZooKeeper上的所有临时数据与注册的订阅者都会被移除,此时需要重新创建一个ZooKeeper客户端实例,需要自己编码做一些额外的处理。

zookeeper实时监听节点变化 【参考

  public class CuratorPCWatcher {

private static final String zkServerIps = "127.0.0.1:2181";

public static void main(String[] args) throws Exception {
    final String nodePath = "/brokers";
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 5);
    CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkServerIps)
            .sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();
    client.start();
    try {
        // 为子节点添加watcher,PathChildrenCache: 监听数据节点的增删改,可以设置触发的事件
        final PathChildrenCache childrenCache = new PathChildrenCache(client, nodePath, true);

        /**
         * StartMode: 初始化方式
         *  - POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
         *  - NORMAL:异步初始化
         *  - BUILD_INITIAL_CACHE:同步初始化
         */
        childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

        // 列出子节点数据列表,需要使用BUILD_INITIAL_CACHE同步初始化模式才能获得,异步是获取不到的
        List<ChildData> childDataList = childrenCache.getCurrentData();
        System.out.println("当前节点的子节点详细数据列表:");
        for (ChildData childData : childDataList) {
            System.out.println("	* 子节点路径:" + childData.getPath() + ",该节点的数据为:" + Arrays.toString(childData.getData()));
        }

        // 添加事件监听器
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // 通过判断event type的方式来实现不同事件的触发
                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {  // 子节点初始化时触发
                    System.out.println("子节点初始化成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {  // 添加子节点时触发
                    System.out.print("子节点:" + event.getData().getPath() + " 添加成功,");
                    System.out.println("该子节点的数据为:" + new String(event.getData().getData()));
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {  // 删除子节点时触发
                    System.out.println("子节点:" + event.getData().getPath() + " 删除成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {  // 修改子节点数据时触发
                    System.out.print("子节点:" + event.getData().getPath() + " 数据更新成功,");
                    System.out.println("子节点:" + event.getData().getPath() + " 新的数据为:" + new String(event.getData().getData()));
                }
            }
        });
        Thread.sleep(100000); // sleep 100秒,在 zkCli.sh 操作子节点,注意查看控制台的输出
    } finally {
        client.close();
    }
 }

需要新加以下jar包

  <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.2.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.2.0</version>
    </dependency>`

注意:方法需要一直运行,然后变化的时候,可以收到相应数据

原文地址:https://www.cnblogs.com/cuiyf/p/13615183.html