xRedis API  1.5.0
The C++ Client API for Redis server
 All Classes
xRedisClusterManager.h
1 
2 #ifndef _XREDIS_CLUSTER_MANAGER_H_
3 #define _XREDIS_CLUSTER_MANAGER_H_
4 
5 #include "hiredis.h"
6 #include "xLock.h"
7 #include "xRedisLog.h"
8 
9 #include <stdint.h>
10 #include <stdlib.h>
11 #include <string.h>
12 
13 #include <string>
14 #include <vector>
15 #include <list>
16 
17 #ifdef _WIN32
18 #define strcasecmp stricmp
19 #define strncasecmp strnicmp
20 #define usleep(us) Sleep((us)/1000)
21 #define pthread_self() GetCurrentThreadId()
22 #endif
23 
24 namespace xrc {
25 
26 #define MAX_REDIS_POOLSIZE 128
27 #define MAX_TIME_OUT 5
28 
29 typedef std::vector<std::string> VSTRING;
30 
31 class RedisResult;
32 typedef struct _REDISCONN_ {
33  _REDISCONN_()
34  {
35  mCtx = NULL;
36  mPort = 0;
37  mPoolSize = 0;
38  }
39  ~_REDISCONN_() { }
40 
41  redisContext* connectWithTimeout()
42  {
43  struct timeval timeoutVal;
44  timeoutVal.tv_sec = MAX_TIME_OUT;
45  timeoutVal.tv_usec = 0;
46 
47  redisContext* ctx = NULL;
48  ctx = redisConnectWithTimeout(mHost.c_str(), mPort, timeoutVal);
49  if (NULL == ctx || ctx->err) {
50  if (NULL != ctx) {
51  redisFree(ctx);
52  ctx = NULL;
53  } else {
54  }
55  }
56 
57  return ctx;
58  }
59  bool ping()
60  {
61  redisReply* reply = static_cast<redisReply*>(redisCommand(mCtx, "PING"));
62  bool bRet = (NULL != reply) && (reply->str) && (strcasecmp(reply->str, "PONG") == 0);
63  if (bRet) {
64  freeReplyObject(reply);
65  }
66  return bRet;
67  }
68  bool redisReConnect()
69  {
70  bool bRet = false;
71  redisContext* tmp_ctx = connectWithTimeout();
72  if (NULL == tmp_ctx) {
73  bRet = false;
74  } else {
75  if (NULL != mCtx) {
76  redisFree(mCtx);
77  }
78  mCtx = tmp_ctx;
79  bRet = auth();
80  }
81  return bRet;
82  }
83 
84  bool auth()
85  {
86  bool bRet = false;
87  if (0 == mPass.length()) {
88  bRet = true;
89  } else {
90  redisReply* reply = static_cast<redisReply*>(
91  redisCommand(mCtx, "AUTH %s", mPass.c_str()));
92  if ((NULL == reply) || (strcasecmp(reply->str, "OK") != 0)) {
93  bRet = false;
94  } else {
95  bRet = true;
96  }
97  freeReplyObject(reply);
98  }
99  xredis_debug("auth %s:%u %s %d", mHost.c_str(), mPort, mPass.c_str(), bRet);
100  return bRet;
101  }
102 
103  redisContext* mCtx;
104  std::string mHost;
105  std::string mPass;
106  uint32_t mPort;
107  uint32_t mPoolSize;
108  uint32_t mIndex;
110 
111 typedef std::list<RedisConnection*> RedisConnectionList;
112 typedef std::list<RedisConnection*>::iterator RedisConnectionIter;
113 
114 struct NodeInfo {
115  std::string strinfo;
116  std::string id;
117  std::string ip; // The node IP
118  uint16_t port; // The node port
119  std::string flags; // A list of comma separated flags: myself, master, slave, fail?, fail, handshake, noaddr, noflags
120  bool is_fail;
121  bool is_master; // true if node is master, false if node is salve
122  bool is_slave;
123  std::string master_id; // The replication master
124  int32_t ping_sent; // Milliseconds unix time at which the currently active ping was sent, or zero if there are no pending pings
125  int32_t pong_recv; // Milliseconds unix time the last pong was received
126  int32_t epoch; //
127  bool connected; // The state of the link used for the node-to-node cluster
128  std::vector<std::pair<uint32_t, uint32_t> > mSlots;
129 
130  bool checkSlot(uint32_t slotindex)
131  {
132  std::vector<std::pair<uint32_t, uint32_t> >::const_iterator citer = mSlots.begin();
133  for (; citer != mSlots.end(); ++citer) {
134  //xredis_debug("check %u [%u, %u]\n", slotindex, citer->first, citer->second);
135  if ((slotindex >= citer->first) && (slotindex <= citer->second)) {
136  return true;
137  }
138  }
139  return false;
140  }
141 
142  void parse_role(const std::string& nodeString)
143  {
144  const char* p = strstr(nodeString.c_str(), "master");
145  if (NULL != p) {
146  is_master = true;
147  is_slave = false;
148  }
149 
150  p = strstr(nodeString.c_str(), "slave");
151  if (NULL != p) {
152  is_master = false;
153  is_slave = true;
154  }
155 
156  p = strstr(nodeString.c_str(), "fail");
157  if (NULL != p) {
158  is_fail = true;
159  } else {
160  is_fail = false;
161  }
162  }
163 
164  bool parse_host(const std::string& nodeString)
165  {
166  std::string::size_type ColonPos = nodeString.find(':');
167  if (ColonPos == std::string::npos) {
168  return false;
169  } else {
170  const std::string port_str = nodeString.substr(ColonPos + 1);
171  port = atoi(port_str.c_str());
172  ip = nodeString.substr(0, ColonPos);
173  return true;
174  }
175  }
176 
177  void parse_slot(const std::string& SlotString)
178  {
179  if (0 == SlotString.length()) {
180  xredis_warn("SlotString is NULL ");
181  return;
182  }
183 
184  uint32_t StartSlot = 0;
185  uint32_t EndSlot = 0;
186  std::string::size_type BarPos = SlotString.find('-');
187  if (BarPos == std::string::npos) {
188  StartSlot = atoi(SlotString.c_str());
189  EndSlot = StartSlot;
190  } else {
191  const std::string EndSlotStr = SlotString.substr(BarPos + 1);
192  EndSlot = atoi(EndSlotStr.c_str());
193  StartSlot = atoi(SlotString.substr(0, BarPos).c_str());
194  }
195  mSlots.push_back(std::make_pair(StartSlot, EndSlot));
196 
197  xredis_debug("%s mSlots:%u", SlotString.c_str(), mSlots.size());
198  }
199 };
200 
201 typedef std::vector<NodeInfo> NODELIST;
202 
203 class ClusterInfo {
204 public:
205  ClusterInfo() { }
206  ~ClusterInfo() { }
207  NODELIST nodes;
208  std::string pass;
209  uint32_t poolsize;
210  bool clusterEnabled;
211 };
212 
214 public:
217 
218 private:
219 public:
220  static bool auth(redisContext* c, const std::string& pass);
221  bool connectCluster(const ClusterInfo* cluster_info);
222  void keepalive();
223  bool commandArgv(const VSTRING& vDataIn, RedisResult& result);
224  bool command(RedisResult& result, const char* format, const char* key, va_list args);
225 
226  RedisConnection* get_cluster_info(ClusterInfo& ClusterInfo);
227  bool check_cluster_info(ClusterInfo& cinfo);
228 
229  static bool Sort_asc(const NodeInfo& a, const NodeInfo& b)
230  {
231  return (a.id < b.id);
232  }
233 
234  void bfree()
235  {
236  bFree = true;
237  }
238 
239 private:
240  static uint16_t crc16(const char* buf, int32_t len);
241  static bool checkReply(const redisReply* reply);
242  static void freeReply(const redisReply* reply);
243  static int32_t str2Vect(const char* pSrc, std::vector<std::string>& vDest,
244  const char* pSep = ",");
245 
246 public:
247  static bool clusterEnabled(redisContext* ctx);
248  static bool clusterState(redisContext* ctx);
249  static bool clusterNodes(redisContext* ctx, ClusterInfo* info);
250  void freeConnection(RedisConnection* pRedis);
251  bool release();
252 
253  static xRedisClusterManager* connectRedis(const std::string& host, uint32_t port,
254  const std::string& pass, uint32_t poolsize, ClusterInfo*& info);
255 
256 private:
257  bool connectRedisNode(int idx, const std::string& host, uint32_t port,
258  const std::string& pass, uint32_t poolsize);
259  bool checkReply(redisReply* reply);
260  uint32_t keyHashSlot(const char* key, size_t keylen);
261 
262  RedisConnection* getConnection(uint32_t idx);
263 
264  uint32_t findNodeIndex(uint32_t slot);
265  uint32_t getKeySlotIndex(const char* key);
266  RedisConnection* findNodeConnection(const char* key);
267 
268 private:
269  RedisConnectionList* mRedisConnList;
270  xLock* mLock;
271  ClusterInfo mClusterInfo;
272  bool bFree;
273 };
274 
275 }
276 #endif
Definition: xRedisPool.h:39
Definition: xLock.h:22
Definition: xRedisClusterClient.h:33
Definition: xRedisClusterManager.h:213
Definition: xRedisClusterManager.h:32
Definition: xRedisClusterManager.h:114
Definition: xRedisClusterManager.h:203

Generated on Mon Mar 21 2022 10:57:13 for xRedis API version 1.5.0.