《Zookeeper中间件》Zookeeper整合SpringBoot

前言

前面我们zookeeper也安装了,操作命令也学习了,现在来使用SpringBoot整合一下zookeeper。

整合

第一步设置配置文件(application.properties):

zookeeper.address=127.0.0.1:2181
zookeeper.timeout=40000

第二步将ZooKeeper加入Spring容器: 

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.CountDownLatch;

@Configuration
public class ZookeeperConfig {

    @Value("${zookeeper.address}")
    String address;

    @Value("${zookeeper.timeout}")
    int timeout;

    @Bean
    public ZooKeeper getZookeeper(){
        ZooKeeper zooKeeper = null;
        try {
            /**
             * CountDownLatch 用于标记线程是否执行完。
             */
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            zooKeeper = new ZooKeeper(address, timeout, (x) -> {
                if(Watcher.Event.KeeperState.SyncConnected == x.getState()){
                    countDownLatch.countDown();
                }
            });
            countDownLatch.await();
            System.out.println("zookeeper连接成功!");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return zooKeeper;
    }
}

第三步测试监听:

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ZookeeperServer {

    @Autowired
    ZooKeeper zooKeeper;

    /**
     * 监听其中的一个节点
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void watchEvent() throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        zooKeeper.getData("/tao", (x)-> {
            System.out.println(x.getType());
        }, stat);
    }
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestZookeeperController {

    @Autowired
    ZookeeperServer zookeeperServer;

    @RequestMapping("/zookeeper.do")
    public String event() throws Exception {
        zookeeperServer.watchEvent();
        return "success";
    }
}

测试效果:

ok,到这里,我们整合Spring完成。

其他

上面的节点监听是一次性的,不符合我们的要求

在ZookeeperServer中加入如下代码,实现继续监听 

    public void addWatchEvent() throws KeeperException, InterruptedException {
        zooKeeper.addWatch("/tao",(x) -> {
                System.out.println("PERSISTENT_RECURSIVE"+x);
        }, AddWatchMode.PERSISTENT_RECURSIVE);
    }

运行结果:

AddWatchMode.PERSISTENT::监听该节点的变化,包含孩子节点的创建和删除,但是孩子节点修改则不会被监听到。

AddWatchMode.PERSISTENT_RECURSIVE:监听该节点的变化,包含孩子节点的创建和删除和修改值。

创建顺序节点

public void createNode() throws Exception {
        String a = zooKeeper.create("/my", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println(a);
    }

删除节点

    public void deleteNode() throws Exception {
        zooKeeper.delete("/tao",-1);
    }

总结

整合Zookeeper分布式框架步骤

1.启动zk的服务端

2.设置配置文件

3.通过java实现客户端连接服务端

4.监听节点变化等一些API的实现。

原文地址:https://www.cnblogs.com/jssj/p/14018598.html