rpcCache.c 8.1 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "rpcCache.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17
#include "os.h"
dengyihao's avatar
dengyihao 已提交
18
#include "rpcLog.h"
S
Shengliang Guan 已提交
19
#include "taosdef.h"
S
slguan 已提交
20
#include "tglobal.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
21 22 23 24
#include "tmempool.h"
#include "ttimer.h"
#include "tutil.h"

J
Jeff Tao 已提交
25
typedef struct SConnHash {
J
jtao1735 已提交
26
  char              fqdn[TSDB_FQDN_LEN];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
27
  uint16_t          port;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
28
  char              connType;
J
Jeff Tao 已提交
29 30
  struct SConnHash *prev;
  struct SConnHash *next;
dengyihao's avatar
dengyihao 已提交
31
  void *            data;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
32 33 34 35
  uint64_t          time;
} SConnHash;

typedef struct {
dengyihao's avatar
dengyihao 已提交
36
  SConnHash **    connHashList;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
37 38 39 40 41
  mpool_h         connHashMemPool;
  int             maxSessions;
  int             total;
  int *           count;
  int64_t         keepTimer;
wafwerar's avatar
wafwerar 已提交
42
  TdThreadMutex mutex;
dengyihao's avatar
dengyihao 已提交
43 44 45 46
  void (*cleanFp)(void *);
  void *   tmrCtrl;
  void *   pTimer;
  int64_t *lockedBy;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
47 48
} SConnCache;

J
jtao1735 已提交
49
static int  rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
50 51 52 53
static void rpcLockCache(int64_t *lockedBy);
static void rpcUnlockCache(int64_t *lockedBy);
static void rpcCleanConnCache(void *handle, void *tmrId);
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
54

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
55 56 57 58 59 60 61 62
void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) {
  SConnHash **connHashList;
  mpool_h     connHashMemPool;
  SConnCache *pCache;

  connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash));
  if (connHashMemPool == 0) return NULL;

wafwerar's avatar
wafwerar 已提交
63
  connHashList = taosMemoryCalloc(sizeof(SConnHash *), maxSessions);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
64 65 66
  if (connHashList == 0) {
    taosMemPoolCleanUp(connHashMemPool);
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
67 68
  }

wafwerar's avatar
wafwerar 已提交
69
  pCache = taosMemoryMalloc(sizeof(SConnCache));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
70 71
  if (pCache == NULL) {
    taosMemPoolCleanUp(connHashMemPool);
wafwerar's avatar
wafwerar 已提交
72
    taosMemoryFree(connHashList);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
73 74 75
    return NULL;
  }
  memset(pCache, 0, sizeof(SConnCache));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
76

wafwerar's avatar
wafwerar 已提交
77
  pCache->count = taosMemoryCalloc(sizeof(int), maxSessions);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
78 79 80 81 82 83 84
  pCache->total = 0;
  pCache->keepTimer = keepTimer;
  pCache->maxSessions = maxSessions;
  pCache->connHashMemPool = connHashMemPool;
  pCache->connHashList = connHashList;
  pCache->cleanFp = cleanFp;
  pCache->tmrCtrl = tmrCtrl;
wafwerar's avatar
wafwerar 已提交
85
  pCache->lockedBy = taosMemoryCalloc(sizeof(int64_t), maxSessions);
86
  taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
87

wafwerar's avatar
wafwerar 已提交
88
  taosThreadMutexInit(&pCache->mutex, NULL);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
89 90

  return pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
91 92
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
93 94
void rpcCloseConnCache(void *handle) {
  SConnCache *pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
95

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
96 97
  pCache = (SConnCache *)handle;
  if (pCache == NULL || pCache->maxSessions == 0) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
98

wafwerar's avatar
wafwerar 已提交
99
  taosThreadMutexLock(&pCache->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
101 102 103 104
  taosTmrStopA(&(pCache->pTimer));

  if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool);

wafwerar's avatar
wafwerar 已提交
105 106 107
  taosMemoryFreeClear(pCache->connHashList);
  taosMemoryFreeClear(pCache->count);
  taosMemoryFreeClear(pCache->lockedBy);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
108

wafwerar's avatar
wafwerar 已提交
109
  taosThreadMutexUnlock(&pCache->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
110

wafwerar's avatar
wafwerar 已提交
111
  taosThreadMutexDestroy(&pCache->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
112 113

  memset(pCache, 0, sizeof(SConnCache));
wafwerar's avatar
wafwerar 已提交
114
  taosMemoryFree(pCache);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
115 116
}

J
jtao1735 已提交
117
void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, int8_t connType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
118 119
  int         hash;
  SConnHash * pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
120
  SConnCache *pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
121 122 123

  uint64_t time = taosGetTimestampMs();

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
124
  pCache = (SConnCache *)handle;
dengyihao's avatar
dengyihao 已提交
125
  assert(pCache);
126
  assert(data);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
127

J
jtao1735 已提交
128
  hash = rpcHashConn(pCache, fqdn, port, connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
129
  pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130
  tstrncpy(pNode->fqdn, fqdn, sizeof(pNode->fqdn));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
131
  pNode->port = port;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
132
  pNode->connType = connType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
133 134 135 136
  pNode->data = data;
  pNode->prev = NULL;
  pNode->time = time;

dengyihao's avatar
dengyihao 已提交
137
  rpcLockCache(pCache->lockedBy + hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
138

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
139 140 141
  pNode->next = pCache->connHashList[hash];
  if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode;
  pCache->connHashList[hash] = pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
142

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
143 144
  pCache->count[hash]++;
  rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
145

dengyihao's avatar
dengyihao 已提交
146
  rpcUnlockCache(pCache->lockedBy + hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
147

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
148
  pCache->total++;
dengyihao's avatar
dengyihao 已提交
149 150
  // tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode,
  // pCache->count[hash]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
151

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
152
  return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
153 154
}

J
jtao1735 已提交
155
void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
156 157
  int         hash;
  SConnHash * pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
158
  SConnCache *pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
159 160
  void *      pData = NULL;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
161
  pCache = (SConnCache *)handle;
dengyihao's avatar
dengyihao 已提交
162
  assert(pCache);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
163 164 165

  uint64_t time = taosGetTimestampMs();

J
jtao1735 已提交
166
  hash = rpcHashConn(pCache, fqdn, port, connType);
dengyihao's avatar
dengyihao 已提交
167
  rpcLockCache(pCache->lockedBy + hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
168

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
169
  pNode = pCache->connHashList[hash];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
170
  while (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
171 172
    if (time >= pCache->keepTimer + pNode->time) {
      rpcRemoveExpiredNodes(pCache, pNode, hash, time);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
173 174 175 176
      pNode = NULL;
      break;
    }

J
jtao1735 已提交
177
    if (strcmp(pNode->fqdn, fqdn) == 0 && pNode->port == port && pNode->connType == connType) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
178 179 180 181 182

    pNode = pNode->next;
  }

  if (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
183
    rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
184 185 186 187

    if (pNode->prev) {
      pNode->prev->next = pNode->next;
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
188
      pCache->connHashList[hash] = pNode->next;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
189 190 191 192 193 194 195
    }

    if (pNode->next) {
      pNode->next->prev = pNode->prev;
    }

    pData = pNode->data;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
196 197 198
    taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
    pCache->total--;
    pCache->count[hash]--;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
199 200
  }

dengyihao's avatar
dengyihao 已提交
201
  rpcUnlockCache(pCache->lockedBy + hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
202 203

  if (pData) {
dengyihao's avatar
dengyihao 已提交
204 205
    // tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode,
    // pCache->count[hash]);
H
hjxilinx 已提交
206
  } else {
dengyihao's avatar
dengyihao 已提交
207 208
    // tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash,
    // pCache->count[hash]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
209 210 211 212 213
  }

  return pData;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
214 215 216
static void rpcCleanConnCache(void *handle, void *tmrId) {
  int         hash;
  SConnHash * pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
217
  SConnCache *pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
218

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
219 220 221
  pCache = (SConnCache *)handle;
  if (pCache == NULL || pCache->maxSessions == 0) return;
  if (pCache->pTimer != tmrId) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
222

wafwerar's avatar
wafwerar 已提交
223
  taosThreadMutexLock(&pCache->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
224
  uint64_t time = taosGetTimestampMs();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
225

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
226
  for (hash = 0; hash < pCache->maxSessions; ++hash) {
dengyihao's avatar
dengyihao 已提交
227
    rpcLockCache(pCache->lockedBy + hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
228 229
    pNode = pCache->connHashList[hash];
    rpcRemoveExpiredNodes(pCache, pNode, hash, time);
dengyihao's avatar
dengyihao 已提交
230
    rpcUnlockCache(pCache->lockedBy + hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
231 232
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
233
  // tTrace("timer, total connections in cache:%d", pCache->total);
234
  taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer);
wafwerar's avatar
wafwerar 已提交
235
  taosThreadMutexUnlock(&pCache->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
236 237
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
238
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) {
dengyihao's avatar
dengyihao 已提交
239
  if (pNode == NULL || (time < pCache->keepTimer + pNode->time)) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
240

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
241
  SConnHash *pPrev = pNode->prev, *pNext;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
242

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
243 244 245 246 247
  while (pNode) {
    (*pCache->cleanFp)(pNode->data);
    pNext = pNode->next;
    pCache->total--;
    pCache->count[hash]--;
dengyihao's avatar
dengyihao 已提交
248 249
    // tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port,
    // pNode->connType, hash, pNode,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
250
    //         pCache->count[hash]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
251 252 253
    taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
    pNode = pNext;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
254

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
255 256 257 258 259
  if (pPrev)
    pPrev->next = NULL;
  else
    pCache->connHashList[hash] = NULL;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
260

J
jtao1735 已提交
261
static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
262 263
  SConnCache *pCache = (SConnCache *)handle;
  int         hash = 0;
dengyihao's avatar
dengyihao 已提交
264
  char *      temp = fqdn;
J
jtao1735 已提交
265 266 267 268 269

  while (*temp) {
    hash += *temp;
    ++temp;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
270

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
271 272 273 274
  hash += port;
  hash += connType;

  hash = hash % pCache->maxSessions;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
275

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
276 277
  return hash;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
278

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
279
static void rpcLockCache(int64_t *lockedBy) {
S
TD-2616  
Shengliang Guan 已提交
280
  int64_t tid = taosGetSelfPthreadId();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
281 282 283 284 285 286 287
  int     i = 0;
  while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
    if (++i % 100 == 0) {
      sched_yield();
    }
  }
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
288

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
289
static void rpcUnlockCache(int64_t *lockedBy) {
S
TD-2616  
Shengliang Guan 已提交
290
  int64_t tid = taosGetSelfPthreadId();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
291 292 293
  if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
    assert(false);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
294
}