jmeter插件开发及zookeeper压测示例插件

  之所以要测试这个场景,是因为最近开发还有个缺陷未解决,leader很忙,客户端a断开了,但是断开信息未同步到follower,follower选举了新的leader,新的leader不知道客户端a断开了,所以客户端a重新连接到新的leader后,信息还在,连接没有断开。事实上没有错,逻辑上要知道的是同步机制的问题,比如kafka支持at least one follower接收到了。

  会话由SessionTracker管理,它是一个线程,里面采用ExpireQueue实现,定时轮训,参见https://blog.csdn.net/jpf254/article/details/80804458。和服务器日志是一样的,如下:

    根据服务器角色不同,ZooKeeperServer,LeaderZooKeeperServer,FollowerZooKeeperServer,ObserverZooKeeperServer分别代表单机服务器,集群leader服务器,集群Follower服务器,集群observer服务器,它们的sessionTracker实现是不同的。ZookeeperServer的对应sessionTracker实现是SessionTrackerImpl;LeaderZooKeeperServer的对应sessionTracker实现是LeaderSessionTracker,FollowerZooKeeperServer,ObserverZooKeeperServer的对应sessionTracker实现是LearnerSessionTracker。

  在集群模式下,leader发送一个ping消息给它的learner,learner返回自从last PING之后的一个session列表。leader每隔半个tick就会发送一个ping给learner。所以,如果一个tick被设置成2秒,那么leader每秒就会发送一个ping。

  3.5开始进一步可以分为globalSessionTracker和LocalSessionTracker(只有部分功能,本地自我管理,不支持临时节点,不能重连),其中单机只有一个标准的SessionTrackerImpl,集群leader开启globalSessionTracker和LocalSessionTracker,follower和observer只开启LocalSessionTracker。globalSessionTracker由SessionTrackerImpl实现,LocalSessionTracker继承并扩展了SessionTrackerImpl。

 https://blog.csdn.net/damacheng/article/details/42393771 会话

 https://www.cnblogs.com/leixiaobai/p/10245991.html  jmeter插件

https://www.cnblogs.com/farmersun/p/12813442.html zk程序员指南


package com.hundsun.datacompare;

import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.KeeperException;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.*;

/**
 * @author zjhua
 */
public class DataGenerateForZookeeperServiceImpl implements DataGenerateForZookeeperService {

    @Autowired
    private DataGenerateForZookeeperConfig config;

    @Override
    public void gen() {
        if (config == null) {
            config = new DataGenerateForZookeeperConfig();
        }
        int threads = config.getParallel() == 0 ? Runtime.getRuntime().availableProcessors()/2 : config.getParallel();
        ExecutorService executorService = new ThreadPoolExecutor(threads,threads,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
        for (int c=1;c<=threads;c++) {
            int finalC = c;
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    // 1.Connect to zk
                    CuratorFramework client = CuratorFrameworkFactory.newClient(
                            config.getZkUrl(),
                            new RetryNTimes(10, 5000)
                    );
                    client.start();
                    System.out.println("zk client start successfully!");
                    try {
                        for (int i = 1; i <= config.getCount(); i++) {
                            try {
                                String keyName = config.getPrefix() + "-" + finalC + i;
                                // 2.Client API test
                                // 2.1 Create node
                                String data1 = StringUtils.rightPad("value", config.getValueSize(), "1");
//                print("create", config.getZkPath() + keyName, data1);
                                client.create().
                                        creatingParentsIfNeeded().
                                        forPath(config.getZkPath() + keyName, data1.getBytes());

//                // 2.2 Get node and data
//                print("ls", "/");
//                print(client.getChildren().forPath("/"));
//                print("get", config.getZkPath() + keyName);
//                print(client.getData().forPath(config.getZkPath() + keyName));
//
//                // 2.3 Modify data
//                String data2 = "world";
//                print("set", config.getZkPath() + keyName, data2);
//                client.setData().forPath(config.getZkPath() + keyName, data2.getBytes());
//                print("get", config.getZkPath() + keyName);
//                print(client.getData().forPath(config.getZkPath() + keyName));
//
//                // 2.4 Remove node
//                    if (i >= 2000) {
//                print("delete", config.getZkPath() + keyName);
//                        client.delete().forPath(config.getZkPath() + keyName);
//                        Thread.sleep(3000);
//                print("ls", "/");
//                print(client.getChildren().forPath("/"));
//                    }
                            } catch (KeeperException.NodeExistsException e) {
                                // NOP
                            }
                        }
                        System.out.println(Thread.currentThread().getName() + " created " + config.getCount() + " " + config.getValueSize() + "byte nodes under " + config.getZkPath() + "");
                        client.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        try {
            boolean finish = false;
            while (!finish) {
                executorService.awaitTermination(3600, TimeUnit.SECONDS);
                System.out.println("尚未完成,继续等待");
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void print(String... cmds) {
        StringBuilder text = new StringBuilder("$ ");
        for (String cmd : cmds) {
            text.append(cmd).append(" ");
        }
        System.out.println(text.toString());
    }

    private static void print(Object result) {
        System.out.println(
                result instanceof byte[]
                        ? new String((byte[]) result)
                        : result);
    }

    public static void main(String[] args) {
        DataGenerateForZookeeperServiceImpl dataGenerateForZookeeperService = new DataGenerateForZookeeperServiceImpl();
        dataGenerateForZookeeperService.gen();
    }
}


 

注:zk存在一个问题,即使节点删除了,内存也没有释放,需要重启后内存才会释放。

也有些人问,不过好像没特别好的方法,主要是:https://stackoverflow.com/questions/50437481/confusing-zookeeper-memory-usagehttp://zookeeper-user.578899.n2.nabble.com/ZooKeeper-Memory-Usage-td7267902.html

后来dump出来堆栈进行分析,及查找相关资料,可知zk为了同步修改到follower更快,会在leader节点缓存最近的500个提交日志(如果follower超过500个跟不上,就要拿leader的快照全同步了,但它有个问题,如果zk很大,会导致同步一直失败,所以3.5版本增加了一个特性,判断txnlog是不是比snapshot小很多,目前是1/3,如果是,则使用事务日志而不是快照),即ZKDatabase.committedLog。这不是问题,问题在于zk里面保留了事务数据的3份拷贝,为:

  • The first is in committedLog[i].request.request.hb - a heap-allocated ByteBuffer.
  • The second is in committedLog[i].request.txn.data - a jute-serialised record of the transaction
  • The third is in committedLog[i].packet.data - also jute-serialised, seemingly uninitialised data.

和dump分析结果完全一致,也可以参见https://issues.apache.org/jira/browse/ZOOKEEPER-1473,它建议把事务日志份数调整为可配置,但是提交的PR有bug,该bug尚未修复。

默认情况下,zk采用WAL机制,为了保证宕机后数据不丢失,WAL会采用同步写磁盘的方式,由参数forceSync控制,默认为yes,所以一定要保证dataLogDir目录在非常快的磁盘上,最好是SSD,如果为false,则只是写到了OS的pagecache,操作系统宕机后可能会丢数据。 为了检测写磁盘的性能,可以配置系统属性fsync.warningthresholdms(3.3.4引入)=20,如果数据固化到磁盘的操作fsync超过20ms的时候,将会在zookeeper.out中输出一条warn日志,默认是1秒。

原文地址:https://www.cnblogs.com/zhjh256/p/13693142.html