zookeeper的安装与使用

简介

  • Zookeeper是因为大数据而火,大数据的组件多依赖于Zookeeper。
  • ZooKeeper由雅虎研究院开发,后来捐赠给了Apache。
  • ZooKeeper是一个开源的分布式应用程序协调服务器,其为分布式系统提供一致性服务。
  • 其一致性是通过基于Paxos算法的ZAB协议完成的。
  • 其主要功能包括:配置维护、域名服务、分布式同步、集群管理等。
  • zookeeper的官网: http://zookeeper.apache.org

zookeeper本质是通过集群方式维护一棵树,树的节点叫做znode,路径叫做namespace,可以通过
namespace找到对应znode,有点像jacksonjson树节点的查找方式,如JsonNode.at("/root/parentNode/childNode")
znode 主要由4部分组成,分别是namespacetypestatdata
namespaceznode的路径(可看作json的key)。typeznode的类型,类型有两种,
分别是持久节点和临时节点,临时节点会随着创建它的客户端的会话的关闭而消失。statznode
的状态信息,包括版本号,子节点个数,存储的数据大小等信息。dataznode存储空间(可以看做json的value)。

zookeeper 服务端

集群安装

  • 假设有三台linux系统机器,可用虚拟机替代,ip分别是192.168.1.1 ~ 192.168.1.3
  • 可用通过xshell工具发送键到所有会话来同时控制三台机器
  • 下载zookeeper安装包,下载地址https://zookeeper.apache.org/releases.html
  • 或者通过linux命令下载 wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz (注意版本)
  • 将zookeeper安装包分别放在三台机器的 /home/soft 目录下
  • 命令tar -zxvf zookeeper-3.4.5.tar.gz解压并通过命令mv zookeeper-3.4.5 zookeeper修改目录名称
  • 命令cd /home/soft/zookeeper/conf 进入zookeeper配置文件目录
  • 命令mv zoo_sample.cfg zoo.cfg 修改配置文件名称
  • 命令vim zoo.cfg 编辑配置文件

编辑配置文件

tickTime=2000
dataDir=/home/softs/zookeeper/data
dataLogDir=/home/softs/zookeeper/logs
clientPort=2181
initLimit=5
syncLimit=2
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
autopurge.purgeInterval=48
autopurge.snapRetainCount=20
  1. tickTime:Client-Server通信心跳时间
  • Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。tickTime以毫秒为单位。
  1. initLimit:Leader-Follower初始通信时限
  • 集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)。
  1. syncLimit:Leader-Follower同步通信时限
  • 集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数(tickTime的数量)。
  1. dataDir:数据文件目录
  • Zookeeper保存数据的目录,没有则需要自己创建
  1. dataLogDir:日志文件目录
  • Zookeeper保存日志的目录,没有则需要自己创建
  1. clientPort:客户端连接端口
  • 客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
  1. 服务器名称与地址:集群信息(服务器编号,服务器地址,LF通信端口,选举端口)
  • 这个配置项的书写格式比较特殊,规则如下:server.N=YYY:A:B
  • N是zookeeper服务实例的myid。注意各自ip要与myid对应
  • YYY是对应ip或者计算机名,若是计算机名,需要在/etc/hosts文件中添加映射
  • A是LF通信端口
  • B是选举端口

编辑myid

  • 每个zookeeper实例都有自己的id,叫做myid
  • 在配置文件中的dataDir字段对应的目录下创建myid文件并写入id,命令echo N > myid,其中N是实例的id,一般用自然数表示

集群启动

  • 进入zookeeper的bin目录,命令cd /home/soft/zookeeper/bin
  • 启动服务脚本,命令./zkServer.sh start
  • 启动需要时间,过个三秒钟查看服务状态,命令./zkServer.sh status
  • 该命令能够返回服务状态,若返回follower说明是从服务,若返回leader说明是主服务,一个集群只能有一个leader,是通过选举算法选举出来的

zookeeper 客户端

命令行方式

在zookeeper的bin目录里提供了客户端脚本,命令./zkCli.sh启动客户端
可以使用以下命令测试下

## 查看根目录包含节点
ls /
## 创建节点持久节点,hello是存储的数据
create /node1 hello
## 创建临时节点,quit命令退出客户端后节点消失
create -e /node2 hello
## 获取节点存储的数据
get /node1
## 查看节点的状态信息
stat /node1
## 删除节点
delete /node1
## 创建节点的子节点
create /node1/son1 hi

代码方式

实际开发中程序员使用zookeeper是在代码中,Apache组织提供了一个客户端的jar包,
但由于原生jar包的操作比较繁琐且不完善,Netflix公司在此基础上又封装了一个
jar包,叫Curator,后捐献给Apache组织。现阶段使用Curator的比较多。

  1. maven下载
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>>2.4.1</version>
</dependency>

注意curator版本,不同curator版本使用的原生客户端jar包版本不一样,
curator 2.4.1使用的是 zookeeper-client 3.4.5,
且客户端版本要与服务端版本保持一致

  1. 使用

这里是在springboot项目中使用,spring项目也适用

  • 注册bean
@Configuration
public class ZkConfig {
    @Bean
    public CuratorFramework curatorFramework() {
        //以下配置参数可以在配置文件中读取
        //zookeeper集群地址,多个ip用逗号分隔
        String url = "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181";
        //连接服务器超时时间
        int connectionTimeout = 10000;
        //客户端session超时时间
        int sessionTimeout = 10000;
        int sleepTimeout = 3000;
        //最多重试次数
        int maxRetries = 3;
        int waitTime = 20;
        //namespace的根节点,代码中所有创建的节点都在此节点下
        String namespace = "myNode";
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(sleepTimeout, maxRetries);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(url)
                .connectionTimeoutMs(connectionTimeout)
                .sessionTimeoutMs(sessionTimeout)
                .namespace(namespace)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
        return client;
    }
}
  • 代码使用
@Component
@Slf4j
public class ZkClient {
    @Autowired
    private CuratorFramework client;

    /**
     * 创建一个持久化节点
     * @param path
     * @param data
     * @return
     */
    public String createNode(String path, String data) {
        if (data == null) {
            createNode(path);
        }
        String nodePath = null;
        try {
            nodePath = client.create().creatingParentsIfNeeded().forPath(path, data.getBytes());
        } catch (Exception e) {
            log.error("create node error {}", path);
        }
        return nodePath;
    }

    /**
     * 创建一个不带数据的持久化节点
     * @param path
     * @return
     */
    public String createNode(String path) {
        String nodePath = null;
        try {
            nodePath = client.create().creatingParentsIfNeeded().forPath(path);
        } catch (Exception e) {
            log.error("{},{}", path, e.getMessage());
        }
        return nodePath;
    }

    /**
     * 创建一个临时节点
     * @param path
     * @param data
     * @return
     */
    public String createTempNode(String path, String data) {
        if (data == null) {
            return createTempNode(path);
        }
        String nodePath = null;
        try {
            nodePath = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, data.getBytes());
        } catch (Exception e) {
            log.error("create temp node error {},{}", path, e.getMessage());
        }
        return nodePath;
    }

    /**
     * 创建一个不带数据的临时节点
     * @param path
     * @return
     */
    public String createTempNode(String path) {
        String nodePath = null;
        try {
            nodePath = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
        } catch (Exception e) {
            log.error("create temp node error {}", path);
        }
        return nodePath;
    }

    /**
     * 修改节点数据
     * @param path
     * @param data
     * @return
     */
    public Stat setData(String path, String data) {
        Stat stat = null;
        try {
            stat = client.setData().forPath(path, data.getBytes());
        } catch (Exception e) {
            log.error("set data error {}", path);
        }
        return stat;
    }

    /**
     * 删除节点
     * @param path
     */
    public void deleteNode(String path) {
        try {
            client.delete().deletingChildrenIfNeeded().forPath(path);
        } catch (Exception e) {
            log.error("delete node error {}", path);
        }
    }

    /**
     * 获取节点数据
     * @param path
     * @return
     */
    public String getData(String path) {
        String data = null;
        try {
            data = new String(client.getData().forPath(path));
        } catch (Exception e) {
            log.error("get node error {},{}", path, e.getMessage());
        }
        return data;
    }
}

代码中的creatingParentsIfNeeded是创建节点时无父节点一并创建父节点,
由于zookeeper的临时节点是无法创建子节点的,即临时节点只能是叶子节点,
所以通过creatingParentsIfNeeded所创建临时节点的父节点都是持久节点

客户端选主

有时我们需要集群部署的多个客户端程序中选择一个作为主程序,其他作为从程序,就是所谓Master-Slave模式,利用Curator可以实现。通过Leader LatchLeader Election这两种方式实现。

Leader Latch方式实现主从

@Component
@Slf4j
public class LeaderElection {

    @Resource
    private CuratorFramework client;

    @Getter
    private LeaderLatch leaderLatch;

    /**
     * isLeader是选举成功时的回调,noLeader是卸任时的回调
     * 如果主节点被失效, 会进行重新选主
     */
    @PostConstruct
    public void elect() {
        try {
            String id = UUID.randomUUID().toString();
            leaderLatch = new LeaderLatch(client, "/node", id);
            LeaderLatchListener leaderLatchListener = new LeaderLatchListener() {
                @Override
                public void isLeader() {
                    log.info("leader step up, {}", leaderLatch.getId());
                }
                @Override
                public void notLeader() {
                    log.info("leader step down, {}", leaderLatch.getId());
                }
            };
            leaderLatch.addListener(leaderLatchListener);
            leaderLatch.start();
        } catch (Exception e) {
            log.error("join error...");
        }
    }
}
  • new LeaderLatch(client, "/node", id);client 是前面注册的zookeeper客户端,通过注入的方式使用。
  • "/node"LeaderLatch 在zookeeper注册临时节点的根节点,LeaderLatch是通过在zookeeper中注册临时节点来实现的。
  • id 是在临时节点中存储的数据。
  • isLeader回调是在节点成为主节点时触发。
  • notLeader回调是在主节点离线时触发。

Leader Election方式实现选主

@Component
@Slf4j
public class LeaderElection {

    @Resource
    private CuratorFramework client;

    @Getter
    private LeaderSelector leaderSelector;

    /*
    *   Leader Election模式
    *   实例被选主后执行takeLeadership, 执行完之后立刻释放领导权, 再次选主
    * */
    public void elect() {
        try {
            String id = UUID.randomUUID().toString();
            LeaderSelectorListener leaderSelectorListener = new LeaderSelectorListener() {
                @Override
                public void takeLeadership(CuratorFramework client) throws Exception {
                    log.info("leader step up, {}", id);
                    //业务处理
                    //执行完成后自动释放领导权
                }

                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState) {

                }
            };
            leaderSelector = new LeaderSelector(client, "/node", leaderSelectorListener);
            leaderSelector.autoRequeue();
            leaderSelector.start();
        } catch (Exception e) {
            log.error("join error...");
        }
    }
}
不积跬步无以至千里
原文地址:https://www.cnblogs.com/xiaogblog/p/15476420.html