利用redis制作消息队列

redis在游戏服务器中的使用初探(一) 环境搭建
redis在游戏服务器中的使用初探(二) 客户端开源库选择
redis在游戏服务器中的使用初探(三) 信息存储
redis在游戏服务器中的使用初探(四) redis应用

在学习分布式对象存储的期间,有这么一个需求

"多个接口服务(本文当作客户端Clinet)需要以固定间隔向所有的多个服务器发送心跳,保证服务器确认客户端状态。

服务器在接收到文件读取请求时候,会广播询问所有数据服务器(本文也当作服务器)存储的数据情况"

以上一对多  的询问,是需要消息队列来进行通讯的

但是其实 redis也可以作为轻量级的消息队列来完成这个需求。

结构图

服务器开启一个线程进行redis订阅模式,当有人在指定频道发布消息时,所有订阅该频道的节点都可以接收到消息。

但是订阅操作如果我们不想采取固定时间间隔去获取频道是否有消息这么LOW的方案,其实是需要做成异步模式的。

而windows下 hredis异步模式是需要libevent支持的。 两者都是linux下运行良好的开源库,在windows下却是问题多多。

经过多次尝试,我决定放弃使用这两个开源库而选择cpp-redis。(linux下使用hredis和libevent ,有时间会试试)

流程如下:

一个服务节点 需要开启一个线程 进行客户端消息队列的订阅,每当收到消息就会调用收到消息的回调函数

而最初开启的服务节点的运行线程会定时的在服务器消息队列发布询问数据存储的信息。

客户端节点则相反 开启一个线程 定时向客户端消息队列发布心跳信息。

最初开启的客户端节点进行服务器消息队列的订阅,若收到服务器的数据存储询问,则进行本身是否存储该数据的判断

由于资源有限,最开始我们开启了5个线程 来模拟 2个服务器和 3个客户端

代码如下

  1 #include <iostream>
  2 #include <Winsock2.h>
  3 #include <thread>
  4 #include <mutex>
  5 
  6 #include "cpp_redis/cpp_redis"
  7 #include "tacopie/tacopie"
  8 
  9 using namespace std;
 10 
 11 const int serverThreadNum = 2;
 12 const int clientThreadNum = 3;
 13 const int heartBeatTime = 1;
 14 const int ServerQueryTime = 1;
 15 const std::string clientChanName = "ClientChan";
 16 const std::string serverChanName = "ServerChan";
 17 std::mutex g_mutex;
 18 
 19 class WinsockGuard {
 20 public:
 21     WinsockGuard() {
 22         WORD version = MAKEWORD(2, 2);
 23         if (WSAStartup(version, &data) != 0) {
 24             std::cerr << "WSAStartup() failure!" << std::endl;
 25             return;
 26         }
 27     }
 28 
 29     ~WinsockGuard() {
 30         WSACleanup();
 31     }
 32 private:
 33     WSADATA data;
 34 };
 35 
 36 bool SubcribCommFunc(int threadNum,bool isServer) {
 37     cpp_redis::subscriber sub;
 38 
 39     try {
 40         sub.connect("127.0.0.1", 6379, [](const std::string& host, std::size_t port, cpp_redis::subscriber::connect_state status) {
 41             if (status == cpp_redis::subscriber::connect_state::dropped) {
 42                 {std::lock_guard<std::mutex> l(g_mutex); std::cout << "client disconnected from " << host << ":" << port << std::endl; }
 43                 //should_exit.notify_all();
 44             }
 45         });
 46 
 47     }
 48     catch (std::exception& e) {
 49         {std::lock_guard<std::mutex> l(g_mutex); std::cerr << "in " << __FUNCTION__ << ".err = " << e.what() << std::endl; }
 50         return false;
 51     }
 52     std::string chanName;
 53     if (isServer) {chanName = clientChanName;}
 54     else {chanName = serverChanName;}
 55 
 56     sub.subscribe(chanName.c_str(), [threadNum, isServer](const std::string& chan, const std::string& msg) {
 57         string s;
 58         if (isServer)s = "server ";
 59         else s = "client ";
 60         s += to_string(threadNum);s += " recv ";
 61         {std::lock_guard<std::mutex> l(g_mutex); std::cout << s.c_str() << chan << ": " << msg << std::endl; }
 62         //todo Check heatbeat or response
 63     });
 64     sub.commit();
 65 
 66     while (1) {
 67         std::this_thread::sleep_for(std::chrono::seconds(50000));
 68     }
 69 
 70     return true;
 71 }
 72 
 73 bool RecvClientInfo(int i) {
 74     return SubcribCommFunc(i,true);
 75 }
 76 
 77 bool PublishCommFunc(int threadNum, bool isServer, string publishStr) {
 78     cpp_redis::client client;
 79     try {
 80         client.connect("127.0.0.1", 6379, [threadNum, isServer,&publishStr](const std::string& host, std::size_t port, cpp_redis::client::connect_state status) {
 81             if (status == cpp_redis::client::connect_state::dropped) {
 82                 {std::lock_guard<std::mutex> l(g_mutex);  std::cout << "disconnected from " << host << ":" << port << std::endl; }
 83             }
 84         });
 85         while (1) {
 86             std::string chanName;
 87             if (isServer) {chanName = serverChanName;}
 88             else {    chanName = clientChanName;}
 89 
 90             client.publish(chanName.c_str(), publishStr.c_str());
 91             client.commit();
 92 
 93             int PubliLoopTime = 9;
 94             if (isServer) {PubliLoopTime = ServerQueryTime;}
 95             else {PubliLoopTime = heartBeatTime;}
 96 
 97             std::this_thread::sleep_for(std::chrono::seconds(PubliLoopTime));
 98         }
 99     }
100     catch (std::exception& e) {
101         {std::lock_guard<std::mutex> l(g_mutex); std::cerr << "in " << __FUNCTION__ << ".err = " << e.what() << std::endl; }
102         return false;
103     }
104 
105     return true;
106 }
107 
108 void QueryWhoSaveDataLoop(int i) {
109     string s = "Server thread ";s += to_string(i);s += " query Who save data? ";
110     PublishCommFunc(i, true, s);
111     return;
112 }
113 
114 void ServerFunc(int i) {
115     {std::lock_guard<std::mutex> l(g_mutex);std::cout << "Enter ServerFunc threadNo = " << i << std::endl;}
116     //开启一个订阅客户端消息队列的线程 接受客户端的心跳包
117     thread t = thread(RecvClientInfo, i);
118     t.detach();
119 
120     //开启一个定时检测心跳超时的客户端 todo
121 
122     //本线程不定时随机 发送一个询问各个客户端是否保存有数据
123     QueryWhoSaveDataLoop(i);
124 
125     std::this_thread::sleep_for(std::chrono::seconds(500));
126 }
127 
128 void SendHeatBeatOnTime(int threadNum, int sendTime) {
129     string s = "client thread ";s += to_string(threadNum);s += " send heartbeat";
130     PublishCommFunc(threadNum, false, s);
131 }
132 
133 void ClientFunc(int i) {
134     {std::lock_guard<std::mutex> l(g_mutex);std::cout << "Enter ClientFunc threadNo = " << i << std::endl;}
135 
136     //开启一个线程 定时发送心跳包
137     int s = heartBeatTime;
138     std::thread t = thread(SendHeatBeatOnTime, i, s);
139     t.detach();
140 
141     SubcribCommFunc(i, false);
142 }
143 
144 void Start() {
145     thread serverThread[serverThreadNum];
146     thread clientThread[clientThreadNum];
147 
148     for (int i = 0; i < serverThreadNum; i++) {
149         serverThread[i] = thread(ServerFunc, i);
150     }
151     for (int i = 0; i < clientThreadNum; i++) {
152         clientThread[i] = thread(ClientFunc, i);
153     }
154     //==================================================
155     for (int i = 0; i < serverThreadNum; i++) {
156         serverThread[i].join();
157     }
158     for (int i = 0; i < clientThreadNum; i++) {
159         clientThread[i].join();
160     }
161 }
162 
163 int main()
164 {
165     WinsockGuard g;
166     Start();
167     std::cout << "Finish!
";
168 }
View Code

开启redis  运行代码如图

番外: 补上我在ubuntu下进行的libevent + hiredis的异步测试

首先是安装源头更新 更新 gcc  g++  make 等工具

sudo apt-get update

sudo apt-get install g++ gcc

安装 redis server

sudo apt-get install redis-server 

现在可以通过下面的命令查看到该进程:
ps -ef|grep redis

然后安装 hiredis 和 libevent

sudo apt-get install libhiredis-dev

sudo apt-get install libevent-dev

安装完成验证下是否正确安装

编写libevent 示例代码

 1 #include <event.h>
 2 #include <stdio.h>
 3 
 4 struct event ev;
 5 struct timeval tv;
 6 
 7 
 8 void time_cb(int fd, short event, void *argc)
 9 {
10     printf(  "timer wakeup
"); 
11     event_add(&ev, &tv);
12 }
13 
14 int main()
15 {
16     struct event_base *base = event_init();
17 
18     tv.tv_sec = 1;
19     tv.tv_usec = 0;
20     evtimer_set(&ev, time_cb, NULL);
21     event_add(&ev, &tv);
22     event_base_dispatch(base);
23 
24     return 0;
25 }
libeventTest.c

执行编译命令并运行 gcc -o eventexe libeventTest.c  -levent

./eventexe  执行无错误则验证通过

编写hiredis示例代码

 1 #include <stdio.h> 
 2 #include <hiredis/hiredis.h> 
 3 int main() 
 4 { 
 5      redisContext *conn  = redisConnect("127.0.0.1",6379); 
 6      if(conn != NULL && conn->err) 
 7      {   
 8          printf("connection error: %s
",conn->errstr); 
 9          return 0; 
10      }   
11      redisReply *reply = (redisReply*)redisCommand(conn,"set foo 1234"); 
12      freeReplyObject(reply); 
13              
14      reply = redisCommand(conn,"get foo"); 
15      printf("%s
",reply->str); 
16      freeReplyObject(reply); 
17              
18      redisFree(conn); 
19      return 0; 
20 }
View Code

执行编译命令并运行 gcc -o hiredisCli hiredisTest.c  -lhiredis

./hiredisCli  执行无错误则验证通过

libevent和hiredis都确认无误后 开始测试异步代码

编写异步示例代码

 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <string.h>
 4 #include <signal.h>
 5 
 6 #include <hiredis/hiredis.h> 
 7 #include <hiredis/async.h>
 8 #include <hiredis/adapters/libevent.h>
 9 
10 
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <signal.h>
15 
16 void getCallback(redisAsyncContext *c, void *r, void *privdata) {
17     redisReply *reply = r;
18     if (reply == NULL) return;
19     printf("argv[%s]: %s
", (char*)privdata, reply->str);
20 
21     /* Disconnect after receiving the reply to GET */
22     redisAsyncDisconnect(c);
23 }
24 
25 void connectCallback(const redisAsyncContext *c, int status) {
26     if (status != REDIS_OK) {
27         printf("Error: %s
", c->errstr);
28         return;
29     }
30     printf("Connected...
");
31 }
32 
33 void disconnectCallback(const redisAsyncContext *c, int status) {
34     if (status != REDIS_OK) {
35         printf("Error: %s
", c->errstr);
36         return;
37     }
38     printf("Disconnected...
");
39 }
40 
41 int main (int argc, char **argv) {
42     signal(SIGPIPE, SIG_IGN);
43     struct event_base *base = event_base_new();
44 
45     redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
46     if (c->err) {
47         /* Let *c leak for now... */
48         printf("Error: %s
", c->errstr);
49         return 1;
50     }
51 
52     redisLibeventAttach(c,base);
53     redisAsyncSetConnectCallback(c,connectCallback);
54     redisAsyncSetDisconnectCallback(c,disconnectCallback);
55     redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
56     redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
57     event_base_dispatch(base);
58     return 0;
59 }
View Code

执行编译命令并运行  

gcc -o async async.c -lhiredis -levent

./async

测试成功

作 者: itdef
欢迎转帖 请保持文本完整并注明出处
技术博客 http://www.cnblogs.com/itdef/
B站算法视频题解
https://space.bilibili.com/18508846
qq 151435887
gitee https://gitee.com/def/
欢迎c c++ 算法爱好者 windows驱动爱好者 服务器程序员沟通交流
如果觉得不错,欢迎点赞,你的鼓励就是我的动力
阿里打赏 微信打赏
原文地址:https://www.cnblogs.com/itdef/p/9609270.html