xRedis API  1.1
The C++ Client API for Redis server
xRedisPool.h
1 /*
2  * ----------------------------------------------------------------------------
3  * Copyright (c) 2013-2014, xSky <guozhw at gmail dot com>
4  * All rights reserved.
5  * Distributed under GPL license.
6  * ----------------------------------------------------------------------------
7  */
8 
9 #ifndef _XREDIS_POOL_H_
10 #define _XREDIS_POOL_H_
11 
12 #include <string.h>
13 #include <string>
14 #include <list>
15 #include "xLock.h"
16 #include "hiredis.h"
17 #include "xRedisClient.h"
18 using namespace std;
19 
20 #define MAX_REDIS_CONN_POOLSIZE 128 // 每个DB最大连接数
21 #define MAX_REDIS_CACHE_TYPE 128 // 最大支持的CACHE种类数
22 #define MAX_REDIS_DB_HASHBASE 128 // 最大HASH分库基数
23 
24 #define GET_CONNECT_ERROR "get connection error"
25 #define CONNECT_CLOSED_ERROR "redis connection be closed"
26 
27 
28 enum {
29  REDISDB_UNCONN,
30  REDISDB_WORKING,
31  REDISDB_DEAD
32 };
33 
34 class RedisConn
35 {
36 public:
37  RedisConn()
38  {
39  mCtx = NULL;
40  mPort = 0;
41  mTimeout = 0;
42  mPoolsize = 0;
43  mType = 0;
44  mDbindex = 0;
45  }
46  ~RedisConn()
47  {
48 
49  }
50 
51  void Init(unsigned int cahcetype,
52  unsigned int dbindex,
53  const std::string& host,
54  unsigned int port,
55  const std::string& pass,
56  unsigned int poolsize,
57  unsigned int timeout,
58  unsigned int role,
59  unsigned int slaveidx
60  )
61  {
62  mType = cahcetype;
63  mDbindex = dbindex;
64  mHost = host;
65  mPort = port;
66  mPass = pass;
67  mPoolsize = poolsize;
68  mTimeout = timeout;
69  mRole = role;
70  mSlaveIdx = slaveidx;
71  }
72 
73  bool RedisConnect()
74  {
75  if (NULL != mCtx) {
76  redisFree(mCtx);
77  mCtx = NULL;
78  }
79 
80  bool bRet = false;
81  struct timeval timeoutVal;
82  timeoutVal.tv_sec = mTimeout;
83  timeoutVal.tv_usec = 0;
84 
85  mCtx = redisConnectWithTimeout(mHost.c_str(), mPort, timeoutVal);
86  if (NULL == mCtx || mCtx->err) {
87  if (NULL != mCtx) {
88  printf("Connect to Redis: type:%u dbindex:%u %s:%u pass:%s poolsize:%u error:%s \n",
89  mType, mDbindex, mHost.c_str(), mPort, mPass.c_str(), mPoolsize, mCtx->errstr);
90  redisFree(mCtx);
91  mCtx = NULL;
92  } else {
93  printf("Connection error: can't allocate redis context \n");
94  }
95  } else {
96  printf("Connect to Redis: type:%u dbindex:%u %s:%u pass:%s poolsize:%u success \n",
97  mType, mDbindex, mHost.c_str(), mPort, mPass.c_str(), mPoolsize);
98  if (0 == mPass.length()) {
99  bRet = true;
100  } else {
101  redisReply *reply = static_cast<redisReply *>(redisCommand(mCtx, "AUTH %s", mPass.c_str()));
102  if ((NULL == reply) || (strcasecmp(reply->str, "OK") != 0)) {
103  bRet = false;
104  printf("auth error: type:%u dbindex:%u %s:%u pass:%s poolsize:%u \n",
105  mType, mDbindex, mHost.c_str(), mPort, mPass.c_str(), mPoolsize);
106  }
107  freeReplyObject(reply);
108  }
109  }
110 
111  return bRet;
112  }
113 
114  bool Ping() const
115  {
116  redisReply *reply = static_cast<redisReply *>(redisCommand(mCtx, "PING"));
117  bool bRet = (NULL != reply);
118  freeReplyObject(reply);
119  return bRet;
120  }
121 
122  redisContext *getCtx() const { return mCtx; }
123  unsigned int getdbindex() const { return mDbindex; }
124  unsigned int GetType() const { return mType; }
125  unsigned int GetRole() const { return mRole; }
126  unsigned int GetSlaveIdx() const { return mSlaveIdx; }
127 
128 private:
129  // redis connector context
130  redisContext *mCtx;
131  string mHost; // redis host
132  unsigned int mPort; // redis sever port
133  string mPass; // redis server password
134  unsigned int mTimeout; // connect timeout second
135  unsigned int mPoolsize; // connect pool size for each redis DB
136  unsigned int mType; // redis cache pool type
137  unsigned int mDbindex; // redis DB index
138  unsigned int mRole; // redis role
139  unsigned int mSlaveIdx; // the index in the slave group
140 };
141 
142 typedef std::list<RedisConn*> RedisConnPool;
143 typedef std::list<RedisConn*>::iterator RedisConnIter;
144 
145 typedef std::vector<RedisConnPool*> RedisSlaveGroup;
146 typedef std::vector<RedisConnPool*>::iterator RedisSlaveGroupIter;
147 
148 typedef struct _RedisDBSlice_Conn_ {
149  RedisConnPool RedisMasterConn;
150  RedisSlaveGroup RedisSlaveConn;
151  xLock Masterlock;
152  xLock Slavelock;
154 
155 
157 {
158 public:
159  RedisDBSlice()
160  {
161  mType = 0;
162  mDbindex = 0;
163  mStatus = 0;
164  mHaveSlave = false;
165  }
166  ~RedisDBSlice()
167  {
168 
169  }
170 
171  void Init(const unsigned int cahcetype,
172  const unsigned int dbindex)
173  {
174  mType = cahcetype;
175  mDbindex = dbindex;
176  }
177 
178  bool ConnectRedisNodes(unsigned int cahcetype, unsigned int dbindex,
179  const std::string& host, unsigned int port, const std::string& passwd,
180  unsigned int poolsize, unsigned int timeout, int role)
181  {
182  bool bRet = false;
183  if ((host.empty())
184  || (cahcetype > MAX_REDIS_CACHE_TYPE)
185  || (dbindex > MAX_REDIS_DB_HASHBASE)
186  || (poolsize > MAX_REDIS_CONN_POOLSIZE)) {
187  printf("error argv: cahcetype:%u dbindex:%u host:%s port:%u poolsize:%u timeout:%u \r\n",
188  cahcetype, dbindex, host.c_str(), port, poolsize, timeout);
189  return false;
190  }
191 
192  try {
193  if (MASTER == role) {
194  XLOCK(mSliceConn.Masterlock);
195  for (unsigned int i = 0; i < poolsize; ++i) {
196  RedisConn *pRedisconn = new RedisConn;
197  if (NULL == pRedisconn) {
198  printf("error pRedisconn is null, %s %u %u \r\n", host.c_str(), port, cahcetype);
199  continue;
200  }
201 
202  pRedisconn->Init(cahcetype, dbindex, host.c_str(), port, passwd.c_str(), poolsize, timeout, role, 0);
203  if (pRedisconn->RedisConnect()) {
204  mSliceConn.RedisMasterConn.push_back(pRedisconn);
205  mStatus = REDISDB_WORKING;
206  } else {
207  delete pRedisconn;
208  }
209  }
210  bRet = true;
211  }
212 
213  if (SLAVE == role) {
214  XLOCK(mSliceConn.Slavelock);
215  RedisConnPool *pSlaveNode = new RedisConnPool;
216  int slave_idx = mSliceConn.RedisSlaveConn.size();
217  for (unsigned int i = 0; i < poolsize; ++i) {
218  RedisConn *pRedisconn = new RedisConn;
219  if (NULL == pRedisconn) {
220  printf("error pRedisconn is null, %s %u %u \r\n", host.c_str(), port, cahcetype);
221  continue;
222  }
223 
224  pRedisconn->Init(cahcetype, dbindex, host.c_str(), port, passwd.c_str(), poolsize, timeout, role, slave_idx);
225  if (pRedisconn->RedisConnect()) {
226  pSlaveNode->push_back(pRedisconn);
227  } else {
228  delete pRedisconn;
229  }
230  }
231  mSliceConn.RedisSlaveConn.push_back(pSlaveNode);
232  bRet = true;
233  mHaveSlave = true;
234  }
235 
236  } catch (...) {
237  printf("connect error poolsize=%u \n", poolsize);
238  return false;
239  }
240  printf("\n");
241  return bRet;
242  }
243 
244  RedisConn *GetMasterConn()
245  {
246  RedisConn *pRedisConn = NULL;
247  XLOCK(mSliceConn.Masterlock);
248  if (!mSliceConn.RedisMasterConn.empty()) {
249  pRedisConn = mSliceConn.RedisMasterConn.front();
250  mSliceConn.RedisMasterConn.pop_front();
251  } else {
252  printf("GetConn error pthread_id=%u \n", (unsigned int)pthread_self());
253  mStatus = REDISDB_DEAD;
254  }
255  return pRedisConn;
256  }
257 
258  RedisConn *GetSlaveConn()
259  {
260  RedisConn *pRedisConn = NULL;
261  XLOCK(mSliceConn.Slavelock);
262  if (!mSliceConn.RedisSlaveConn.empty()) {
263  size_t slave_cnt = mSliceConn.RedisSlaveConn.size();
264  unsigned int idx = rand() % slave_cnt;
265  RedisConnPool *pSlave = mSliceConn.RedisSlaveConn[idx];
266  pRedisConn = pSlave->front();
267  pSlave->pop_front();
268  if (idx != pRedisConn->GetSlaveIdx()) {
269  printf("mSlaveIdx pthread_id=%u \n", (unsigned int)pthread_self());
270  }
271  } else {
272  printf("have no slave mType:%u mDbindex:%u mStatus:%u \n", mType, mDbindex, mStatus);
273  }
274  return pRedisConn;
275  }
276 
277  RedisConn *GetConn(int ioRole)
278  {
279  RedisConn *pRedisConn = NULL;
280  if (!mHaveSlave) {
281  ioRole = MASTER;
282  }
283  if (MASTER == ioRole) {
284  pRedisConn = GetMasterConn();
285  } else if (SLAVE == ioRole) {
286  pRedisConn = GetSlaveConn();
287  } else {
288  pRedisConn = NULL;
289  }
290 
291  return pRedisConn;
292  }
293 
294  void FreeConn(RedisConn *redisconn)
295  {
296  if (NULL != redisconn) {
297  unsigned int role = redisconn->GetRole();
298  if (MASTER == role) {
299  XLOCK(mSliceConn.Masterlock);
300  mSliceConn.RedisMasterConn.push_back(redisconn);
301  } else if (SLAVE == role) {
302  XLOCK(mSliceConn.Slavelock);
303  RedisConnPool *pSlave = mSliceConn.RedisSlaveConn[redisconn->GetSlaveIdx()];
304  pSlave->push_back(redisconn);
305  } else {
306 
307  }
308  }
309  }
310 
311  void CloseConnPool()
312  {
313  {
314  XLOCK(mSliceConn.Masterlock);
315  RedisConnIter master_iter = mSliceConn.RedisMasterConn.begin();
316  for (; master_iter != mSliceConn.RedisMasterConn.end(); ++master_iter) {
317  printf("close dbindex:%u master \r\n", mDbindex);
318  redisFree((*master_iter)->getCtx());
319  delete *master_iter;
320  }
321  }
322 
323  {
324  XLOCK(mSliceConn.Slavelock);
325  RedisSlaveGroupIter slave_iter = mSliceConn.RedisSlaveConn.begin();
326  for (; slave_iter != mSliceConn.RedisSlaveConn.end(); ++slave_iter) {
327  RedisConnPool* pConnPool = (*slave_iter);
328  RedisConnIter iter = pConnPool->begin();
329  for (; iter != pConnPool->end(); ++iter) {
330  printf("close dbindex:%u slave mSlaveIdx:%u \r\n", mDbindex, (*iter)->GetSlaveIdx());
331  redisFree((*iter)->getCtx());
332  delete *iter;
333  }
334  delete pConnPool;
335  }
336  }
337 
338  mStatus = REDISDB_DEAD;
339  }
340 
341  void ConnPoolPing()
342  {
343  {
344  XLOCK(mSliceConn.Masterlock);
345  RedisConnIter iter = mSliceConn.RedisMasterConn.begin();
346  for (; iter != mSliceConn.RedisMasterConn.end(); ++iter) {
347  bool bRet = (*iter)->Ping();
348  if (!bRet) {
349  (*iter)->RedisConnect();
350  } else {
351  printf("ping dbindex:%u sucess \n", mDbindex);
352  }
353  }
354  }
355 
356  {
357  XLOCK(mSliceConn.Slavelock);
358  RedisSlaveGroupIter slave_iter = mSliceConn.RedisSlaveConn.begin();
359  for (; slave_iter != mSliceConn.RedisSlaveConn.end(); ++slave_iter) {
360  RedisConnPool* pConnPool = (*slave_iter);
361  RedisConnIter iter = pConnPool->begin();
362  for (; iter != pConnPool->end(); ++iter) {
363  bool bRet = (*iter)->Ping();
364  if (!bRet) {
365  (*iter)->RedisConnect();
366  } else {
367  printf("ping dbindex:%u sucess \n", mDbindex);
368  }
369  }
370  delete pConnPool;
371  }
372  }
373  }
374 
375  unsigned int GetStatus() const
376  {
377  return mStatus;
378  }
379 
380 private:
381  RedisSliceConn mSliceConn;
382  bool mHaveSlave;
383  unsigned int mType; // redis cache pool type
384  unsigned int mDbindex; // redis DB index
385  unsigned int mStatus; // redis DB status
386 };
387 
389 {
390 public:
391  RedisCache()
392  {
393  mCachetype = 0;
394  mHashbase = 0;
395  mDBList = NULL;
396  }
397  virtual ~RedisCache()
398  {
399 
400  }
401 
402  bool InitDB(const unsigned int cachetype, const unsigned int hashbase)
403  {
404 
405  printf("cachetype:%u hashbase:%u \r\n", cachetype, hashbase);
406 
407  mCachetype = cachetype;
408  mHashbase = hashbase;
409  if (NULL == mDBList) {
410  mDBList = new RedisDBSlice[hashbase];
411  }
412 
413  return true;
414  }
415 
416  bool ConnectRedisDB(const unsigned int cahcetype, const unsigned int dbindex,
417  const char *host, const unsigned int port, const char *passwd,
418  const unsigned int poolsize, unsigned int timeout, unsigned int role)
419  {
420  mDBList[dbindex].Init(cahcetype, dbindex);
421  return mDBList[dbindex].ConnectRedisNodes(cahcetype, dbindex, host, port,
422  passwd, poolsize, timeout, role);
423  }
424 
425  RedisConn *GetConn(const unsigned int dbindex, unsigned int ioRole)
426  {
427  return mDBList[dbindex].GetConn(ioRole);
428  }
429 
430  void FreeConn(RedisConn *redisconn)
431  {
432  return mDBList[redisconn->getdbindex()].FreeConn(redisconn);
433  }
434 
435  void ClosePool()
436  {
437  for (unsigned int i = 0; i < mHashbase; i++) {
438  mDBList[i].CloseConnPool();
439  }
440  delete [] mDBList;
441  mDBList = NULL;
442  }
443 
444  void KeepAlive()
445  {
446  for (unsigned int i = 0; i < mHashbase; i++) {
447  mDBList[i].ConnPoolPing();
448  }
449  }
450 
451  unsigned int GetDBStatus(unsigned int dbindex)
452  {
453  RedisDBSlice *pdbSclice = &mDBList[dbindex];
454  if (NULL == pdbSclice) {
455  return REDISDB_UNCONN;
456  }
457  return pdbSclice->GetStatus();
458  }
459 
460  unsigned int GetHashBase() const
461  {
462  return mHashbase;
463  }
464 
465 private:
466  RedisDBSlice *mDBList;
467  unsigned int mCachetype;
468  unsigned int mHashbase;
469 };
470 
471 
473 {
474 public:
475  RedisPool();
476  ~RedisPool();
477 
478  bool Init(const unsigned int typesize);
479  bool setHashBase(const unsigned int cachetype, const unsigned int hashbase);
480  unsigned int getHashBase(const unsigned int cachetype);
481  bool ConnectRedisDB(unsigned int cachetype, unsigned int dbindex,
482  const char* host, unsigned int port, const char* passwd,
483  unsigned int poolsize, unsigned int timeout, unsigned int role);
484  void Release();
485 
486  static bool CheckReply(const redisReply* reply);
487  static void FreeReply(const redisReply* reply);
488  void KeepAlive();
489 
490  RedisConn *GetConnection(const unsigned int cachetype, const unsigned int index, unsigned int ioType = MASTER);
491  void FreeConnection(RedisConn* redisconn);
492 
493 private:
494  RedisCache *mRedisCacheList;
495  unsigned int mTypeSize;
496 };
497 
498 
499 #endif
Definition: hiredis.h:166
Definition: hiredis.h:98
Definition: xRedisPool.h:148
Definition: xRedisPool.h:34
Definition: xRedisPool.h:388
Definition: xRedisPool.h:472
Definition: xLock.h:22
Definition: xRedisPool.h:156

Generated on Fri Oct 23 2015 13:13:57 for xRedis API version 1.1.