Distributed Protocol Study Notes (1): Raft Leader Election

Raft official site

Chinese translation of the paper

The paper (English original)

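Raft feels like an upgraded, slimmed-down Paxos. It was designed from the start to be easy to understand, and after reading the material I already had a rough outline of it in my head.

With material this detailed, and even an animated demo, there is honestly not much left to add. This post just records the key points as a reminder for later review.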

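In a distributed system, consistency means that the nodes in a cluster agree on their state. In practice, process crashes, network failures, hardware failures, power loss and the like make this agreement hard to guarantee, which is why consensus protocols such as Paxos and Raft are needed.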

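Paxos, proposed by Leslie Lamport in 1990, is a message-passing consensus algorithm with strong fault tolerance. It has two notable drawbacks, though: it is hard to understand, and it does not provide a good foundation for building real systems.

There have been many attempts to engineer Paxos, but they changed the algorithm considerably, and the resulting implementations differ a lot from one another.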

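Raft is a consensus algorithm for managing a replicated log. Its designers made understandability an explicit goal, which makes Raft easier to build real systems on and greatly reduces the engineering effort.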

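1 Leader Election

Raft uses one Leader node and several Follower nodes, the familiar Leader-Follower model. Each node is in one of three states: Leader, Follower, or Candidate.

The Leader handles client requests and replicates the results to the Followers as log entries.

Two timers drive leader election in Raft.

One is the heartbeat: the Leader periodically sends heartbeat messages to the Followers.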

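The other is the election timeout: how long a Follower waits before it becomes a Candidate.

The election timeout is usually a random value between 150 ms and 300 ms, which makes it unlikely that several nodes become Candidates at the same time.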
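As a minimal sketch of the randomized timeout, each node could draw a fresh value every time it rearms its election timer. The helper name `randomElectionTimeoutMs` and its parameters are made up for illustration; only the 150-300 ms range comes from the text above.

```cpp
#include <cassert>
#include <random>

// Hypothetical helper: returns an election timeout in [lo, hi] milliseconds.
// The 150-300 ms defaults are the range quoted in the text.
int randomElectionTimeoutMs(std::mt19937& rng, int lo = 150, int hi = 300) {
    assert(lo <= hi);
    std::uniform_int_distribution<int> dist(lo, hi);  // inclusive on both ends
    return dist(rng);
}
```

A node would typically seed one `std::mt19937` from `std::random_device` at startup and call the helper on every timer reset, so that two nodes rarely pick the same value twice in a row.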

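If a node's election timeout expires without a heartbeat from the Leader, the node assumes there is no Leader and becomes a Candidate: it increments its term, votes for itself, and sends vote requests to the other nodes.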
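The transition to Candidate can be sketched roughly as follows. The `Node` and `VoteRequest` types and their field names are assumptions made for this example, not part of any library.

```cpp
#include <cassert>

enum class Role { Follower, Candidate, Leader };

// Illustrative node state; the field names are assumptions for this sketch.
struct Node {
    int id = 0;
    int term = 0;
    int votedFor = -1;      // -1 means "has not voted in this term"
    int votesReceived = 0;
    Role role = Role::Follower;
};

// Hypothetical vote request that would be sent to every other node.
struct VoteRequest {
    int term;
    int candidateId;
};

// Called when the election timeout fires without a heartbeat.
VoteRequest startElection(Node& n) {
    assert(n.role != Role::Leader);  // a leader does not time out on itself
    n.term += 1;                     // a new election starts a new term
    n.role = Role::Candidate;
    n.votedFor = n.id;               // vote for itself
    n.votesReceived = 1;
    return VoteRequest{n.term, n.id};
}
```

The returned request carries the new term, which lets receivers compare it with their own term before answering.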

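A node that receives a vote request grants its vote if the term in the request is equal to or greater than its own term and it has not yet voted in that term. It then resets its own election timeout.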
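A hedged sketch of that rule is shown below. The `Voter` struct and `handleVoteRequest` are hypothetical names, and the sketch covers only the term and one-vote-per-term checks described here; full Raft also compares how up to date the candidate's log is, which is left out.

```cpp
#include <cassert>

// Illustrative voter state; names are assumptions for this sketch.
struct Voter {
    int term = 0;
    int votedFor = -1;  // -1: no vote cast in the current term
};

// Returns true if the vote is granted to candidateId for requestTerm.
bool handleVoteRequest(Voter& v, int requestTerm, int candidateId) {
    assert(candidateId >= 0);
    if (requestTerm < v.term) return false;  // stale candidate, reject
    if (requestTerm > v.term) {              // newer term: forget the old vote
        v.term = requestTerm;
        v.votedFor = -1;
    }
    if (v.votedFor == -1 || v.votedFor == candidateId) {
        v.votedFor = candidateId;  // a real node would also reset its election timer here
        return true;
    }
    return false;                  // already voted for someone else in this term
}
```

Repeating a grant to the same candidate is allowed so that a retransmitted request gets the same answer.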

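A Candidate that collects votes from more than half of the nodes becomes the Leader and starts sending heartbeats to the Followers to maintain its authority.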

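Now consider an election with several candidates. Nodes B and D start an election at the same time and each obtains one vote, so neither reaches a majority. The tie is broken by the randomized election timeout: both wait a random time again, and the node whose timer expires first starts the next round.

Eventually, after the vote is repeated (needing many rounds is unlikely), one node obtains more than half of the votes and becomes the Leader.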
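The majority test itself is small. The sketch below assumes votes are kept as a map from voter ID to the term in which the vote was granted, similar in spirit to the `voteRecord` map in the demo code later in this post; `hasMajority` and `clusterSize` are names chosen for the example.

```cpp
#include <cassert>
#include <map>

// votes maps voter id -> term in which that voter granted its vote.
// Only votes granted in currentTerm count toward the majority.
bool hasMajority(const std::map<int, int>& votes, int currentTerm, int clusterSize) {
    assert(clusterSize > 0);
    int count = 0;
    for (const auto& kv : votes) {
        if (kv.second == currentTerm) ++count;
    }
    return count > clusterSize / 2;  // strictly more than half
}
```

With four nodes and two candidates holding two votes each (their own plus one other), `2 > 4 / 2` is false for both, which is the split-vote case described above.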

#pragma once
#include <cassert>
#include <fstream>
#include <iostream>
#include <map>
#include <string>
#include <vector>
using namespace std;
/*
* Author: itdef
* Reposting is welcome; please keep the text complete and credit the source.
* Tech blog: http://www.cnblogs.com/itdef/
* Tech discussion group number: 432336863
* C/C++ and Windows driver enthusiasts and server programmers are welcome to get in touch.
* Some older code is kept at
* http://www.oschina.net/code/list_by_user?id=614253
*/
const string FILE_NAME = "config.txt";

// Reads "key=value" lines from a text file into a map.
class ReadConfig {
public:
    ReadConfig(string filename = "") {
        file_name = filename.empty() ? FILE_NAME : filename;
    }
    ~ReadConfig() {}
    // Parses the file and returns the key/value pairs (empty if the file cannot be opened).
    map<string, string> Do() {
        tar_path.clear();
        ifstream fin;
        fin.open(file_name);
        if (false == fin.is_open()) {
            std::cerr << "open file failed!!" << std::endl;
            return tar_path;
        }
        string s;
        while (getline(fin, s))
        {
            // Skip blank lines and comment lines starting with '#' or "//".
            if (s.empty() || '#' == s[0] || (s.size() >= 2 && '/' == s[0] && '/' == s[1]))
                continue;
            size_t pos = s.find_first_of("=");
            // Skip lines without '=' or with nothing after it.
            if (pos == std::string::npos || pos + 1 >= s.size())
                continue;
            string targetName = s.substr(0, pos);
            string path = s.substr(pos + 1);
            std::cout << targetName << " = " << path << std::endl;
            // Values that start with a space are ignored.
            if (path[0] != ' ')
                tar_path[targetName] = path;
        }
        fin.close();
        return tar_path;
    }
private:
    map<string, string> tar_path;
    string file_name;
};
ReadConfig.h
#pragma once
#include <string>
#include <mutex>
#include <map>

// Node roles.
enum STATUS {
    LEADER_STATUS = 1,
    FOLLOWER_STATUS,
    CANDIDATE_STATUS,
    PRE_VOTE_STAUS,
};

// Message types exchanged between nodes.
enum INFOTYPE {
    DEFAULT_TYPE = 0,
    HEART_BREAT_TYPE,
    VOTE_LEADER_TYPE,
    VOTE_LEADER_RESP_TYPE,
};

// Message sent over the network.
typedef struct netInfo {
    int fromID;
    int toID;
    INFOTYPE infotype;
    int term;
    int voteId;    // ID being voted for; only meaningful for the vote message types
}NetInfo;

// Local state of this node.
typedef struct locaInfo {
    int id;
    int leaderID;
    STATUS status;
    int term;
    int isVote;             // 1 if this node has already voted in the current term
    int IsRecvHeartbeat;    // 1 once a heartbeat has been received
    std::map<int, int> voteRecord; // id -> term: an entry means node id voted for this node in that term
}LocalInfo;

// Local state plus the mutex that guards it.
typedef struct localInfoWithLock {
    LocalInfo locInfo;
    std::mutex m;
}LocalInfoWithLock;
CommonStruct.h
#pragma once
#include "CommonStruct.h"
#include "ReadConfig.h"
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;
using namespace std;

class RaftManager :public enable_shared_from_this<RaftManager> {
public:
    // Lazily creates the single instance (not thread-safe; call it once before starting threads).
    static std::shared_ptr<RaftManager> GetInstance() {
        if (p == nullptr)
            p.reset(new RaftManager());
        // std::make_shared cannot be used here because the constructor is private.
        return p;
    }
    ~RaftManager() {
        std::cout << "enter ~RaftManager()\n";
    }
    bool Init();
    bool Go();

private:
    boost::asio::io_service io_service;
    std::string ip;
    int portStart;
    int nodeID;
    int electionTimeout;
    int heartbeatTime;
    LocalInfoWithLock locInfolock;

    //===============================send
    void DiapatchByStatus(int id, int& timeoutLimit);
    void HandleLeaderSend(int id, int& timeoutLimit);
    void HandleCandidateSend(int id, int& timeoutLimit);
    void HandleFollowerSend(int id, int& timeoutLimit);
    void HandlePreVoteSend(int id, int& timeoutLimit);

    //===================recv
    void DiapatchByInfoType(const NetInfo& netinf);
    void HandleHeartbeatTypeRecv(const NetInfo& netinf);
    void HandleVoteTypeRecv(const NetInfo& netinf);
    void HandleVoteRespTypeRecv(const NetInfo& netinf);

    // Returns a random wait time in milliseconds (set up in Init).
    std::function<int()> dice;

    bool LoopCheck(int id, std::shared_ptr<tcp::socket> s);
    void Session(tcp::socket sock);
    void SendFunc(int id);

    RaftManager() {}
    RaftManager(const RaftManager&) = delete;
    RaftManager& operator=(const RaftManager&) = delete;
    static std::shared_ptr<RaftManager> p;
};
RaftManager.h
#include "RaftManager.h"
#include <cassert>
#include <chrono>
#include <functional>
#include <iostream>
#include <random>
#include <thread>

std::shared_ptr<RaftManager> RaftManager::p = nullptr;

bool RaftManager::Init() {
    // A JSON config could be used here instead.
    ReadConfig cfg("nodeCfg");
    map<string, string> kv = cfg.Do();

    if (kv.find("ip") == kv.end() || kv.find("portStart") == kv.end() || kv.find("nodeID") == kv.end()) {
        assert(0);
        return false;
    }
    ip = kv["ip"];
    portStart = stoi(kv["portStart"]);
    nodeID = stoi(kv["nodeID"]);
    electionTimeout = 4000;
    heartbeatTime = 5000;
    if (kv.find("heartbeatTime") != kv.end())
        heartbeatTime = stoi(kv["heartbeatTime"]);

    locInfolock.locInfo.id = nodeID;
    locInfolock.locInfo.leaderID = 0;
    locInfolock.locInfo.term = 0;   // previously left uninitialized
    locInfolock.locInfo.IsRecvHeartbeat = 0;
    locInfolock.locInfo.isVote = 0;
    locInfolock.locInfo.status = FOLLOWER_STATUS;
    locInfolock.locInfo.voteRecord.clear();

    // dice() returns a random wait in [2001, 5000] ms used to rearm the timers.
    std::random_device rd;
    std::default_random_engine engine(rd());
    std::uniform_int_distribution<> dis(2001, 5000);
    dice = std::bind(dis, engine);

    return true;
}

void RaftManager::HandleLeaderSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        // TODO: send a heartbeat to node id
        timeoutLimit = dice();
    }
}

void RaftManager::HandleCandidateSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        // TODO: send a vote request to node id
        timeoutLimit = dice();
    }
}

void RaftManager::HandlePreVoteSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        // TODO: pre-vote handling
        timeoutLimit = dice();
    }
}

void RaftManager::HandleFollowerSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        {
            // Check and update the local state under one lock so that the
            // state cannot change between the check and the update.
            std::lock_guard<std::mutex> lck(locInfolock.m);
            if (locInfolock.locInfo.IsRecvHeartbeat == 0) {
                // Heartbeat timed out: switch to election mode.
                locInfolock.locInfo.term++;
                locInfolock.locInfo.status = CANDIDATE_STATUS;
                locInfolock.locInfo.isVote = 1;    // this term's vote goes to itself
                locInfolock.locInfo.voteRecord.clear();
                locInfolock.locInfo.voteRecord[nodeID] = locInfolock.locInfo.term;
            }
            // NOTE: IsRecvHeartbeat is not cleared anywhere on this path yet, so a
            // leader that disappears after its first heartbeat is not detected.
        }

        timeoutLimit = dice();
    }
}

//===================
void RaftManager::HandleHeartbeatTypeRecv(const NetInfo& netinf) {
    std::lock_guard<std::mutex> lck(locInfolock.m);
    if (netinf.fromID != locInfolock.locInfo.leaderID)
        locInfolock.locInfo.leaderID = netinf.fromID;
    locInfolock.locInfo.IsRecvHeartbeat = 1;
}

void RaftManager::HandleVoteTypeRecv(const NetInfo& netinf) {
    std::lock_guard<std::mutex> lck(locInfolock.m);
    int voteid = netinf.fromID;    // the candidate asking for the vote
    if (locInfolock.locInfo.isVote == 0) {
        // TODO: reply granting the vote to voteid

        locInfolock.locInfo.isVote = 1;    // mark that this term's vote has been cast
    }
    else {
        // TODO: reply refusing the vote
    }
}

void RaftManager::HandleVoteRespTypeRecv(const NetInfo& netinf) {
    std::lock_guard<std::mutex> lck(locInfolock.m);
    if (netinf.infotype == VOTE_LEADER_RESP_TYPE && netinf.toID == nodeID) {
        // Record the vote in the local map.
        locInfolock.locInfo.voteRecord[netinf.fromID] = netinf.term;
    }
    // Count the votes received in the current term.
    int count = 0;
    for (const auto& rec : locInfolock.locInfo.voteRecord) {
        if (rec.second == locInfolock.locInfo.term)
            count++;
    }
    // The demo assumes a 5-node cluster, so a majority is more than 5 / 2 votes.
    if (count > 5 / 2) {
        // Majority reached: become leader. Otherwise the election continues.
        locInfolock.locInfo.leaderID = nodeID;
        locInfolock.locInfo.IsRecvHeartbeat = 0;
        locInfolock.locInfo.status = LEADER_STATUS;
    }
}


//loop send
void RaftManager::DiapatchByStatus(int id, int& timeoutLimit) {
    NetInfo netinf{ nodeID,id,DEFAULT_TYPE,0,0 };    // message template for the TODO send steps
    LocalInfo localInfo;
    {
        // Take a snapshot of the local state under the lock; it decides what to send.
        std::lock_guard<std::mutex> lck(locInfolock.m);
        localInfo = locInfolock.locInfo;
    }
    switch (localInfo.status) {
    case LEADER_STATUS:
        HandleLeaderSend(id, timeoutLimit);
        break;
    case FOLLOWER_STATUS:
        HandleFollowerSend(id, timeoutLimit);
        break;
    case CANDIDATE_STATUS:
        HandleCandidateSend(id, timeoutLimit);
        break;
    case PRE_VOTE_STAUS:
        HandlePreVoteSend(id, timeoutLimit);
        break;    // was missing, so this case fell through to "unknown status"
    default:
        std::cerr << "unknown status!!" << std::endl;
        break;
    }
}


//handle recv
void RaftManager::DiapatchByInfoType(const NetInfo& netinf) {
    {
        std::lock_guard<std::mutex> lck(locInfolock.m);
        // Ignore messages from an older term.
        if (netinf.term < locInfolock.locInfo.term)
            return;
        // A newer term resets this node to follower and clears its vote state.
        if (netinf.term > locInfolock.locInfo.term) {
            locInfolock.locInfo.term = netinf.term;
            locInfolock.locInfo.status = FOLLOWER_STATUS;
            locInfolock.locInfo.isVote = 0;
            locInfolock.locInfo.IsRecvHeartbeat = 0;
            locInfolock.locInfo.voteRecord.clear();
        }
    }
    //========================================
    switch (netinf.infotype) {
    case HEART_BREAT_TYPE:
        HandleHeartbeatTypeRecv(netinf);
        break;
    case VOTE_LEADER_TYPE:
        HandleVoteTypeRecv(netinf);
        break;
    case VOTE_LEADER_RESP_TYPE:
        HandleVoteRespTypeRecv(netinf);
        break;
    default:
        std::cerr << "Recv Unknown info type." << std::endl;
    }
}

bool RaftManager::LoopCheck(int id, std::shared_ptr<tcp::socket> s) {
    int looptime = 200;
    int timeoutlimit = dice();
    // Poll every 200 ms; the handlers count timeoutlimit down and rearm it.
    while (1) {
        DiapatchByStatus(id, timeoutlimit);
        std::this_thread::sleep_for(std::chrono::milliseconds(looptime));
    }

    return false;
}

void RaftManager::SendFunc(int i) {
    //todo
    // Example: scan every 200 ms, heartbeat interval 5000 ms, election timeout 2001-5000 ms
    // Peer i listens on portStart + i (see Go()); the demo runs every node on 127.0.0.1.
    string port = std::to_string(portStart + i);
    int looptime = 4000;
    while (1) {
        std::shared_ptr<tcp::socket> s = std::make_shared<tcp::socket>((io_service));
        tcp::resolver resolver(io_service);
        try {
            boost::asio::connect(*s, resolver.resolve({ "127.0.0.1", port }));
        }
        catch (exception&) {
            // Keep retrying, but pause briefly instead of spinning.
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            continue;
        }
        LoopCheck(i, s);
        std::this_thread::sleep_for(std::chrono::milliseconds(looptime));
    }

    return;
}

void RaftManager::Session(tcp::socket sock) {
    boost::system::error_code error;
    NetInfo netinf;
    while (1) {
        // read() blocks until a whole NetInfo has arrived; read_some() may return fewer bytes.
        size_t length = boost::asio::read(sock, boost::asio::buffer(&netinf, sizeof(netinf)), error);
        if (error == boost::asio::error::eof)
            return; // Connection closed cleanly by peer.
        else if (error) {
            std::cerr << boost::system::system_error(error).what() << std::endl;// Some other error.
            return;
        }
        if (length != sizeof(netinf)) {
            std::cerr << __FUNCTION__ << " recv wrong length:" << length << std::endl;
            return;
        }

        DiapatchByInfoType(netinf);
    }
}

bool RaftManager::Go() {
    // Set up networking. Broadcast could be used to discover and notify the other nodes;
    // the demo assumes 5 nodes with IDs 1 2 3 4 5 on ports 9921 9922 9923 9924 9925.
    if (ip == "" || portStart == 0 || nodeID == 0)
        return false;
    try {
        // Start one sending thread per peer (4 threads in the assumed 5-node cluster).
        const int nodeCount = 5;
        for (int i = 1; i <= nodeCount; i++) {
            if (i == nodeID)
                continue;
            std::thread t = std::thread(&RaftManager::SendFunc, shared_from_this(), i);
            t.detach();
        }

        int port = portStart + nodeID;
        tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port));
        // Accept connections forever; each one is served by its own Session thread.
        for (;;)
        {
            tcp::socket sock(io_service);
            a.accept(sock);
            std::thread(&RaftManager::Session, shared_from_this(), std::move(sock)).detach();
        }
    }
    catch (exception& e) {
        std::cerr << __FUNCTION__ << " : " << e.what() << std::endl;
        return false;
    }

    return true;
}
RaftManager.cpp
// QueueTemplate.cpp : this file contains the "main" function. Program execution begins and ends here.
//

#include "pch.h"    // Visual Studio precompiled header; remove when building elsewhere

#include <condition_variable>
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
using namespace std;

// Bounded blocking queue: Put waits while the queue is full, Take waits while it is empty.
template<typename T>
class SyncQueue
{
public:
    SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false)
    {
    }

    void Put(const T& x)
    {
        Add(x);
    }

    void Put(T&& x)
    {
        Add(std::move(x));
    }

    // Takes every queued element at once.
    void Take(std::list<T>& list)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });

        if (m_needStop)
            return;
        list = std::move(m_queue);
        m_queue.clear();    // a moved-from list is only guaranteed to be in a valid state
        m_notFull.notify_one();
    }

    // Takes a single element from the front.
    void Take(T& t)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });

        if (m_needStop)
            return;
        t = m_queue.front();
        m_queue.pop_front();
        m_notFull.notify_one();
    }

    // Wakes up all waiting threads and makes later Put/Take calls return immediately.
    void Stop()
    {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_needStop = true;
        }
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    bool Empty()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    bool Full()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return static_cast<int>(m_queue.size()) == m_maxSize;
    }

    size_t Size()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size();
    }

    // Unlocked size; only safe when no other thread is using the queue.
    int Count()
    {
        return static_cast<int>(m_queue.size());
    }
private:
    // The two predicates below are called with m_mutex already held.
    bool NotFull() const
    {
        bool full = static_cast<int>(m_queue.size()) >= m_maxSize;
        if (full)
            cout << "full, waiting,thread id: " << this_thread::get_id() << endl;
        return !full;
    }

    bool NotEmpty() const
    {
        bool empty = m_queue.empty();
        if (empty)
            cout << "empty,waiting,thread id: " << this_thread::get_id() << endl;
        return !empty;
    }

    template<typename F>
    void Add(F&& x)
    {
        std::unique_lock< std::mutex> locker(m_mutex);
        m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
        if (m_needStop)
            return;

        m_queue.push_back(std::forward<F>(x));
        m_notEmpty.notify_one();
    }

private:
    std::list<T> m_queue; // buffer
    std::mutex m_mutex; // mutex used together with the condition variables
    std::condition_variable m_notEmpty; // signalled when the queue is not empty
    std::condition_variable m_notFull; // signalled when the queue is not full
    int m_maxSize; // maximum size of the queue

    bool m_needStop; // stop flag
};

int main()
{
    std::cout << "Hello World!\n";

    SyncQueue<int> q(1);
    q.Put(1);

    int a = 0;
    q.Take(a);

    q.Put(2);
    q.Take(a);

    q.Stop();
}
syncqueue.h

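I tried to build a simplified demo of Raft leader election myself.

The 2 to 5 nodes are defined in advance, and each one reads its IP, port, and node ID from a config file.

Networking uses Boost's synchronous (blocking) API.

One thread receives and four threads send.

1 The receiving thread inspects each incoming message to tell whether it is a heartbeat, a vote request, or a vote response, and then updates its logical clock (term), its isVote flag, and the record of which nodes voted for it in the latest term: map<int,int> // nodeid, term

2 Each sending thread polls every 200 ms and reduces the remaining wait according to the node's current state (depending on the state, the wait is reset either to the 1000 ms heartbeat interval or to a random election timeout of 1500-5000 ms).

Depending on the current state it then sends a heartbeat, a vote request, or a vote response.

 To be completed.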
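The 200 ms countdown described in step 2 can be sketched as a small helper. `tick` and its parameter names are made up for this example; it mirrors the "subtract one interval, act and rearm when the wait runs out" pattern used by the send handlers in RaftManager.cpp.

```cpp
#include <cassert>

// Subtracts one polling interval from the remaining wait and reports whether
// the timer expired. On expiry the timer is rearmed with nextLimitMs.
bool tick(int& remainingMs, int intervalMs, int nextLimitMs) {
    assert(intervalMs > 0);
    if (remainingMs > 0) {
        remainingMs -= intervalMs;
    }
    if (remainingMs <= 0) {
        remainingMs = nextLimitMs;  // e.g. heartbeat interval or a new random timeout
        return true;                // caller sends a heartbeat or starts an election
    }
    return false;
}
```

A sending loop would call `tick` once per poll and, when it returns true, send the message that matches the node's current state.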

References:

《etcd技术内幕》 (book)

http://thesecretlivesofdata.com/raft/#intro

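Author: itdef
Reposting is welcome; please keep the text complete and credit the source.
Tech blog: http://www.cnblogs.com/itdef/
Algorithm video walkthroughs on Bilibili:
https://space.bilibili.com/18508846
qq 151435887
gitee https://gitee.com/def/
C/C++ and algorithm enthusiasts, Windows driver enthusiasts, and server programmers are welcome to get in touch.
If you found this useful, a like is appreciated; your encouragement keeps me going.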
Original post: https://www.cnblogs.com/itdef/p/9632960.html