Zookeeper学习笔记(上)

Zookeeper学习笔记

本篇主要是一些基本的介绍和API的使用介绍, 有些只是记录了知识点,而没有完全在笔记中详细解释, 需要自行查找资料补充相关概念
主要参考了课程中的内容: Zookeeper分布式系统开发实战


Zookeeper 概述

简介:

  • 定义 : 分布式系统, 保证系统高可用, 并且高性能
  • 高性能, 分布式的, 分布应用协调服务
  • 使用文件系统目录树作为数据模型

ZK典型用用场景:

  • 注册订阅
  • 负载均衡
  • 命名服务
  • 分布式协调/通知
  • 集群管理

ZK基本概念:

  • 角色:在集群中的角色
    • leader
    • follower
    • observer:无投票权决定leader,只能获取数据
  • 会话 session
  • 数据节点: 树形数据结构
  • 数据节点分类
    • 持久节点
    • 临时节点
    • 顺序节点
  • 数据节点版本:
    • version : znode 数据版本
    • cversion : znode 的子节点版本
    • aversion
  • Watcher
  • ACL: access control lists

数据模型:

树形结构, ACL(访问控制)

Zookeeper 单机模式

安装: 解压缩

启动

  • 默认端口 2181
  • windows命令: bin/zkServer.cmd
  • unix命令: bin/zkServer.sh

Zookeeper 的命令

ls path [watch] //查看节点, 使用了 watch 之后有数据变化会收到通知

create

-s 为顺序节点
-e 为临时节点

get 获取数据内容和一些属性

set 更新节点数据内容

set path data [version] version //为指定要被更新的数据版本,如已被更新, 则操作失败

detele path [version] 为指定要被删除的数据版本,如已被更新, 则操作失败

Zookeeper 中的 Watch:

Zookeeper 中的 ACL:

每个节点的 ACL 不能继承, 需要单独设置

schema:各种访问控制策略

  • world
  • auth
  • digest
  • ip
  • ...

auth 的使用:

预先通过 addauth digest user:password 进行添加用户
setAcl /node2 auth:node2u:111111:crdwa
使用时, 需要通过 addauth digest user:password 进行设置之后, 才能访问相应的节点

ZK 集群介绍:

分布式系统的理解

  1. 基本概念: 多台计算机;彼此进行交互,通过网络进行通讯;共同目标

  2. 传统IT系统满足业务需求:

    • 性能
    • 可用性
    • 数据安全
      均是通过机器增加, 代码优化进行处理
  3. 分布式系统满足业务需求:

    通过增加机器(横向扩展)

    缺点:

    • 数据一致性的难度
    • 系统复杂性(软件结构,运维,开发调试)
    • 不可靠因素增加
    • 安全性

    保证数据一致性:

     数据复制
     集中存储(NAS , 分布式缓存等)
    

    Sharding:

     业务拆分
     数据拆分
    

ZK集群的设置:

所有集群的 zoo.cfg 中的集群配置一致

server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:12888:13888
server.3=127.0.0.1:22888:23888

在数据文件目录中有一个文件 myid , 内容为 server.X 对应的数字, 如果是 server.1 对应机器上, myid 内容即为1

Zookeeper API:

Zookeeper原生API

  1. Zookeeper 的构造函数进行服务端的连接(异步过程)

  2. create() 创建节点

    同步调用时: 如果已经存在则直接抛出异常

    异步回调时: 如果已经存在, 则返回 name 属性为 null , 正常创建则会返回创建的节点名称 , 回调需要实现 StringCallback 接口

    创建节点时需要设置 ACL 策略 : 可以使用预先定义的或者自定义的

  3. delete() 删除节点

  4. getChildren 获取子节点

  5. getData : 获取节点数据

  6. setData : 修改数据

  7. exists : 节点是否存在

以上API需要注意的是, 可以有个同步调用或者异步调用的方式, 另外可以设置 watcher

Curator:

特色: Fluent 风格API :方法级联和方法链的实现

  1. 连接

    新增连接超时(原有的超时为超过多少时间没有心跳消息而超时)

    实现重试策略

    实现指数倍增sleep时间

    ExponentialBackoffRetry

     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
     client = CuratorFrameworkFactory.builder()
     		.connectString("192.168.1.195:2181")
     		.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
     		.namespace("base").build();
     client.start();
    
  2. 创建节点:

     client.getZookeeperClient().getZooKeeper().addAuthInfo("digest", "liubx11:111111".getBytes());
     client.create().creatingParentsIfNeeded()
     		.withMode(CreateMode.PERSISTENT).withACL(Ids.CREATOR_ALL_ACL)
     		.forPath(path, data);
    
  3. 删除节点

      client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version).forPath(path);
    
  4. 获取数据 : getData

     Stat stat = new Stat();
     byte[] data = client.getData().forPath(path);
     System.out.println(stat.toString());
    
  5. 更新数据 : setData

     client.setData().withVersion(version).forPath(path, data);
    
  6. 读取子节点 : getChildren

     List<String> children = client.getChildren().usingWatcher(new WatcherTest()).forPath("/curator");
    
  7. Watcher 使用:

    • NodeCache:

    监听数据节点内容更新

    通过 NodeCacheListener 进行

    变动数据通过 NodeCache 获取

     public void addNodeDataWatcher(String path) throws Exception {
     	final NodeCache nodeC = new NodeCache(client, path);
     	nodeC.start(true);
     	nodeC.getListenable().addListener(new NodeCacheListener() {
     		public void nodeChanged() throws Exception {
     			String data = new String(nodeC.getCurrentData().getData());
     			System.out.println("path=" + nodeC.getCurrentData().getPath()
     					+ ":data=" + data);
     		}
     	});
     }
    
    • PathChildernCache

    监听指定节点的子节点变化情况:新增,子节点数据更新, 删除

    StartMode 的区别: NORMAL , POST_ INITIALIZED_EVENT

     final PathChildrenCache cache = new PathChildrenCache(this.client,
     		path, true);
     cache.start(StartMode.POST_INITIALIZED_EVENT);
     System.out.println(cache.getCurrentData().size());
     cache.getListenable().addListener(new PathChildrenCacheListener() {
     	public void childEvent(CuratorFramework client,
     			PathChildrenCacheEvent event) throws Exception {
     		if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
     			System.out.println("客户端子节点cache初始化数据完成");
     			System.out.println("size="+cache.getCurrentData().size());
     		}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
     			System.out.println("添加子节点:"+event.getData().getPath());
     			System.out.println("修改子节点数据:"+new String(event.getData().getData()));
     		}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
     			System.out.println("删除子节点:"+event.getData().getPath());
     		}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
     			System.out.println("修改子节点数据:"+event.getData().getPath());
     			System.out.println("修改子节点数据:"+new String(event.getData().getData()));
     		}
     	}
     });
    
  8. 异步执行:

    通过调用inBackground() 方法实现, 需要传入回调函数, 或者执行线程池等..

    回调函数需要实现 BackgroundCallback 接口

    会有一个 CuratorEvent 的事件, 可以看到事件状态和返回码

     	//删除的异步执行
     	client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version)
     		.inBackground(new DeleteCallBack()).forPath(path);
原文地址:https://www.cnblogs.com/ownraul/p/5601996.html