zookeeper:master选举

  • 模拟选举机器类
 1 package com.karat.cn.zookeeperAchieveLock.zkclient;
 2 
 3 import java.io.Serializable;
 4 
 5 /**
 6  * 选举的机器
 7  */
 8 public class UserCenter implements Serializable{
 9 
10     private static final long serialVersionUID = -1776114173857775665L;
11     private int id; //机器信息
12 
13     private String name;//机器名称
14 
15     public int getId() {
16         return id;
17     }
18 
19     public void setId(int id) {
20         this.id = id;
21     }
22 
23     public String getName() {
24         return name;
25     }
26 
27     public void setName(String name) {
28         this.name = name;
29     }
30 
31     @Override
32     public String toString() {
33         return "UserCenter [id=" + id + ", name=" + name + "]";
34     }
35  
36 }
View Code
  • 选举服务
  1 package com.karat.cn.zookeeperAchieveLock.zkclient;
  2 
  3 import org.I0Itec.zkclient.IZkDataListener;
  4 import org.I0Itec.zkclient.ZkClient;
  5 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
  6 
  7 import java.util.concurrent.Executors;
  8 import java.util.concurrent.ScheduledExecutorService;
  9 import java.util.concurrent.TimeUnit;
 10 
 11 /**
 12  * 选举的服务
 13  */
 14 public class MasterSelector {
 15 
 16     private ZkClient zkClient;
 17 
 18     private final static String MASTER_PATH="/master"; //需要争抢的节点
 19 
 20     private IZkDataListener dataListener; //注册节点内容变化
 21 
 22     private UserCenter server;  //其他服务器
 23 
 24     private UserCenter master;  //master节点
 25 
 26     private boolean isRunning=false;
 27 
 28     ScheduledExecutorService scheduledExecutorService= Executors.newScheduledThreadPool(1);//定时任务
 29 
 30     public MasterSelector(UserCenter server,ZkClient zkClient) {
 31         System.out.println("["+server+"] 去争抢master权限");
 32         this.server = server;
 33         this.zkClient=zkClient;
 34 
 35         this.dataListener= new IZkDataListener() {
 36             @Override
 37             public void handleDataChange(String s, Object o) throws Exception {
 38 
 39             }
 40 
 41             @Override
 42             public void handleDataDeleted(String s) throws Exception {
 43                 //节点如果被删除, 发起选主操作
 44                 chooseMaster();
 45             }
 46         };
 47     }
 48 
 49     public void start(){
 50         //开始选举
 51         if(!isRunning){
 52             isRunning=true;
 53             zkClient.subscribeDataChanges(MASTER_PATH,dataListener); //注册节点事件
 54             chooseMaster();
 55         }
 56     }
 57 
 58 
 59     public void stop(){
 60         //停止
 61         if(isRunning){
 62             isRunning=false;
 63             scheduledExecutorService.shutdown();
 64             zkClient.unsubscribeDataChanges(MASTER_PATH,dataListener);
 65             releaseMaster();
 66         }
 67     }
 68 
 69 
 70     //具体选master的实现逻辑
 71     private void chooseMaster(){
 72         if(!isRunning){
 73             System.out.println("当前服务没有启动");
 74             return ;
 75         }
 76         try {
 77             zkClient.createEphemeral(MASTER_PATH, server);//创建一个临时节点
 78             master=server; //把server节点赋值给master
 79             System.out.println(master+"->我现在已经是master,你们要听我的");
 80 
 81             //定时器
 82             //master释放(master 出现故障),没2秒钟释放一次
 83             scheduledExecutorService.schedule(()->{
 84                 releaseMaster();//释放锁
 85             },2, TimeUnit.SECONDS);
 86         }catch (ZkNodeExistsException e){
 87             //创建一个临时节点抛出异常
 88             //表示master已经存在
 89             UserCenter userCenter=zkClient.readData(MASTER_PATH,true);
 90             if(userCenter==null) {
 91                 System.out.println("启动操作:");
 92                 chooseMaster(); //再次获取master
 93             }else{
 94                 master=userCenter;
 95             }
 96         }
 97     }
 98 
 99     private void releaseMaster(){
100         //释放锁(故障模拟过程)
101         //判断当前是不是master,只有master才需要释放
102         if(checkIsMaster()){
103             zkClient.delete(MASTER_PATH); //删除
104         }
105     }
106 
107 
108     private boolean checkIsMaster(){
109         //判断当前的server是不是master
110         UserCenter userCenter=zkClient.readData(MASTER_PATH);
111         if(userCenter.getName().equals(server.getName())){
112             master=userCenter;
113             return true;
114         }
115         return false;
116     }
117 
118 }
View Code
  • 选举测试
 1 package com.karat.cn.zookeeperAchieveLock.zkclient;
 2 
 3 import org.I0Itec.zkclient.ZkClient;
 4 import org.I0Itec.zkclient.serialize.SerializableSerializer;
 5 
 6 import java.io.IOException;
 7 import java.util.ArrayList;
 8 import java.util.List;
 9 import java.util.concurrent.TimeUnit;
10 
11 /**
12  * master选举测试
13  */
14 public class MasterChooseTest {
15 
16     private final static String CONNECTSTRING="47.107.121.215:2181";
17 
18 
19     public static void main(String[] args) throws IOException {
20         List<MasterSelector> selectorLists=new ArrayList<>();
21         try {
22             for(int i=0;i<10;i++) {
23                 ZkClient zkClient = new ZkClient(CONNECTSTRING, 5000,
24                         5000,
25                         new SerializableSerializer());
26                 UserCenter userCenter = new UserCenter();
27                 userCenter.setId(i);
28                 userCenter.setName("客户端:" + i);
29 
30                 MasterSelector selector = new MasterSelector(userCenter,zkClient);
31                 selectorLists.add(selector);
32                 selector.start();//触发选举操作
33                 TimeUnit.SECONDS.sleep(1);
34             }
35         } catch (InterruptedException e) {
36             e.printStackTrace();
37         } finally {
38             for(MasterSelector selector:selectorLists){
39                 selector.stop();
40             }
41         }
42     }
43 }
View Code

通过zookeeper进行master选举,就是利用zookeeper的节点特性,通过是否能够创建临时节点来判断是否选举成功,如果不能创建临时节点,则表明已有线程创建成功,那么创建成功的线程就为选举的master,当网络发生故障或其它问题导致该线程挂掉,那么zookeeper中的该临时节点也会删除,通过监听节点是否有过删除动作,重新选举新的master。

原文地址:https://www.cnblogs.com/LJing21/p/10549851.html