zookeeper的API应用

            zookeeper的API应用

                               作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

 

 

 

 

一.环境准备

1>.创建Maven工程

  使用IDE工具创建一个Maven工程,关于IDE看您个人喜好,可以使用Ecllipse或者Idea均可。

2>.在pom.xml文件中添加依赖关系

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.org.yinzhengjie</groupId>
    <artifactId>zookeeper</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.1</version>
        </dependency>


    </dependencies>


</project>
pom.xml文件内容

3>.创建log4j.properties文件

log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”

二.获取子节点并监听节点变化

1>.使用默认的回调函数

package cn.org.yinzhengjie;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class ZkClient {

    //声明一个ZooKeeper的客户端对象
    private ZooKeeper zkCli;

    //定义zookeeper集群的连接地址,注意域名解析哟~
    private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181";

    //定义与zookeeper集群会话的超时时间,单位为毫秒
    private static final int SESSION_TIMEOUT = 3000;

    @Before
    public void before() throws IOException {
        zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> {
                System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath());
        });
    }


    @Test
    public void ls() throws KeeperException, InterruptedException {
        //查看根znode的子节点信息,并观察该事件
        List<String> children = zkCli.getChildren("/", true);

        System.out.println("========== 开始遍历查询结果 ==========");
        for (String child:children){
            System.out.println(child);
        }
        System.out.println("========== 结束遍历查询结果 ==========");

        /**
         *  让当前线程延时阻塞(zookeeper客户端和服务端是异步通信的),目的是当服务端根下的znode发生变化时会及时调用回调函数.
         */
        Thread.sleep(Long.MAX_VALUE);
    }

}
案例代码

2>.使用自定义的回调函数

package cn.org.yinzhengjie;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class ZkClient {

    //声明一个ZooKeeper的客户端对象
    private ZooKeeper zkCli;

    //定义zookeeper集群的连接地址,注意域名解析哟~
    private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181";

    //定义与zookeeper集群会话的超时时间,单位为毫秒
    private static final int SESSION_TIMEOUT = 3000;

    @Before
    public void before() throws IOException {
        zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> {
                System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath());
        });
    }


    @Test
    public void ls() throws KeeperException, InterruptedException {
        //查看根znode的子节点信息,并观察该事件
        List<String> children = zkCli.getChildren("/", event ->{
            System.out.println(event.getType() + "***** 自定义的回调函数 *****" + event.getPath());
        });

        System.out.println("========== 开始遍历查询结果 ==========");
        for (String child:children){
            System.out.println(child);
        }
        System.out.println("========== 结束遍历查询结果 ==========");

        /**
         *  让当前线程延时阻塞(zookeeper客户端和服务端是异步通信的),目的是当服务端根下的znode发生变化时会及时调用回调函数.
         */
        Thread.sleep(Long.MAX_VALUE);
    }

}
案例代码

三.创建子节点

package cn.org.yinzhengjie;

import org.apache.zookeeper.*;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class ZkClient {

    //声明一个ZooKeeper的客户端对象
    private ZooKeeper zkCli;

    //定义zookeeper集群的连接地址,注意域名解析哟~
    private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181";

    //定义与zookeeper集群会话的超时时间,单位为毫秒
    private static final int SESSION_TIMEOUT = 3000;

    @Before
    public void before() throws IOException {
        zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> {
                System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath());
        });
    }


    @Test
    public void create() throws KeeperException, InterruptedException {
        /**
         *  create的方法前面如下:
         *      public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
         *
         *  以下是对各参数的解释说明:
         *      path:
         *          指定要创建的znode在zookeeper的路径.
         *      data:
         *          指定创建的znode需要保存的原始数据.
         *      acl:
         *          指定创建znode的ACL,我们指定的"ZooDefs.Ids.OPEN_ACL_UNSAFE"您可以理解为权限最大的ACL.
         *      createMode:
         *          指定创建znode的类型.我们指定的"CreateMode.EPHEMERAL"您可以理解为创建临时znode。
         */
        String znode = zkCli.create("/yinzhengjie2020", "yinzhengjie.com".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        System.out.println(znode);

        /**
         *  让当前线程延时阻塞(zookeeper客户端和服务端是异步通信的)
         */
        Thread.sleep(Long.MAX_VALUE);

    }
}
案例代码

四.查询znode存储的数据

package cn.org.yinzhengjie;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class ZkClient {

    //声明一个ZooKeeper的客户端对象
    private ZooKeeper zkCli;

    //定义zookeeper集群的连接地址,注意域名解析哟~
    private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181";

    //定义与zookeeper集群会话的超时时间,单位为毫秒
    private static final int SESSION_TIMEOUT = 3000;

    @Before
    public void before() throws IOException {
        zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> {
                System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath());
        });
    }


    @Test
    public void get() throws InterruptedException, KeeperException {


        /**
         *  getData方法的签名如下:
         *      public byte[] getData(String path, boolean watch, Stat stat)
         *  以下是对其参数的相关说明:
         *      path:
         *          指定要获取数据的znode路径
         *      watch:
         *          是否需要监视此节点.
         *      stat:
         *          传递一个Stat对象即可.
         */
        byte[] data = zkCli.getData("/yinzhengjie2020", true, new Stat());

        //将获取的数据由字节数组转换成字符串
        String getData = new String(data);

        //打印获取到的数据
        System.out.println(getData);

        /**
         *  让当前线程延时阻塞(zookeeper客户端和服务端是异步通信的)
         */
        Thread.sleep(Long.MAX_VALUE);

    }
}
案例代码

五.修改znode存储的数据

package cn.org.yinzhengjie;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class ZkClient {

    //声明一个ZooKeeper的客户端对象
    private ZooKeeper zkCli;

    //定义zookeeper集群的连接地址,注意域名解析哟~
    private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181";

    //定义与zookeeper集群会话的超时时间,单位为毫秒
    private static final int SESSION_TIMEOUT = 3000;

    @Before
    public void before() throws IOException {
        zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> {
                System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath());
        });
    }


    @Test
    public void set() throws InterruptedException, KeeperException {

        /**
         *  setData方法的签名如下:
         *      public Stat setData(final String path, byte[] data, int version)
         *  以下是对其参数的相关说明:
         *      path:
         *          指定要修改数据的znode路径.
         *      data:
         *          指定要修改的数据内容.
         *      version:
         *          指定版本号.这个版本号要和zookeeper集群保存该znode的dataVersion值保持一致哟~
         */
        Stat stat = zkCli.setData("/yinzhengjie2020", "bigdata".getBytes(), 1);

        //打印数据长度,很明显"bigdata"的长度为"7".
        System.out.println(stat.getDataLength());

    }
}
案例代码

六.判断znode是否存在

package cn.org.yinzhengjie;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class ZkClient {

    //声明一个ZooKeeper的客户端对象
    private ZooKeeper zkCli;

    //定义zookeeper集群的连接地址,注意域名解析哟~
    private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181";

    //定义与zookeeper集群会话的超时时间,单位为毫秒
    private static final int SESSION_TIMEOUT = 3000;

    @Before
    public void before() throws IOException {
        zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> {
                System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath());
        });
    }


    @Test
    public void exist() throws InterruptedException, KeeperException {

        /**
         *  exists方法的签名如下:
         *      public Stat exists(String path, boolean watch)
         *  以下是对其参数的相关说明:
         *      path:
         *          指定zookeeper中znode的路径.
         *      watch:
         *          是否需要监视此节点.
         */
        Stat znode = zkCli.exists("/yinzhengjie2019", false);

        //如果znode存在就打印其数据长度,不存在就提示用户.
        if (znode == null){
            System.out.println("该znode不存在!");
        }else{
            System.out.println(znode.getDataLength());
        }

    }
}
案例代码

七.删除znode

package cn.org.yinzhengjie;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class ZkClient {

    //声明一个ZooKeeper的客户端对象
    private ZooKeeper zkCli;

    //定义zookeeper集群的连接地址,注意域名解析哟~
    private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181";

    //定义与zookeeper集群会话的超时时间,单位为毫秒
    private static final int SESSION_TIMEOUT = 3000;

    @Before
    public void before() throws IOException {
        zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> {
                System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath());
        });
    }


    @Test
    public void delete() throws InterruptedException, KeeperException {

        String znodePath = "/yinzhengjie2020";

        Stat znode = zkCli.exists(znodePath, false);

        if (znode != null){
            /**
             *  delete方法的签名如下:
             *      public void delete(final String path, int version)
             *  以下是对其参数的相关说明:
             *      path:
             *          指定zookeeper中znode的路径.
             *      version:
             *          指定znode的版本号(该版本号要和zookeeper集群保存该znode的dataVersion值保持一致哟).
             *
             *   温馨提示:
             *      如果您要删除的znode下有子znod是无法直接删除的,需要先删除子znode.
             */
            zkCli.delete(znodePath,znode.getVersion());
        }
    }
}
案例代码

 八.循环监听(watch)案例

package cn.org.yinzhengjie;

import org.apache.zookeeper.*;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

public class ZkClient {

    //声明一个ZooKeeper的客户端对象
    private ZooKeeper zkCli;

    //定义zookeeper集群的连接地址,注意域名解析哟~
    private static final String CONNECT_ADRESS = "hadoop101.yinzhengjie.org.cn:2181,hadoop102.yinzhengjie.org.cn:2181,hadoop103.yinzhengjie.org.cn:2181";

    //定义与zookeeper集群会话的超时时间,单位为毫秒
    private static final int SESSION_TIMEOUT = 3000;

    @Before
    public void before() throws IOException {
        zkCli = new ZooKeeper(CONNECT_ADRESS, SESSION_TIMEOUT, watchedEvent -> {
                System.out.println(watchedEvent.getType() + "***** 默认回调函数 *****" + watchedEvent.getPath());
        });
    }



    @Test
    public void register()  throws KeeperException,InterruptedException  {

        byte[] data = zkCli.getData("/yinzhengjie2020", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    register();
                }catch (KeeperException e){
                    e.printStackTrace();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }, null);

        //打印获取到的数据
        System.out.println(new String(data));
    }

    @Test
    public void testRegister(){
        try {
            register();
            Thread.sleep(Long.MAX_VALUE);
        }catch (KeeperException e){
            e.printStackTrace();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}
案例代码

原文地址:https://www.cnblogs.com/yinzhengjie2020/p/12913216.html