Dynamically reading Kafka information from ZooKeeper

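When connecting to Kafka, a wrong or unreachable address in the Kafka connection settings is a common problem: the application keeps logging connection failures in the background, or it fails to start at all because Kafka cannot be reached. One way to deal with this is as follows: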

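Read the Kafka cluster information from ZooKeeper. If it contains the Kafka connection information you configured, Kafka has started normally and registered itself with ZooKeeper, so it can be connected to.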
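
The comparison described above can be sketched in plain Java. This is a minimal sketch, not a definitive implementation: it assumes each node under /brokers/ids holds a registration JSON with an "endpoints" array such as "PLAINTEXT://kafka1:9092", and the names BrokerRegistry, registeredAddresses and missingBrokers are hypothetical helpers introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: compares configured Kafka addresses with broker registrations read from ZooKeeper. */
final class BrokerRegistry {
    // Matches the host:port part of an endpoint such as "PLAINTEXT://kafka1:9092".
    // Assumption: hosts are names or IPv4 addresses (IPv6 literals are not handled).
    private static final Pattern ENDPOINT = Pattern.compile("://([^\":/\\s]+):(\\d+)");

    private BrokerRegistry() {}

    /** Collects every host:port advertised in the given registration JSON strings. */
    static Set<String> registeredAddresses(List<String> brokerJsons) {
        Set<String> addresses = new LinkedHashSet<>();
        for (String json : brokerJsons) {
            Matcher m = ENDPOINT.matcher(json);
            while (m.find()) {
                addresses.add(m.group(1) + ":" + m.group(2));
            }
        }
        return addresses;
    }

    /** Returns the configured addresses that no registered broker advertises. */
    static List<String> missingBrokers(String bootstrapServers, List<String> brokerJsons) {
        Set<String> registered = registeredAddresses(brokerJsons);
        List<String> missing = new ArrayList<>();
        for (String address : bootstrapServers.split(",")) {
            String trimmed = address.trim();
            if (!trimmed.isEmpty() && !registered.contains(trimmed)) {
                missing.add(trimmed);
            }
        }
        return missing;
    }
}
```

An empty result from missingBrokers means every configured address is registered. The match is a plain string comparison, so a broker that advertises a hostname will not match a configured IP address for the same machine.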

The code is as follows:

    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.CuratorWatcher;
    import org.apache.curator.retry.RetryNTimes;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;

    public class KafkaBrokerLookup {

        // Assumed ZooKeeper address; replace it with your own connect string.
        private static final String CONNECT_STRING = "localhost:2181";

        public static void main(String[] args) throws Exception {
            getNodes();
        }

        public static void getNodes() throws Exception {
            // Session timeout 60 s, connection timeout 15 s, up to 10 retries 5 s apart.
            // try-with-resources closes the client even if an operation throws.
            try (CuratorFramework client = CuratorFrameworkFactory.newClient(
                    CONNECT_STRING, 1000 * 60, 1000 * 15, new RetryNTimes(10, 5000))) {
                // Register the listeners before start(); adding them after close() has no effect.
                client.getCuratorListenable().addListener(
                        (c, event) -> System.out.println("Event: " + event));
                client.getConnectionStateListenable().addListener(
                        (c, newState) -> System.out.println("Connection state event: " + newState));
                client.getUnhandledErrorListenable().addListener(
                        (message, e) -> System.out.println("Error event: " + message));

                client.start(); // start connecting
                System.out.println(client.getState());
                if (!client.blockUntilConnected(15, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("Cannot connect to ZooKeeper at " + CONNECT_STRING);
                }

                // A watcher set with usingWatcher() fires once for the next change on the path.
                CuratorWatcher watcher = event -> System.out.println("Watcher fired: " + event);

                // Each child of /brokers/ids is the ID of a currently registered broker.
                List<String> children = client.getChildren().usingWatcher(watcher).forPath("/brokers/ids");
                for (String id : children) {
                    System.out.println("current id: " + id);
                    // The node data is returned as a byte array; decode it explicitly as UTF-8.
                    byte[] data = client.getData().usingWatcher(watcher).forPath("/brokers/ids/" + id);
                    System.out.println("current node content: " + new String(data, StandardCharsets.UTF_8));
                }
                System.out.println(children);

                // Create a node (this throws if /test already exists).
                String result = client.create().withMode(CreateMode.PERSISTENT)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath("/test", "Data".getBytes(StandardCharsets.UTF_8));
                System.out.println(result);
                // Set the node data.
                client.setData().forPath("/test", "111".getBytes(StandardCharsets.UTF_8));
                client.setData().forPath("/test", "222".getBytes(StandardCharsets.UTF_8));
                // Check that the node exists, then delete it (version -1 matches any version).
                System.out.println(client.checkExists().forPath("/test"));
                client.delete().withVersion(-1).forPath("/test");
                System.out.println(client.checkExists().forPath("/brokers"));
            }
            System.out.println("OK!");
        }
    }
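
To keep an application from failing at startup while Kafka is still coming up, the check can be repeated a bounded number of times before the Kafka client is created. The following is a minimal sketch under assumptions: the ZooKeeper lookup is wrapped in a Supplier that returns the configured addresses not yet registered, and StartupGate, awaitBrokers and their parameters are hypothetical names, not part of Curator or Kafka.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical startup gate: polls a broker check until it passes or the attempts run out. */
final class StartupGate {
    private StartupGate() {}

    /**
     * Calls missingBrokers up to maxAttempts times, sleeping delayMillis between calls.
     * Returns true as soon as the supplier reports no missing broker, false otherwise.
     */
    static boolean awaitBrokers(Supplier<List<String>> missingBrokers, int maxAttempts, long delayMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (missingBrokers.get().isEmpty()) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up waiting.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

A caller would create the Kafka producer or consumer only when awaitBrokers returns true, and otherwise log a single clear message instead of letting the client retry indefinitely.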

Original post: https://www.cnblogs.com/dancser/p/12869044.html