Zookeeper 自增序列实践

1、增加依赖

        <!--        curator ZK客户端-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.10.0</version>
        </dependency>

  

2、定义生成sequence类型

/**
 * 序列类型
 */
public enum ZkSequenceEnum {
    FIRST,
    SECOND,
    THIRD
}

  

3、序列封装

/**
 * 通过分布式原子自增类(DistributedAtomicLong)实现,
 * 注意每5000毫秒重试5次后仍然生成失败则返回null,由上层处理
 */
public class ZkSequence {

    
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000,5);

    DistributedAtomicLong distributedAtomicLong;

    public ZkSequence(String sequenceName, CuratorFramework client ){
        distributedAtomicLong = new DistributedAtomicLong( client,sequenceName, retryPolicy);
    }
    
    /**
     * 生成序列
     * @return
     */
    public  Long sequence() throws  Exception{
        AtomicValue<Long> sequence = this.distributedAtomicLong.increment();
        if(sequence.succeeded()){
            return  sequence.postValue();
        }else {
            return  null;
        }
    }

}

4、配置文件

@Configuration
@ConfigurationProperties(prefix = "zk")
@PropertySource("classpath:zookeeper.properties")
public class ZkConfig {
    String host = "118.xx.xx.101";
    String sequencePath = "/new/sequence/";


    @Bean
    public ZookeeperClient zookeeperClient() {
        return new ZookeeperClient(this.host, this.sequencePath);
    }
}

  

5、客户端封装

public class ZookeeperClient {

    private String host;
    private String sequencePath;
    // 重试休眠时间
    private final int SLEEP_TIME_MS = 1000;
    // 最大重试1000次
    private final int MAX_RETRIES = 1000;
    //会话超时时间
    private final int SESSION_TIMEOUT = 30 * 1000;
    //连接超时时间
    private final int CONNECTION_TIMEOUT = 3 * 1000;
    //创建连接实例
    private CuratorFramework client = null;
    // 序列化集合
    private Map<String, ZkSequence> zkSequence = Maps.newConcurrentMap();

    public ZookeeperClient(String host, String sequencePath) {
        this.host = host;
        this.sequencePath = sequencePath;
    }


    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    @PostConstruct
    public void init() throws Exception {
        this.client = CuratorFrameworkFactory.builder()
                .connectString(this.getHost())
                .connectionTimeoutMs(CONNECTION_TIMEOUT)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES))
                .build();
        this.client.start();
        this.initZkSequence();
    }

    public void initZkSequence() {
        ZkSequenceEnum[] list = ZkSequenceEnum.values();
        for (int i = 0; i < list.length; i++) {
            String name = list[i].name();
            String path = this.sequencePath + name;
            ZkSequence seq = new ZkSequence(path, this.client);
            zkSequence.put(name, seq);
        }
    }

    /*** 生成SEQ  */
    public Long sequence(ZkSequenceEnum name) {
        Long result = null;
        try {
            ZkSequence seq = zkSequence.get(name.name());
            if (seq != null) {
                result = seq.sequence();
            }

        } catch (Exception e) {
            System.out.println("获取" + name + "Sequence错误: " + e.getMessage());
        }
        return result;
    }

}

  

5、调用客户端

@Component
public class Sequences {

    @Autowired
    private ZookeeperClient client;

    public Long sequenceFist(){
        return this.client.sequence(ZkSequenceEnum.FIRST);
    }

    public Long sequenceSecond(){
        return this.client.sequence(ZkSequenceEnum.SECOND);
    }

    public Long sequenceThird(){
        return this.client.sequence(ZkSequenceEnum.THIRD);
    }

 }

  

6、测试

@SpringBootTest
class ZookeepdemoApplicationTests {


    // 第一步,注入Sequences
    @Autowired
    private Sequences sequences;


    @Test
    void sequenceApCollection() throws Exception {


        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));

        for (int i = 0; i < 10; i++) {
            MyTask myTask = new MyTask(sequences);
            executor.execute(myTask);
        }
        Thread.sleep(100000);
        executor.shutdown();


    }

    private class MyTask implements Runnable {

        private Sequences sequences;

        public MyTask(Sequences sequences) {
            this.sequences = sequences;
        }

        @Override
        public void run() {
            // 第二步,在方法中调用生成
            Long num = sequences.sequenceFist();
            System.out.println("num=" + num);

        }
    }

} 

分别启动两个测试工程

作者:Work Hard Work Smart
出处:http://www.cnblogs.com/linlf03/
欢迎任何形式的转载,未经作者同意,请保留此段声明!

原文地址:https://www.cnblogs.com/linlf03/p/15487327.html