rpcCache.c 8.0 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
S
Shengliang Guan 已提交
17
#include "taosdef.h"
S
slguan 已提交
18
#include "tglobal.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
19 20 21
#include "tmempool.h"
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
22
#include "rpcLog.h"
23
#include "rpcCache.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
24

J
Jeff Tao 已提交
25
typedef struct SConnHash {
J
jtao1735 已提交
26
  char              fqdn[TSDB_FQDN_LEN];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
27
  uint16_t          port;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
28
  char              connType;
J
Jeff Tao 已提交
29 30 31
  struct SConnHash *prev;
  struct SConnHash *next;
  void             *data;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
32 33 34 35
  uint64_t          time;
} SConnHash;

typedef struct {
J
Jeff Tao 已提交
36
  SConnHash     **connHashList;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
37 38 39 40 41 42
  mpool_h         connHashMemPool;
  int             maxSessions;
  int             total;
  int *           count;
  int64_t         keepTimer;
  pthread_mutex_t mutex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
43 44 45
  void          (*cleanFp)(void *);
  void           *tmrCtrl;
  void           *pTimer;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
46
  int64_t        *lockedBy;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
47 48
} SConnCache;

J
jtao1735 已提交
49
static int  rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
50 51 52 53
static void rpcLockCache(int64_t *lockedBy);
static void rpcUnlockCache(int64_t *lockedBy);
static void rpcCleanConnCache(void *handle, void *tmrId);
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
54

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
55 56 57 58 59 60 61 62 63 64 65 66
void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) {
  SConnHash **connHashList;
  mpool_h     connHashMemPool;
  SConnCache *pCache;

  connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash));
  if (connHashMemPool == 0) return NULL;

  connHashList = calloc(sizeof(SConnHash *), maxSessions);
  if (connHashList == 0) {
    taosMemPoolCleanUp(connHashMemPool);
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
67 68
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
69 70 71 72 73 74 75
  pCache = malloc(sizeof(SConnCache));
  if (pCache == NULL) {
    taosMemPoolCleanUp(connHashMemPool);
    free(connHashList);
    return NULL;
  }
  memset(pCache, 0, sizeof(SConnCache));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
76

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
77 78 79 80 81 82 83 84 85
  pCache->count = calloc(sizeof(int), maxSessions);
  pCache->total = 0;
  pCache->keepTimer = keepTimer;
  pCache->maxSessions = maxSessions;
  pCache->connHashMemPool = connHashMemPool;
  pCache->connHashList = connHashList;
  pCache->cleanFp = cleanFp;
  pCache->tmrCtrl = tmrCtrl;
  pCache->lockedBy = calloc(sizeof(int64_t), maxSessions);
86
  taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
87 88 89 90

  pthread_mutex_init(&pCache->mutex, NULL);

  return pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
91 92
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
93 94
void rpcCloseConnCache(void *handle) {
  SConnCache *pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
95

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
96 97
  pCache = (SConnCache *)handle;
  if (pCache == NULL || pCache->maxSessions == 0) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
98

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
99
  pthread_mutex_lock(&pCache->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
101 102 103 104
  taosTmrStopA(&(pCache->pTimer));

  if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool);

S
TD-1848  
Shengliang Guan 已提交
105 106 107
  tfree(pCache->connHashList);
  tfree(pCache->count);
  tfree(pCache->lockedBy);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
108 109 110 111 112 113 114

  pthread_mutex_unlock(&pCache->mutex);

  pthread_mutex_destroy(&pCache->mutex);

  memset(pCache, 0, sizeof(SConnCache));
  free(pCache);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
115 116
}

J
jtao1735 已提交
117
void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, int8_t connType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
118 119
  int         hash;
  SConnHash * pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
120
  SConnCache *pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
121 122 123

  uint64_t time = taosGetTimestampMs();

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
124 125
  pCache = (SConnCache *)handle;
  assert(pCache); 
126
  assert(data);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
127

J
jtao1735 已提交
128
  hash = rpcHashConn(pCache, fqdn, port, connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
129
  pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130
  tstrncpy(pNode->fqdn, fqdn, sizeof(pNode->fqdn));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
131
  pNode->port = port;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
132
  pNode->connType = connType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
133 134 135 136
  pNode->data = data;
  pNode->prev = NULL;
  pNode->time = time;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
137
  rpcLockCache(pCache->lockedBy+hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
138

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
139 140 141
  pNode->next = pCache->connHashList[hash];
  if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode;
  pCache->connHashList[hash] = pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
142

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
143 144
  pCache->count[hash]++;
  rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
145

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
146
  rpcUnlockCache(pCache->lockedBy+hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
147

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
148
  pCache->total++;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
149
  // tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
150

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
151
  return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
152 153
}

J
jtao1735 已提交
154
void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
155 156
  int         hash;
  SConnHash * pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
157
  SConnCache *pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
158 159
  void *      pData = NULL;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
160 161
  pCache = (SConnCache *)handle;
  assert(pCache); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
162 163 164

  uint64_t time = taosGetTimestampMs();

J
jtao1735 已提交
165
  hash = rpcHashConn(pCache, fqdn, port, connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
166
  rpcLockCache(pCache->lockedBy+hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
167

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
168
  pNode = pCache->connHashList[hash];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
169
  while (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
170 171
    if (time >= pCache->keepTimer + pNode->time) {
      rpcRemoveExpiredNodes(pCache, pNode, hash, time);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
172 173 174 175
      pNode = NULL;
      break;
    }

J
jtao1735 已提交
176
    if (strcmp(pNode->fqdn, fqdn) == 0 && pNode->port == port && pNode->connType == connType) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177 178 179 180 181

    pNode = pNode->next;
  }

  if (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
182
    rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
183 184 185 186

    if (pNode->prev) {
      pNode->prev->next = pNode->next;
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
187
      pCache->connHashList[hash] = pNode->next;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
188 189 190 191 192 193 194
    }

    if (pNode->next) {
      pNode->next->prev = pNode->prev;
    }

    pData = pNode->data;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
195 196 197
    taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
    pCache->total--;
    pCache->count[hash]--;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
198 199
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
200
  rpcUnlockCache(pCache->lockedBy+hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201 202

  if (pData) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
203
    //tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]);
H
hjxilinx 已提交
204
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
205
    //tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
206 207 208 209 210
  }

  return pData;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
211 212 213
static void rpcCleanConnCache(void *handle, void *tmrId) {
  int         hash;
  SConnHash * pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
214
  SConnCache *pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
215

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
216 217 218
  pCache = (SConnCache *)handle;
  if (pCache == NULL || pCache->maxSessions == 0) return;
  if (pCache->pTimer != tmrId) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
219

陶建辉(Jeff)'s avatar
TD-1628  
陶建辉(Jeff) 已提交
220
  pthread_mutex_lock(&pCache->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
221
  uint64_t time = taosGetTimestampMs();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
222

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
223 224 225 226 227
  for (hash = 0; hash < pCache->maxSessions; ++hash) {
    rpcLockCache(pCache->lockedBy+hash);
    pNode = pCache->connHashList[hash];
    rpcRemoveExpiredNodes(pCache, pNode, hash, time);
    rpcUnlockCache(pCache->lockedBy+hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
228 229
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
230
  // tTrace("timer, total connections in cache:%d", pCache->total);
231
  taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer);
陶建辉(Jeff)'s avatar
TD-1628  
陶建辉(Jeff) 已提交
232
  pthread_mutex_unlock(&pCache->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
233 234
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
235 236
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) {
  if (pNode == NULL || (time < pCache->keepTimer + pNode->time) ) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
237

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
238
  SConnHash *pPrev = pNode->prev, *pNext;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
239

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
240 241 242 243 244
  while (pNode) {
    (*pCache->cleanFp)(pNode->data);
    pNext = pNode->next;
    pCache->total--;
    pCache->count[hash]--;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
245 246
    //tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode,
    //         pCache->count[hash]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
247 248 249
    taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
    pNode = pNext;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
250

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
251 252 253 254 255
  if (pPrev)
    pPrev->next = NULL;
  else
    pCache->connHashList[hash] = NULL;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
256

J
jtao1735 已提交
257
static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
258 259
  SConnCache *pCache = (SConnCache *)handle;
  int         hash = 0;
J
jtao1735 已提交
260 261 262 263 264 265
  char       *temp = fqdn;

  while (*temp) {
    hash += *temp;
    ++temp;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
266

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
267 268 269 270
  hash += port;
  hash += connType;

  hash = hash % pCache->maxSessions;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
271

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
272 273
  return hash;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
274

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
275
static void rpcLockCache(int64_t *lockedBy) {
S
TD-2616  
Shengliang Guan 已提交
276
  int64_t tid = taosGetSelfPthreadId();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
277 278 279 280 281 282 283
  int     i = 0;
  while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
    if (++i % 100 == 0) {
      sched_yield();
    }
  }
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
284

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
285
static void rpcUnlockCache(int64_t *lockedBy) {
S
TD-2616  
Shengliang Guan 已提交
286
  int64_t tid = taosGetSelfPthreadId();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
287 288 289
  if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
    assert(false);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
290
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
291