Zookeeper 系列(三)Zookeeper API

Zookeeper 系列(三)Zookeeper API

本节首先介绍 Zookeeper 的 Shell 命令,再对 Java 操作 Zookeeper 的三种方式进行讲解,本节先介绍 Zookeeper 的原生 API。

  • Zookeeper API:Zookeeper 原生 api
  • ZKClient API
  • Curator API

一、Shell 命令

启动 Zookeeper 服务之后,输入以下命令,连接到 Zookeeper 服务:

zkCli.sh -server localhost:2181

注意: window 下直接 zkCli 启动,不要带参数,否则会报错,详见zookeeper启动抛出NumberFormatException 异常

连接成功之后,输入 help 之后,屏幕会输出可用的 Zookeeper 命令,如下所示:

ZooKeeper -server host:port cmd args
        stat path [watch]
        set path data [version]
        ls path [watch]
        delquota [-n|-b] path
        ls2 path [watch]
        setAcl path acl
        setquota -n|-b val path
        history
        redo cmdno
        printwatches on|off
        delete path [version]
        sync path
        listquota path
        rmr path
        get path [watch]
        create [-s] [-e] path data acl
        addauth scheme auth
        quit
        getAcl path
        close
        connect host:port

(1) 查询 ls

[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls /zookeeper
[quota]

(2) 创建新的 Znode 节点 create

[zk: localhost:2181(CONNECTED) 7] create /date 2018-04-05
Created /date
[zk: localhost:2181(CONNECTED) 8] ls /
[date, zookeeper]

(3) 获取 Znode 节点 get

[zk: localhost:2181(CONNECTED) 9] get /date
2018-04-05
cZxid = 0x7
ctime = Thu Apr 05 14:21:20 CST 2018
mZxid = 0x7
mtime = Thu Apr 05 14:21:20 CST 2018
pZxid = 0x7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 10
numChildren = 0

(4) 修改 Znode 节点 set

set /date 2018-04-06

(4) 删除 Znode 节点 delete/rmr

delete /date    # 不能删除非空目录
rmr /date       # 递归删除

二、Zookeeper API

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.0-alpha</version>
</dependency>

示例:

package com.github.binarylei.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @author: leigang
 * @version: 2018-04-05
 */
public class ZookeeperBase {

    /** zookeeper 地址,多个用 , 隔开 */
    static final String CONNECT_ADDR = "127.0.0.1";
    /** session 超时时间,单位:ms */
    static final int SESSION_OUTTIME = 5000;
    /** 阻塞程序执行,用于等待zookeeper 连接成功,发送成功信号 */
    static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
    
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zooKeeper = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher() { // (1)
            @Override
            public void process(WatchedEvent event) {
                // 获取事件的状态
                Event.KeeperState state = event.getState();
                Event.EventType type = event.getType();
                // 如果是建立连接
                if (state == Event.KeeperState.SyncConnected) {
                    if (type == Event.EventType.None) {
                        // 如果连接建立成功,则发送信号,让后续阻塞程序向下执行
                        connectedSemaphore.countDown();
                        System.out.println("zookeeper 连接建立");
                    }
                }

            }
        });

        // 进行阻塞,等待与 Zookeeper 的连接建立完成
        connectedSemaphore.await();
        System.out.println("============================");

        //1. 创建节点
        zooKeeper.create( // (2)
                "/testRoot",            // 节点路径,不允许递归创建节点
                "testRoot".getBytes(),        // 节点内容
                ZooDefs.Ids.OPEN_ACL_UNSAFE,  // 节点权限,一般情况下不用关注
                CreateMode.PERSISTENT);       // 节点类型

        //2. 获取节点
        byte[] data = zooKeeper.getData("/testRoot", false, null);
        System.out.println("获取节点:" + new String(data));

        //3. 获取子节点
        List<String> nodes = zooKeeper.getChildren("/", false);
        for (String node : nodes) {
            System.out.println("获取" + node + "子节点:" +
                    new String(zooKeeper.getData("/" + node, false, null)));
        }

        //4. 修改节点的值
        Stat stat = zooKeeper.setData("/testRoot", "111".getBytes(), -1);

        //5. 判断节点是否存在
        System.out.println("节点是否存在:" + zooKeeper.exists("/testRoot", false));

        //6. 删除节点,不支持递归删除
        zooKeeper.delete("/testRoot", -1, new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                System.out.println("响应码:" + rc);
                System.out.println("路径:" + path);
                System.out.println("上下文:" + ctx);
            }
        }, 1);

        zooKeeper.close();
    }
}

(1) 创建会话方法 :客户端可以通过创建一个 Zookeeper 实例来连接 Zookeeper 服务器。ZooKeeper 构造方法 有 4 个,参数说明如下:

  1. connectstring :连接服务器列表,已","分割。
  2. sessiontimeout :心跳检测时间周期期(秒)
  3. wather :事件处理通知器。
  4. canbereadonly :标识当前会话是否支持只读。
  5. session 和 sessionpasswd :提供连接 zookeeper 的 session 和密码,通过这俩个确定唯一台客户端,目的是可以提供重复会话。

注意: Zookeeper 客户端和服务器端会话的建立是一个异步的过程 ,程序方法在处理完客户端初始化后立即返回,也就是说程序继续往下执行代码,这样,大多数情况下我们并没有真正构建好一个可用会话,在会话的生命周期处于 "SyncConnected" 时才算真正建立完毕。解决方案是使用 CountDownLatch 阻塞,起到连接建立完成。

(2) 创建节点

create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
  1. path :节点路径,/nodeName。不允许递归创建节点。
  2. data :节点内容,要求类型是字节数组。
  3. acl :节点权限,一般使用 ZooDefs.Ids.OPEN_ACL_UNSAFE 即可。
  4. createMode :节点类型。PERSISTENT(持久节点)、PERSISTENT_SEQUENTIAL(持久顺序节点)、EPHEMERAL(临时节点)、EPHEMERAL_SEQUENTIAL(临时顺序节点)。

(3) 获取节点

getData(String path, boolean watch, Stat stat)

(4) 获取子节点

getChildren(String path, boolean watch)

(5) 修改节点的值

# version = -1 表示全部的历史版本,一般使用 -1 即可
setData(final String path, byte data[], int version)

(6) 判断节点是否存在

exists(String path, boolean watch)

(7) 删除节点

delete(final String path, int version)

(8) 所有的节点都有同步和异步区分,以delete为例

// 参数分别为:路径,版本(-1即可),回调函数,上下文环境
zooKeeper.delete("/testRoot", -1, new AsyncCallback.VoidCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx) {
        System.out.println("响应码:" + rc);
        System.out.println("路径:" + path);
        System.out.println("上下文:" + ctx);
    }
}, 1);

注册一个异步回调函数,要实现 AsyncCallback.VoidCallback 接口,重写 processResult(int rc, String path, Object ctx) 方法,当节点创建完毕后执行此方法。

  1. rc :为服务端响应码 0(调用成功)、-4(端口连接)、-110(指定节点存在)、-112(会话已经过期)。
  2. path :接口调用时传入 API 的数据节点的路径参数
  3. ctx :为调用接口传入 API 的 ctx 值

三、Zookeeper Watcher

3.1 watcher 的概念

Zookeeper Watcher 事件触发机制详见 Watch 触发器。当 watch 监视的数据发生变化时,通知设置了该 watch 的 client,即 watcher。

同样,其 watcher 是监听数据发送了某些变化,那就一定会有对应的事件类型,和状态类型。

(1) 事件类型(znode节点相关):

  1. EventType.None :客户端连接成功
  2. EventType.NodeCreated :节点创建
  3. EventType.NodeDataChanged :节点变更
  4. EventType.NodeChildrenChanged :子节点变量
  5. EventType.NodeDeleted :节点删除

(2) 状态类型(客户端状态):

  1. KeeperState.Disconnected :客户端连接断开
  2. KeeperState.SyncConnected :客户端连接成功
  3. KeeperState.AuthFailed :客户端认证失败
  4. KeeperState.Expired :客户端连接过期

3.2 watcher 的特性

Watcher 的特性:一次性、客户端串行执行、轻量。

(1) 一次性

对于 ZK 的 watcher,你只需要记住一点: zookeeper 有 watch事件,是一次性触发的,当 watch 监视的数据发生变化时,通知设置了该 watch 的 client,即 watcher,由于 zookeeper 的监控都是一次性的所以每次必须设置监控。

(2) 客户端串行执行

客户端 Watcher 回调的过程是一个串行同步的过程,这为我们保证了顺序,同时需要开发人员注意一点,千万不要因为一个 Watcher 的处理逻辑影响了整个客户端的 Watcher 回调。

(3) 轻量

Watched Event 是 Zookeeper 整个 Watcher 通知机制的最小通知单元,整个结构只包含三部分:通知状态、事件类型和节点路径。也就是说 Watcher 通知非常的简单,只会告诉客户端发生了事件而不会告知其具体内容,需要客户自己去进行获取,比如 NodeDataChanged 事件, Zookeeper 只会通知客户端指定节点的数据发生了变更,而不会直接提供具体的数据内容。

我们通过一个示例,详细学习下 Watcher 的概念和其目的。

3.3 Watcher 示例

package com.github.binarylei.zookeeper.watcher;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;

/**
 * @author: leigang
 * @version: 2018-04-05
 */
public class ConnectionWatcher implements Watcher, Closeable {
    private static final int SESSION_TIMEOUT = 5000;
    /** 定义原子变量,用于记录watcher数 */
    private AtomicInteger seq = new AtomicInteger();

    protected ZooKeeper zk;
    private CountDownLatch connectedSignal = new CountDownLatch(1);

    public void connect(String host) {
        this.close();
        try {
            this.zk = new ZooKeeper(host, SESSION_TIMEOUT, this);
            this.connectedSignal.await();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event == null) {
            return;
        }
        // 连接状态
        KeeperState keeperState = event.getState();
        // 事件类型
        EventType eventType = event.getType();
        // 受影响的path
        String path = event.getPath();
        String logPrefix = "【Watcher-" + seq.incrementAndGet() + "】";

        if(keeperState == KeeperState.SyncConnected) {
            // 成功连接上服务器
            if (eventType == EventType.None) {
                System.out.println(logPrefix + "成功连接上服务器");
                this.connectedSignal.countDown();
            }
            // 节点创建
            else if (eventType == EventType.NodeCreated) {
                System.out.println(logPrefix + "节点创建:" + path);
            }
            // 节点数据更新
            else if (eventType == EventType.NodeDataChanged) {
                System.out.println(logPrefix + "节点数据更新:"+ path);
            }
            // 节点删除
            else if (eventType == EventType.NodeDeleted) {
                System.out.println(logPrefix + "节点删除:" + path);
            }
        } else if (keeperState == KeeperState.Disconnected) {
            System.out.println(logPrefix + "连接断开");
        } else if (keeperState == KeeperState.AuthFailed) {
            System.out.println(logPrefix + "权限认证失败");
        } else if (keeperState == KeeperState.Expired) {
            System.out.println(logPrefix + "连接过期");
        }
    }

    // 关闭连接
    public void close() {
        if (this.zk != null) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }
}

public class ConnectionWatcherTest {

    @Test
    public void test() throws KeeperException, InterruptedException {
        ConnectionWatcher zkWatcher = new ConnectionWatcher();
        zkWatcher.connect("127.0.0.1:2181");

        ZooKeeper zk = zkWatcher.getZk();
        zk.create("/date", String.valueOf(System.currentTimeMillis()).getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        //1. 注册watch事件
        zk.exists("/date", true);
        zk.setData("/date", String.valueOf(System.currentTimeMillis()).getBytes(), -1);

        //2. watch的触发是一次性的,要想再次触发必须重新注册
        zk.exists("/date", true);
        zk.setData("/date", String.valueOf(System.currentTimeMillis()).getBytes(), -1);

        zk.delete("/date", -1);
        zkWatcher.close();
    }
}

测试结果如下:

【Watcher-1】成功连接上服务器
【Watcher-2】节点数据更新:/date
# 当第二个 zk.exists("/date", true); 注释后,【Watcher-3】就不会触发了
【Watcher-3】节点数据更新:/date

四、Zookeeper 安全机制

ACL(Access Control List), Zookeeper 作为分布式协调框架,其内部存储的都是一些关乎分布式系统运行时状态的元数据,尤其是设计到分布式锁、 Master 选举和协调等应用场景。我们需要有效地保障 Zookeeper 中的数据安全,Zookeeper 提供一套完善的 ACL 权限控制机酮来保障数据的安全。

Zookeeper 提供了三种模式。权限模式、授权对象、权限。

(1) 权限模式: Scheme,开发人员最多使用的如下四种权限模式:

  1. IP :ip 模式通过 ip 地址粒度来进行控制权限,例如配置了 192.168.1.107 即表示权限控都针对这个 ip 地址的,同时也支持按网段分配,比如 192.168.1.*
  2. Diges :digest 是最常用的权限控制模式 ,也更符合我们对权限控制的认识,其类似于 "username: password" 形式的权限标识进行权限配置。zK 会对形成的权限标识先后进行俩次编码处理,分别是 SHA-1 加密算法、BASE64 编码。
  3. World :World 是一直最开放的权限控制模式。这种模式可以看做为特殊的 Digest。他仅仅是一个标识而已。
  4. Super :超级用户模式_在超级用户模式下可以对 ZK 任意进行操作。

(2) 权限对象:指的是权限赋予的用户或者一个指定的实体,例如 ip 地址或机器等。在不的模式下,授权对象是不同的。这种模式和权限对象一一对应。

(3) 权限:权限就是指那些通过权限检测后可以被允许执行的操作,在 ZK 中,对数据的操作权限分为以下五大类:

CREATE、DELETE、READ、WRITE、ADMIN

4.1 代码示例

// 修改connect连接,在实例化ZooKeeper后,添加认证信息
public class ZookeeperAuth implements Watcher, Closeable {
    public void connect(String host) {
        connect(host, null);
    }

    // 添加认证
    public void connect(String host, String password) {
        this.close();
        try {
            this.zk = new ZooKeeper(host, SESSION_TIMEOUT, this);
            if (password != null) {
                this.zk.addAuthInfo("digest", password.getBytes());
            }
            this.connectedSignal.await();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

测试代码如下:

public class ZookeeperAuthTest{

    @Test
    public void test1() throws KeeperException, InterruptedException { // (1)
        ZookeeperAuth zkAuth = new ZookeeperAuth();
        zkAuth.connect("127.0.0.1", "123456");

        ZooKeeper zk = zkAuth.getZk();
        zk.create("/testAuth", "test".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);

        zkAuth.close();
    }

    @Test
    public void test2() throws KeeperException, InterruptedException { // (2)
        ZookeeperAuth zkAuth = new ZookeeperAuth();
        zkAuth.connect("127.0.0.1:2181", "1234568");

        ZooKeeper zk = zkAuth.getZk();
        byte[] data = zk.getData("/testAuth", false, null);
        System.out.println(new String(data));

        zkAuth.close();
    }
}

(1) Client-1 使用密码 123456 创建一个连接,并创建一个 znode 节点 /testAuth,注意创建节点时权限使用 ZooDefs.Ids.CREATOR_ALL_ACL

(2) Client-2 使用密码 1234568 创建一个连接,并打算修改节点 /testAuth ,结果出现权限不足的错误。

org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /testAuth

	at org.apache.zookeeper.KeeperException.create(KeeperException.java:117)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
	at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1611)
	at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1640)
	at com.github.binarylei.zookeeper.auth.ZookeeperAuthTest.test2(ZookeeperAuthTest.java:33)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
原文地址:https://www.cnblogs.com/binarylei/p/8727345.html