Zookeeper(四)Spring整合Zookeeper【监听器篇】

一、引言

  我们在前面的学习中知道了zk主要是由文件系统数据结构+监听通知机制组成的,上一篇已经介绍了对zk进行的基础操作,这一篇我想写一下Curator是如何帮我们实现监听功能的。

二、watch的传统方式实现【标准观察模式】

  现在我们先看看Curator对传统方式watch是如何进行实现的:

  弊端:由于watch机制触发一次就会失效,这种方式不能直接实现循环监听,需要从代码层面做成循环监听的模式,不是很推荐,可以当做了解:

/**
 * Curator客户端实现watch监听测试类
 * PS:利用Watcher来对节点进行监听操作,但此监听操作只能监听一次,与原生API并无太大差异。【了解即可,一般情况不推荐使用】
 *
 * @author 有梦想的肥宅
 * @date 2021/08/18
 */
public class CuratorWatchDemo {

    public static void main(String[] args) throws Exception {
        //1、定义需要监听的节点
        String path = "/watchDemo";

        //2、获取Curator客户端并设置监听事件
        CuratorFramework curatorFramework = ZookeeperClientUtil.getCuratorFramework();
        curatorFramework.getData().usingWatcher(new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //PS:当节点发生变动的时候会打印这段话【有梦想的肥宅】
                System.out.println("【有梦想的肥宅】监听器watchedEvent:" + watchedEvent);
            }
        }).forPath(path);

        //3、模拟第一次修改被监听的节点【watch生效】
        curatorFramework.setData().forPath(path, "【有梦想的肥宅】第1次被监听啦~".getBytes());

        //4、模拟第二次修改被监听的节点【watch已失效】
        curatorFramework.setData().forPath(path, "【有梦想的肥宅】第2次被监听啦~".getBytes());

        //5、关闭客户端连接
        curatorFramework.close();
    }

}

三、基于cache实现监听的实现【缓存监听模式】

  定义:一种基于本地缓存视图的Cache机制并与远程Zookeeper视图的对比过程,来实现对Zookeeper服务端事件监听。

  分类:

  • Path Cache【子节点监听器】
  • Node Cache【当前节点监听器】
  • Tree Cache【综合监听器】

Path Cache

  观察ZNode的子节点并缓存状态,如果ZNode的子节点被创建,更新或者删除,那么Path Cache会更新缓存,并且触发事件给注册的监听器。

Node Cache

  观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。 

Tree Cache

  可以看做是上两种的合体,Tree Cache观察的是ZNode及子节点。

代码演示

/**
 * Curator客户端实现对缓存设置监听器的测试类
 * PS:Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能【反复监听】。
 *
 * @author 有梦想的肥宅
 * @date 2021/08/18
 */
public class CuratorCacheListenerDemo {

    public static void main(String[] args) throws Exception {

        String cachePath = "/cacheListenerDemo";
        //1、创建连接
        CuratorFramework curatorFramework = ZookeeperClientUtil.getCuratorFramework();

        //2、创建cachePath
        curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(cachePath, "this cache listener".getBytes());

        //3、创建PathChildrenCache【路径监听,用于观察其下子节点变更】【cache有3种:Path Cache、Node Cache、Tree Cache这里只演示其中一种】
        PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, cachePath, true);//true代表缓存数据到本地
        pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);//BUILD_INITIAL_CACHE 代表使用同步的方式进行缓存初始化。
        pathChildrenCache.getListenable().addListener((cf, event) -> {
            //4、获取到事件信息,并根据事件类型输出
            PathChildrenCacheEvent.Type eventType = event.getType();
            switch (eventType) {
                case CONNECTION_RECONNECTED:
                    pathChildrenCache.rebuild();
                    System.out.println("【有梦想的肥宅】重新建立连接!");
                    break;
                case CONNECTION_SUSPENDED:
                    System.out.println("【有梦想的肥宅】暂停/阻塞连接!");
                    break;
                case CONNECTION_LOST:
                    System.out.println("【有梦想的肥宅】连接断开!");
                    break;
                case CHILD_ADDED:
                    System.out.println("【有梦想的肥宅】添加子节点!");
                    break;
                case CHILD_UPDATED:
                    System.out.println("【有梦想的肥宅】更新子节点!");
                    break;
                case CHILD_REMOVED:
                    System.out.println("【有梦想的肥宅】删除子节点!");
                    break;
                default:
            }
        });

        //5、这里让线程沉睡以等待listener被调用,此时可以直接操作服务器上的节点,在控制台就会打印触发对应事件的内容了
        Thread.sleep(60 * 60 * 1000);

        //6、最后关闭路径监听
        pathChildrenCache.close();
    }

}
原文地址:https://www.cnblogs.com/riches/p/15157324.html