zookeeper 学习(二) java操作zookeeper

码云`

简单创建一个demo

  首先创建maven项目,在pom中引入 zookeeper

  <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.0</version>
        </dependency>

然后在resource文件夹下 创建log4j.properties=》作为zookeeper日志输出

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %p %c{2}: %m%n

创建一个class,操作

package com.item.zkjavaapi;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;

/**
 * 描述:     连接到ZK服务端,打印连接状态
 */
public class ZKConnect implements Watcher {
    //端口
    public static final String SERVER_PATH = "127.0.0.1:2181";
    //检测心跳过期时间-毫秒
    public static final Integer TIMEOUT = 5000;
public static void main(String[] args)
throws IOException, InterruptedException, KeeperException {

/**
* 客户端和服务端他们是异步连接,连接成功之后,客户端会收到watcher通知。
* connectString:服务器的IP+端口号,比如127.0.0.1:2181
* sessionTimeout:超时时间
* watcher:通知事件
*/
ZooKeeper zooKeeper = new ZooKeeper(SERVER_PATH, TIMEOUT, new ZKOperator());
System.out.println("客户端开始连接ZK服务器了");
System.out.println(zooKeeper.getState());
Thread.sleep(2000);

/**
* path:创建的路径
* data:存储的数据
* acl:权限,开放
* createMode:永久、临时、顺序。
*/
zooKeeper.create("/demoNode1","2233".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

//获取
byte[] data = zooKeeper.getData("/demoNode1", null, null);
System.out.println(new String(data));
/**
* 修改
* path:节点的路径
* data :数据
* version:版本 盘本匹配,才去修改
*/
zooKeeper.setData("/demoNode1","3344".getBytes(),0);
data = zooKeeper.getData("/demoNode1", null, null);
System.out.println(new String(data));


//判断节点是否存在以及获取其版本
Stat exists = zk.exists("/demoNode1", false);
if (exists != null) {
System.out.println("节点的版本为:" + exists.getVersion());
}else {
System.out.println("该节点不存在");
}


String ctx = "删除成功";
zooKeeper.delete("/demoNode1", 0, new DeleteCallBack(), ctx);
Thread.sleep(2000);
}
@Override
public void process(WatchedEvent event) {
//根据类型判断
if (event.getType() == Event.EventType.NodeDataChanged) {
System.out.println("数据被改变");

}

System.out.println("收到了通知" + event);
}

DeleteCallBack代码

package com.item.zkjavaapi.callback;

import org.apache.zookeeper.AsyncCallback;
/**
 * 描述:     删除后运行的方法
 */
public class DeleteCallBack implements AsyncCallback.VoidCallback {
    @Override
    public void processResult(int i, String s, Object o) {
        System.out.println("删除节点" + s);
        System.out.println((String)o);
    }
}

使用Apache Curator操作zookeeper

  首先pom需要引用以下

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

然后代码

package com.item.zkjavaapi.Curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class CuratorDemo {
    //端口
    public static final String SERVER_PATH = "127.0.0.1:2181";
    //检测心跳过期时间-毫秒
    public static final Integer TIMEOUT = 5000;

    public static void main(String[] args) throws Exception{
        String nodePath="/demoNode2";
        //设置重新连接功能   ,    ;1000 从第一次重试开始已经花费的时间   3 重试次数
        RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(SERVER_PATH, retry);
        client.start();
        String data = "test";
        String data2 = "test2";

        //创建节点以及确定值
        client.create().withMode(CreateMode.PERSISTENT).forPath(nodePath,data.getBytes());
        //获取节点的值
        byte[] bytes = client.getData().watched().forPath(nodePath);

        System.out.println(new String(bytes));
        //修改节点的值
        client.setData().forPath(nodePath,data2.getBytes());
        bytes = client.getData().watched().forPath(nodePath);
        System.out.println(new String(bytes));
        //删除节点
        client.delete().forPath(nodePath);
        Thread.sleep(200);
    }

}
原文地址:https://www.cnblogs.com/1439107348s/p/14477151.html