zookeeper学习笔记

zookeeper安装与基本命令

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

要安装zookeeper首先需要安装jdk,可以去ZooKpper官网下载最新的版本。

并解压到指定目录配置zoo.cfg , 在conf文件夹下复制一份新的zoo_sample.cfg并重新命名为zoo.cfg。

安装步骤

# cd /opt
# wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz 
# tar -xzvf zookeeper-3.4.8.tar.gz 
# mv zookeeper-3.4.8 zookeeper
# cd zookeeper/
# cd conf/ 
# mv zoo_sample.cfg zoo.cfg

启动zk要到bin目录下,执行zkServer.sh start就可以了。

停止当然是zkServer.sh stop

查看状态zkServer.sh status

链接zk使用zkCli.sh -server 127.0.0.1:2181

输入help回车后查看帮助信息

ZooKeeper -server host:port cmd args
    stat path [watch]
    set path data [version]
    ls path [watch]
    delquota [-n|-b] path
    ls2 path [watch]
    setAcl path acl
    setquota -n|-b val path
    history 
    redo cmdno
    printwatches on|off
    delete path [version]
    sync path
    listquota path
    rmr path
    get path [watch]
    create [-s] [-e] path data acl
    addauth scheme auth
    quit 
    getAcl path
    close 
    connect host:port

创建节点以及值

create /node1 value1

查看节点

使用ls指令查看当前ZK中所包含的内容ls /

查看节点中的值

get /node1

更新节点中的值

set /node2 value3

删除节点

delete /node2

watch介绍

watch表示监听事件,比如执行命令ls /node2

然后我们新开个窗口连上zk,在node2下面新建个子节点,建完后马上之前加了watch的窗口就能收到新建的事件了。

这种场景适合用在配置变更的时候各个子节点都需要重新加载配置。

zoo.cfg配置信息详解

# The number of milliseconds of each tick
tickTime=2000 ##ZooKeeper的最小时间单元,单位毫秒(ms),默认值为3000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10 ##Leader服务器等待Follower启动并完成数据同步的时间,默认值10,表示tickTime的10倍
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5 ##Leader服务器和Follower之间进行心跳检测的最大延时时间,默认值5,表示tickTime的5倍
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper ##ZooKeeper服务器存储快照文件的目录,必须配值,建议放置在var目录下
# the port at which the clients will connect
clientPort=2181 ## 服务器对外服务端口,默认值为2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

zookeeper权限控制 ACL操作

CL全称为Access Control List(访问控制列表),用于控制资源的访问权限。Zookeeper利用ACL策略控制节点的访问权限,如节点数据读写、节点创建、节点删除、读取子节点列表、设置节点权限等。

Zookeeper的ACL,scheme、id、permission,通常表示为:

scheme:id:permission。
  • schema代表授权策略
  • id代表用户
  • permission代表权限

permission分为下面几种:

  • CREATE(c):创建子节点的权限
  • READ(r):读取节点数据的权限
  • DELETE(d):删除节点的权限
  • WRITE(w):修改节点数据的权限
  • ADMIN(a):设置子节点权限的权限

密码权限控制

语法:digest:username:BASE64(SHA1(password)):权限信息
首先我们创建一个节点

create /test 10

创建完成之后设置权限

setAcl /test digest:zhangsan:zwnqMhjMhpBo3CqM8qqH5mM73s8=:crdwa

然后来验证下权限是否可用

[zk: localhost:2181(CONNECTED) 2] get /test
Authentication is not valid : /test

执行命令会发现没有权限操作

进行认证操作后再次执行get操作就可以了

addauth digest zhangsan:123456

这边需要注意的是密码的生成方式,密码是加密的,不是明文的,我们需要借助于Zookeeper中的一个类来加密密码,如果是代码中操作就直接加密了,在shell中就得单独去加密了。

java -Djava.ext.dirs=/Users/zhangsan/Documents/java/zookeeper-3.4.7/lib -cp /Users/zhangsan/Documents/java/zookeeper-3.4.7/zookeeper-3.4.7.jar org.apache.zookeeper.server.auth.DigestAuthenticationProvider zhangsan:123456

输出加密内容如下:

zhangsan:123456->zhangsan:zwnqMhjMhpBo3CqM8qqH5mM73s8=

IP权限控制

还有一种就是通过IP来控制节点的访问权限,一般不建议使用,因为IP发生变动的可能性比较大

语法是:ip:IP信息:权限信息

创建一个节点

create /ip2 ip

设置IP访问权限

setAcl /ip2 ip:192.168.31.139:r

Java连接zookeeper

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId
    <version>3.4.7</version>
</dependency>
/**
 * 连接ZK测试
 */
public class ConnTest {

    public static void main(String[] args) {
        try {
            // 建立连接
            ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 1000 * 10, null);
            // 创建节点
            zooKeeper.create("/connTest", "connTest".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            byte[] result = zooKeeper.getData("/connTest", false, new Stat());
            System.out.println(new String(result));
            
            //zooKeeper.delete("/connTest", -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

使用curator客户端更方便操作zookeeper

        <!--<dependency>-->
            <!--<groupId>org.apache.zookeeper</groupId>-->
            <!--<artifactId>zookeeper</artifactId>-->
            <!--<version>3.4.7</version>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.4.0</version>
        </dependency>    
/**
 * zk客户端
 * 测试监听zk节点变化
 */
public class CuratorTest {

    public static void main(String[] args) throws Exception {
        // 1 重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        // 2 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(1000 * 10)
                .retryPolicy(retryPolicy)
                .build();
        // 3 开启连接
        cf.start();
        
//        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/yjh/aa","aa内容".getBytes());
//        System.out.println(new String(cf.getData().forPath("/yjh/aa")));
//        cf.setData().forPath("/yjh/aa", "修改aa内容".getBytes());
//        System.out.println(new String(cf.getData().forPath("/yjh/aa")));
//        cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/yjh");
        
//        if (cf.checkExists().forPath("/tt") == null) {
//            cf.create().creatingParentsIfNeeded().forPath("/tt","tt".getBytes());
//        }
//        byte[] data = cf.getData().usingWatcher(new Watcher() {  
//            public void process(WatchedEvent event) {
//                System.out.println("节点监听器 : " + event.getType().getIntValue() + "	" + event.getPath());  
//            }  
//        }).forPath("/tt");
//        System.out.println(new String(data));
        
         ExecutorService pool = Executors.newFixedThreadPool(2);
            
         final NodeCache nodeCache = new NodeCache(cf, "/test", false);
         nodeCache.start(true);
         nodeCache.getListenable().addListener(
            new NodeCacheListener() {
                public void nodeChanged() throws Exception {
                    System.out.println(nodeCache.getCurrentData().getPath() + "数据改变了, 新的数据是: " +
                        new String(nodeCache.getCurrentData().getData()));
                }
            }, 
            pool
            );
        Thread.sleep(Integer.MAX_VALUE);
    }

}

zookeeper分布式锁

/**
 * zk分布式锁
 */
public class LockTest {
    
    static int count = 2;

    public static void main(String[] args) {
        // 1 重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        // 2 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(1000 * 10)
                .retryPolicy(retryPolicy)
                .build();
        // 3 开启连接
        cf.start();
        final InterProcessMutex lock = new InterProcessMutex(cf, "/mylock");
        final CountDownLatch latch = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 20; i++) {
            pool.execute(() -> {
                try {
                    System.err.println(1);
                    latch.await();
                    lock.acquire();
                    Thread.sleep(100);
                    //synchronized (LockTest.class) {
                        if (count > 0) {
                            count--;
                            lock.release();
                            System.out.println(count);
                        }
                        
                    //}
                    
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        System.out.println("开始执行");
        latch.countDown();
        pool.shutdown();
    }

}

zookeeper作为服务注册中心

首先编写一个服务注册中心类,监听服务的注册与停止

/**
 * 测试服务注册中心
 * 监听服务注册与停止
 */
public class ServiceClient {

    public static void main(String[] args) {
        // client.get("http://localhost/get");
        // client.get("http://localhost/get2");
        // client.get("nginx地址"); --->   get./get2
        
        // 从zk中获取服务地址列表,选择一个进行请求,本地执行负载均衡
        
        // 1 重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        // 2 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(1000 * 10)
                .retryPolicy(retryPolicy)
                .build();
        // 3 开启连接
        cf.start();

        // 开始监听
        try {
            final PathChildrenCache childrenCache = new PathChildrenCache(cf, "/service", true);
            childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
            childrenCache.getListenable().addListener(
                new PathChildrenCacheListener() {
                    @Override
                    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
                            throws Exception {
                            switch (event.getType()) {
                            case CHILD_ADDED:
                                System.out.println("CHILD_ADDED: " + event.getData().getPath());
                                break;
                            case CHILD_REMOVED:
                                System.out.println("CHILD_REMOVED: " + event.getData().getPath());
                                break;
                            case CHILD_UPDATED:
                                System.out.println("CHILD_UPDATED: " + event.getData().getPath());
                                break;
                            default:
                                break;
                        }
                    }
                }
            );
            List<String> urls = cf.getChildren().forPath("/service");
            for (String url : urls) {
                System.out.println(url);
            }
            Thread.sleep(200000000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

另外编写两个服务类,用于模拟两个服务,分别往zookeeper创建节点,相当于注册服务,由上面的服务注册中心进行监听

用户服务:

/**
 * 测试服务注册中心
 * 模拟用户服务
 */
public class UserServiceApplication {

    public static void main(String[] args) {
        // 1 重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        // 2 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(1000 * 10)
                .retryPolicy(retryPolicy)
                .build();
        // 3 开启连接
        cf.start();
        try {
            cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/service/192.168.1.1", "".getBytes());
            Thread.sleep(200000000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

订单服务:

/**
 * 测试服务注册中心
 * 模拟订单服务
 */
public class OrderServiceApplication {

    public static void main(String[] args) {
        // 1 重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        // 2 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(1000 * 10)
                .retryPolicy(retryPolicy)
                .build();
        // 3 开启连接
        cf.start();
        try {
            cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/service/192.168.1.2", "".getBytes());
            Thread.sleep(200000000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

然后分别启动上面这三个类,服务注册中心就可以监听用户服务和订单服务的注册和停止了。

ps:编写一个简单的zookeeper操作工具类

/**
 * Created by 唐哲
 * 2018-06-24 16:26
 * zk操作工具类
 */
public class ZkUtils {

    private static final Integer ZK_SESSION_TIMEOUT = 10 * 1000;
    private static CountDownLatch latch = new CountDownLatch(1);
    private static ZkUtils instance = new ZkUtils();
    private static ZooKeeper zk;

    public synchronized static ZkUtils getInstance(String host, int port) {
        if (zk == null) {
            connect(host, port);
        }
        return instance;
    }

    private static void connect(String host, int port) {
        String connectString = host + ":" + port;
        try {
            zk = new ZooKeeper(connectString, ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("已经触发了" + event.getType() + "事件!");
                    // 判断是否已连接ZK, 连接后计数器递减
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            // 若计数器不为0, 则等待
            latch.await();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String addNode(String nodeName) {
        Stat stat;
        try {
            stat = zk.exists(nodeName, false);
            if (stat == null) {
                return zk.create(nodeName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public String addNode(String nodeName, String data) {
        Stat stat;
        try {
            stat = zk.exists(nodeName, false);
            if (stat == null) {
                return zk.create(nodeName, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public String addNode(String nodeName, String data, List<ACL> acl, CreateMode createMode) {
        Stat stat;
        try {
            stat = zk.exists(nodeName, false);
            if (stat == null) {
                return zk.create(nodeName, data.getBytes(), acl, createMode);
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public void removeNode(String nodeName) {
        try {
            zk.delete(nodeName, -1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    public void removeNode(String nodeName, int version) {
        try {
            zk.delete(nodeName, version);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    public String setData(String nodeName, String data) {
        Stat stat;
        try {
            stat = zk.exists(nodeName, false);
            if (stat != null) {
                zk.setData(nodeName, data.getBytes(), -1);
                return data;
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 监控数据节点变化
     */
    public void monitorDataUpdate(String nodeName) {
        try {
            zk.getData(nodeName, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // 节点的值有修改
                    if(event.getType() == EventType.NodeDataChanged) {
                        System.out.println(nodeName + "修改了值" + event.getPath());
                        // 触发一次就失效,所以需要递归注册
                        monitorDataUpdate(nodeName);
                    }
                }
            }, new Stat());
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ZkUtils zkUtils = ZkUtils.getInstance("localhost", 2181);
        //zkUtils.removeNode("/test");
        
//        String result = zkUtils.addNode("/test");
//        System.out.println(result);
//
//        result = zkUtils.addNode("/test", "10");
//        System.out.println(result);
        String result = zkUtils.setData("/test", "hello");
        System.out.println(result);

        zkUtils.monitorDataUpdate("/test");

        CountDownLatch countDownLatch = new CountDownLatch(1);
        countDownLatch.await();
    }

}
原文地址:https://www.cnblogs.com/tangzhe/p/9229688.html