Java简单调用Zookeeper服务

一、搭建ZooKeeper环境

      1、参考Linux下ZooKeeper集群安装

二、添加ZooKeeper依赖,以Gradle为例

    // ZooKeeper Java client library used by all of the example classes below
    compile group: 'org.apache.zookeeper', name: 'zookeeper', version: '3.4.5'
    // JUnit 4 supplies the @Before/@Test annotations used by SimpleZkClient
    compile group: 'junit', name: 'junit', version: '4.12'

三、测试,新建SimpleZkClient类,测试增删改查

package com.moy.zookeeper;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
 * [Project]:moy-gradle-project  <br/>
 * [Email]:moy25@foxmail.com  <br/>
 * [Date]:2018/4/3  <br/>
 * [Description]:  <br/>
 *
 * @author YeXiangYang
 */
public class SimpleZkClient {

    private static String connectString = "node3:2181,node4:2181,node5:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zkClient;
    private static String testPath = "/hello world";

    @Before
    public void before() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, (event) -> {
            System.out.printf("%s ---> %s
", event.getType(), event.getPath());
            try {
                zkClient.getChildren("/", true);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Test
    public void addNode() throws KeeperException, InterruptedException {
        String node = zkClient.create(testPath, "hello zk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
        System.out.printf(node);
    }

    @Test
    public void getAllNode() throws KeeperException, InterruptedException {
        List<String> childrenNode = zkClient.getChildren("/", true);
        for (String nodePath : childrenNode) {
            System.out.println(nodePath);
        }
    }

    @Test
    public void getData() throws KeeperException, InterruptedException {
        byte[] data = zkClient.getData(testPath, true, null);
        System.out.printf("%s ---> %s
", testPath, new String(data));

    }

    @Test
    public void setData() throws KeeperException, InterruptedException {
        if (Objects.nonNull(zkClient.exists(testPath, false))) {
            Stat stat = zkClient.setData(testPath, "hello world".getBytes(), -1);
            System.out.println(stat);
        }
    }

    @Test
    public void deleteNode() throws KeeperException, InterruptedException {
        if (Objects.nonNull(zkClient.exists(testPath, false))) {
            zkClient.delete(testPath, -1);
        }
    }
}
View Code

四、测试zooKeeper注册监听

  1、新建ZkHelper帮助类

package com.moy.zookeeper;

import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.Objects;

/**
 * [Project]:moy-gradle-project  <br/>
 * [Email]:moy25@foxmail.com  <br/>
 * [Date]:2018/4/5  <br/>
 * [Description]:  <br/>
 *
 * @author YeXiangYang
 */
public abstract class ZkHelper {

    private static String connectString = "node3:2181,node4:2181,node5:2181";
    private static int sessionTimeout = 2000;
    private static ThreadLocal<ZooKeeper> zkClient = new ThreadLocal<>();

    public static ZooKeeper getZkClient() {
        ZooKeeper zooKeeper = zkClient.get();
        if (Objects.isNull(zooKeeper)) {
            zooKeeper = createZkConnect();
            zkClient.set(zooKeeper);
        }
        return zooKeeper;
    }

    public static void removeZkClient() {
        if (Objects.nonNull(zkClient.get())) {
            zkClient.remove();
        }
    }


    private static ZooKeeper createZkConnect() {
        try {
            return new ZooKeeper(connectString, sessionTimeout, (event) -> {
                System.out.printf("zk发生了变化事件: %s ---> %s
", event.getPath(), event.getType());
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}
View Code

  2、新建服务端测试注册类DistributedServer

package com.moy.zookeeper.demo;

import com.moy.zookeeper.ZkHelper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.Objects;
import java.util.Random;

/**
 * [Project]:moy-gradle-project  <br/>
 * [Email]:moy25@foxmail.com  <br/>
 * [Date]:2018/4/5  <br/>
 * [Description]:  <br/>
 *
 * @author YeXiangYang
 */
public class DistributedServer {

    public static final String PARENT_NODE = "/servers";


    public static void main(String[] args) throws Exception {
        // 获取zk连接
        ZooKeeper zkClient = ZkHelper.getZkClient();

        // 注册服务信息到zk上
        registerServer(zkClient);

        // 执行业务
        businessHandler();
    }

    public static void businessHandler() throws InterruptedException {
        System.out.printf("开始执行业务...
");
        Thread.sleep(Long.MAX_VALUE);
    }

    private static void registerServer(ZooKeeper zkClient) {
        try {
            checkParentNode(zkClient);

            zkClient.create(PARENT_NODE + "/server", createServerName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static String createServerName() {
        return "node-" + new Random().nextInt(10);
    }

    private static void checkParentNode(ZooKeeper zkClient)
            throws KeeperException, InterruptedException {
        Stat stat = zkClient.exists(PARENT_NODE, true);

        if (Objects.isNull(stat)) {
            zkClient.create(PARENT_NODE, PARENT_NODE.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
    }

}
View Code

  3、新建客户端测试监听类DistributedClient

package com.moy.zookeeper.demo;

import com.moy.zookeeper.ZkHelper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

import java.util.List;
import java.util.Objects;

/**
 * [Project]:moy-gradle-project  <br/>
 * [Email]:moy25@foxmail.com  <br/>
 * [Date]:2018/4/5  <br/>
 * [Description]:  <br/>
 *
 * @author YeXiangYang
 */
public class DistributedClient {

    public static void main(String[] args) throws Exception {
        // 获取zk连接
        ZooKeeper zkClient = ZkHelper.getZkClient();

        // 获取zk服务节点信息
        listServerInfo(zkClient);

        // 处理业务
        DistributedServer.businessHandler();
    }

    private static void listServerInfo(ZooKeeper zkClient) {
        try {
            List<String> serverList = zkClient.getChildren(DistributedServer.PARENT_NODE, event -> {
                listServerInfo(zkClient);
            });

            printServerInfo(zkClient, serverList);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void printServerInfo(ZooKeeper zkClient, List<String> serverList) throws KeeperException, InterruptedException {
        System.out.println(System.currentTimeMillis() + "节点信息为:");

        if (Objects.isNull(serverList) || serverList.size() <= 0) {
            System.out.println("	[]");
            return;
        }

        for (String server : serverList) {
            String nodePath = DistributedServer.PARENT_NODE + "/" + server;

            if (Objects.isNull(zkClient.exists(nodePath, false))) {
                continue;
            }

            byte[] data = zkClient.getData(nodePath, false, null);
            String serverData = new String(data);
            System.out.printf("	[server-%s 拥有的信息为: %s]
", server, serverData);
        }
    }
}
View Code

五、先运行DistributedServer注册服务信息,再运行DistributedClient监听服务信息。

  1、运行前需要将代码中的ZooKeeper连接地址(connectString)改为自己环境的地址

 

yexiangyang

moyyexy@gmail.com


 

原文地址:https://www.cnblogs.com/moy25/p/8727788.html