zookeeper

ZooKeeper 是一个开源的分布式协调服务,由雅虎创建,是 Google Chubby 的开源实现。

分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、配置维护,名字服务、分布式同步、分布式锁和分布式队列等功能。

水平扩容

水平扩容就是向集群中添加更多的机器,以提高系统的服务质量。需要进行整个集群的重启。通常有两种重启方式,一种是集群整体重启,另外一种是逐台进行服务器的重启。

 会话

会话就是一个客户端与服务器之间的一个TCP长连接。客户端和服务器的一切交互都是通过这个长连接进行的;

会话会在客户端与服务器断开链接后,如果经过了设点的sessionTimeout时间内没有重新链接后失效。

Zookeeper数据结构

  • 1、层次化的树形结构,命名符合常规文件系统规范(类似文件系统)
  • 2、每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识,节点名称不能重复
  • 3、节点Znode可以包含数据和子节点(但是EPHEMERAL临时节点类型的节点不能有子节点

Znode有四种形式的目录节点(默认是persistent持久节点)
PERSISTENT 持久节点             持久节点客户端断开连接zk不删除
PERSISTENT_SEQUENTIAL    持久顺序节点 客户端与服务器断开连接,该节点仍然存在,此时节点会被顺序编号,如:000001、000002…
EPHEMERAL 临时节点              临时节点客户端断开连接zk删除
EPHEMERAL_SEQUENTIAL     客户端与服务器断开连接,该节点会被删除,此时节点会被顺序编号,如:000001、000002…

创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。

一个 ZooKeeper 集群如果要对外提供可用的服务,那么集群中必须要有过半的机器正常工作并且彼此之间能够正常通信。

如果想搭建一个能够允许 N 台机器 down 掉的集群,那么就要部署一个由 2*N+1 台服务器构成的 ZooKeeper 集群。

集群角色

和Paxos算法中的集群角色类型,ZooKeeper中包含LeaderFollowerObserver三个角色;

通过一次选举过程,被选举的机器节点被称为Leader,Leader机器为客户端提供读和写服务;

Follower和Observer是集群中的其他机器节点,唯一的区别就是:Observer不参与Leader的选举过程,也不参与写操作的过半写成功策略。

 

选举流程:

Server1启动,给自己投票,然后发送投票信息,由于其它服务器都还没启动,所以它发现的消息收不到任何反馈,此时Server1为Looking状态
Server2启动,给自己投票,同时与Server1通信交换选举结果,由于Server2的id值较大,所以Server2胜出,但由于投票数没有过半,此时Server1和Server2都为Looking状态
Server3启动,给自己投票,同时与Server1和Server2通信交换选举结果,由于Server3的id值较大,所以Server3胜出,此时票数已经过半,所以Server3为Leader,Server1和Server2为Follower
Server4启动,给自己投票,同时与Server1、Server2、Server3通信交换选举结果,尽管Server4的id较大,但由于集群中已经存在Leader,所以Server4只能为Follower
Server5启动,同Server4类似,只能为Follower
​ 总结:

每个服务器在启动时都会选择自己,然后将投票信息发送出去
服务器编号ID越大,在选择算法中的权重越大
投票数必须过半,才能选出Leader
谁是Leader:启动顺序的前集群数/2+1个服务器中,id值最大的会成为Leader

 集群特性

一个ZooKeeper集群中,有一个领导者Leader和多个跟随者Follower
Leader负责进行投票的发起和决议,更新系统状态
Follower用于接收客户端的请求并向客户端返回结果,在选举Leader过程中参与投票
半数机制:集群中只要有半数以上节点存活,集群就能够正常工作,所以一般集群中的服务器个数都为奇数
全局数据一致:集群中每台服务器保存一份相同的数据副本,不论客户端连接到哪个服务器,数据都是一致的
更新请求顺序执行:来自同一个客户端的更新请求,按其发送顺序依次执行
数据更新的原子性:一次数据更新,要么成功,要么失败
实时性:在一定的时间范围内,客户端能读取到最新数据

应用场景

  • 统一命名服务,使用create自动创建节点编号;
  • 配置管理,多个节点的共享配置,当配置发生变化时,可利用zookeeper让使用这些配置的节点获得通知,进行重新加载等操作。如dubbo服务。
  • 集群管理:集群选举主节点,资源定位。
  • 共享锁
  • 负载均衡

命名服务(Naming Service)

命名服务也是分布式系统中比较常见的一类场景,其中较为常见的就是一些分布式服务框架中的服务地址列表。通过调用ZK提供的创建节点的API,能够很容易创建一个全局唯一的path,这个path就可以作为一个名称。

阿里巴巴集团开源的分布式服务框架Dubbo中使用ZooKeeper来作为其命名服务,维护全局的服务地址列表,在Dubbo实现中:

服务提供者在启动的时候,向ZK上的指定节点/dubbo/${serviceName}/providers目录下写入自己的URL地址,这个操作就完成了服务的发布。

服务消费者启动的时候,订阅/dubbo/${serviceName}/providers目录下的提供者URL地址, 并向/dubbo/${serviceName} /consumers目录下写入自己的URL地址。

注意,所有向ZK上注册的地址都是临时节点,这样就能够保证服务提供者和消费者能够自动感应资源的变化。 另外,Dubbo还有针对服务粒度的监控,方法是订阅/dubbo/${serviceName}目录下所有提供者和消费者的信息。

配置管理​

场景:集群环境、服务器的许多配置都是相同的,如:数据库连接信息,当需要修改这些配置时必须同时修改每台服务器,很麻烦

​ 解决:把这些配置全部放到ZooKeeper上,保存在ZooKeeper的某个目录节点中,然后所有的应用程序(客户端)对这个目录节点进行监视Watch,一旦配置信息发生变化,ZooKeeper会通知每个客户端,然后从ZooKeeper获取新的配置信息,并应用到系统中。

集群管理​

场景:集群环境下,如何知道有多少台机器在工作?是否有机器退出或加入?需要选举一个总管master,让总管来管理集群

​ 解决:在父目录GroupMembers下为所有机器创建临时目录节点,然后监听父目录节点的子节点变化,一旦有机器挂掉,该机器与ZooKeeper的连接断开,其所创建的临时目录节点被删除,所有其他机器都会收到通知。当有新机器加入时也是同样的道理。

​ 选举master:为所有机器创建临时顺序编号目录节点,给每台机器编号,然后每次选取编号最小的机器作为master

负载均衡

​ ZooKeeper本身是不提供负载均衡策略的,需要自己实现,所以准确的说,是在负载均衡中使用ZooKeeper来做集群的协调(也称为软负载均衡)

​ 实现思路:

将ZooKeeper作为服务的注册中心,所有服务器在启动时向注册中心登陆自己能够提供的服务
服务的调用者到注册中心获取能够提供所需要服务的服务器列表,然后自己根据负载均衡算法,从中选取一台服务器进行连接
当服务器列表发生变化时,如:某台服务器宕机下线,或新机器加入,ZooKeeper会自动通知调用者重新获取服务列表
​实际上利用了ZooKeeper的特性,将ZooKeeper用为服务的注册和变更通知中心

消息中间件中发布者和订阅者的负载均衡,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息,因此mq在运行过程中,会把所有broker和对应的分区信息全部注册到ZK指定节点上,默认的策略是一个依次轮询的过程,生产者在通过ZK获取分区列表之后,会按照brokerId和partition的顺序排列组织成一个有序的分区列表,发送的时候按照从头到尾循环往复的方式选择一个分区来发送消息。在某个消费者故障或者重启等情况下,其他消费者会感知到这一变化(通过 zookeeper watch消费者列表),然后重新进行负载均衡,保证所有的分区都有消费者进行消费。

zookeeper分布式锁

Zookeeper实现的分布式锁,是利用节点的操作来进行的,加锁,就是创建节点(临时节点),解锁就是删除节点,同一个业务都是在同一父节点下进行加锁和解锁操作,如果该业务父节点下有子节点,则说明该业务已经被锁住了,如果没有子节点,则没被加锁。

临时节点的特点是,当会话失效时,Zookeeper自动清除,避免获取锁的客户端掉线后,没有删除锁节点,而其他客户端都在等这个锁节点删除,产生了死锁。

Zookeeper配置文件介绍

# The number of milliseconds of each tick 
tickTime=2000 
 
# The number of ticks that the initial  
# synchronization phase can take 
initLimit=10 
 
# The number of ticks that can pass between  
# sending a request and getting an acknowledgement 
syncLimit=5 
 
# the directory where the snapshot is stored. 
# do not use /tmp for storage, /tmp here is just  
# example sakes. 
dataDir=/home/myuser/zooA/data 
 
# the port at which the clients will connect 
clientPort=2181 
 
# ZooKeeper server and its port no. # ZooKeeper ensemble should know about every other machine in the ensemble # specify server id by creating 'myid' file in the dataDir # use hostname instead of IP address for convenient maintenance
server.1=127.0.0.1:2888:3888 
server.2=127.0.0.1:2988:3988  
server.3=127.0.0.1:2088:3088 
 
# 
# Be sure to read the maintenance section of the  
# administrator guide before turning on autopurge. 
# 
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance 
# 
# The number of snapshots to retain in dataDir 
# autopurge.snapRetainCount=3 
# Purge task interval in hours 
# Set to "0" to disable auto purge feature  <br>
#autopurge.purgeInterval=1 
dataLogDir=/home/myuser/zooA/log

tickTime:心跳时间,为了确保连接存在的,以毫秒为单位,最小超时时间为两个心跳时间

initLimit:多少个心跳时间内,允许其他server连接并初始化数据,如果ZooKeeper管理的数据较大,则应相应增大这个值

clientPort:服务的监听端口

dataDir:用于存放内存数据库快照的文件夹,同时用于集群的myid文件也存在这个文件夹里(注意:一个配置文件只能包含一个dataDir字样,即使它被注释掉了。)

dataLogDir:用于单独设置transaction log的目录,transaction log分离可以避免和普通log还有快照的竞争

syncLimit:多少个tickTime内,允许follower同步,如果follower落后太多,则会被丢弃。

server.A=B:C:D:
A是一个数字,表示这个是第几号服务器,B是这个服务器的ip地址
C第一个端口用来集群成员的信息交换,表示的是这个服务器与集群中的Leader服务器交换信息的端口
D是在leader挂掉时专门用来进行选举leader所用

相关代码

增删改查案例快速入门

public class ZookeeperClientDemo {
    ZooKeeper zk = null;
    @Before
    public void init()  throws Exception{
        // 构造一个连接zookeeper的客户端对象
        zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);
    }
    
    
    @Test
    public void testCreate() throws Exception{
        // 参数1:要创建的节点路径  参数2:数据  参数3:访问权限  参数4:节点类型
        String create = zk.create("/eclipse", "hello eclipse".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(create);
        zk.close();
        
    }
    
    
    @Test
    public void testUpdate() throws Exception {
        // 参数1:节点路径   参数2:数据    参数3:所要修改的版本,-1代表任何版本
        zk.setData("/eclipse", "我爱你".getBytes("UTF-8"), -1);
        zk.close();
    }
    
    
    @Test    
    public void testGet() throws Exception {
        // 参数1:节点路径    参数2:是否要监听    参数3:所要获取的数据的版本,null表示最新版本
        byte[] data = zk.getData("/eclipse", false, null);
        System.out.println(new String(data,"UTF-8"));    
        zk.close();
    }
    
    
    
    @Test    
    public void testListChildren() throws Exception {
        // 参数1:节点路径    参数2:是否要监听   
        // 注意:返回的结果中只有子节点名字,不带全路径
        List<String> children = zk.getChildren("/cc", false);    
        for (String child : children) {
            System.out.println(child);
        }        
        zk.close();
    }
    
    
    @Test
    public void testRm() throws InterruptedException, KeeperException{
        zk.delete("/eclipse", -1);
        zk.close();
    }
 
}

Watcher代码

package cn.itcats;
 
import java.util.concurrent.CountDownLatch;
 
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
 
public class ZkClientWatcher implements Watcher {
    // 集群连接地址  这里设置了3个zk节点,节点数为单数且大于3
    private static final String CONNECT_ADDRES = "10.211.55.1:2181,10.211.55.1:2181,10.211.55.1:2181";
    // 会话超时时间
    private static final int SESSIONTIME = 2000;
    // 让zk在连接之前等待,连接成功后才能往下走.参数为1是因为ZooKeeper连接开启了一个子线程,需等子线程完成后主线程才运行
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static String LOG_MAIN = "【main】 ";
    private ZooKeeper zk;
 
    public void createConnection(String connectAddres, int sessionTimeOut) {
        try {
            zk = new ZooKeeper(connectAddres, sessionTimeOut, this);
            System.out.println(LOG_MAIN + "zk 开始启动连接服务器....");
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    public boolean createPath(String path, String data) {
        try {
            this.exists(path, true);
            this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println(LOG_MAIN + "节点创建成功, Path:" + path + ",data:" + data);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
 
    /**
     * 判断指定节点是否存在
     * 
     * @param path
     *            节点路径
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
 
    public boolean updateNode(String path,String data) throws KeeperException, InterruptedException {
        exists(path, true);
        this.zk.setData(path, data.getBytes(), -1);
        return false;
    }
 
    public void process(WatchedEvent watchedEvent) {
 
        // 获取事件状态
        KeeperState keeperState = watchedEvent.getState();
        // 获取事件类型
        EventType eventType = watchedEvent.getType();
        // zk 路径
        String path = watchedEvent.getPath();
        System.out.println("进入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path);
        // 判断是否建立连接
        if (KeeperState.SyncConnected == keeperState) {
            if (EventType.None == eventType) {
                // 如果建立建立成功,让后程序往下走
                System.out.println(LOG_MAIN + "zk 建立连接成功!");
                countDownLatch.countDown();
            } else if (EventType.NodeCreated == eventType) {
                System.out.println(LOG_MAIN + "事件通知,新增node节点" + path);
            } else if (EventType.NodeDataChanged == eventType) {
                System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被修改....");
            }
            else if (EventType.NodeDeleted == eventType) {
                System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被删除....");
            }
 
        }
        System.out.println("--------------------------------------------------------");
    }
 
    public static void main(String[] args) throws KeeperException, InterruptedException {
        ZkClientWatcher zkClientWatcher = new ZkClientWatcher();
        zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME);
//        boolean createResult = zkClientWatcher.createPath("/p15", "pa-644064");
        zkClientWatcher.updateNode("/pa2","7894561");
    }
 
}


面试问答

Zookeeper主要是做注册中心用。基于Dubbo框架开发的提供者 消费者都向Zookeeper注册自己的URL,消费者还能拿到并订阅提供者的注册URL,以便在后续程序的执行中去调用提供者。而提供者发生了变动,也会通过Zookeeper向订阅的消费者发送通知。

部署方式?集群中的机器角色都有哪些?集群最少要几台机器
单机,集群。Leader、Follower。集群最低3(2N+1)台,保证奇数,主要是为了选举算法。
集群如果有3台机器,挂掉一台集群还能工作吗?挂掉两台呢?
记住一个原则:过半存活即可用。
集群支持动态添加机器吗?
其实就是水平扩容了,Zookeeper在这方面不太好。两种方式:
全部重启:关闭所有Zookeeper服务,修改配置之后启动。不影响之前客户端的会话。
逐个重启:顾名思义。这是比较常用的方式。

文章内容来源自

https://blog.csdn.net/msyqmsyq/article/details/85261646

https://blog.csdn.net/itcats_cn/article/details/82597659

https://www.cnblogs.com/fanguangdexiaoyuer/p/10311228.html#4344267

原文地址:https://www.cnblogs.com/dingpeng9055/p/11213999.html