rpcCache.c 7.9 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
S
slguan 已提交
17
#include "tglobal.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
18 19 20
#include "tmempool.h"
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
21
#include "rpcLog.h"
22
#include "rpcCache.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23

J
Jeff Tao 已提交
24
typedef struct SConnHash {
J
jtao1735 已提交
25
  char              fqdn[TSDB_FQDN_LEN];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26
  uint16_t          port;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
27
  char              connType;
J
Jeff Tao 已提交
28 29 30
  struct SConnHash *prev;
  struct SConnHash *next;
  void             *data;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
31 32 33 34
  uint64_t          time;
} SConnHash;

typedef struct {
J
Jeff Tao 已提交
35
  SConnHash     **connHashList;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
36 37 38 39 40 41
  mpool_h         connHashMemPool;
  int             maxSessions;
  int             total;
  int *           count;
  int64_t         keepTimer;
  pthread_mutex_t mutex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
42 43 44
  void          (*cleanFp)(void *);
  void           *tmrCtrl;
  void           *pTimer;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
45
  int64_t        *lockedBy;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
46 47
} SConnCache;

J
jtao1735 已提交
48
static int  rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
49 50 51 52
static void rpcLockCache(int64_t *lockedBy);
static void rpcUnlockCache(int64_t *lockedBy);
static void rpcCleanConnCache(void *handle, void *tmrId);
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
53

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
54 55 56 57 58 59 60 61 62 63 64 65
void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) {
  SConnHash **connHashList;
  mpool_h     connHashMemPool;
  SConnCache *pCache;

  connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash));
  if (connHashMemPool == 0) return NULL;

  connHashList = calloc(sizeof(SConnHash *), maxSessions);
  if (connHashList == 0) {
    taosMemPoolCleanUp(connHashMemPool);
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
66 67
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
68 69 70 71 72 73 74
  pCache = malloc(sizeof(SConnCache));
  if (pCache == NULL) {
    taosMemPoolCleanUp(connHashMemPool);
    free(connHashList);
    return NULL;
  }
  memset(pCache, 0, sizeof(SConnCache));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
75

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
76 77 78 79 80 81 82 83 84
  pCache->count = calloc(sizeof(int), maxSessions);
  pCache->total = 0;
  pCache->keepTimer = keepTimer;
  pCache->maxSessions = maxSessions;
  pCache->connHashMemPool = connHashMemPool;
  pCache->connHashList = connHashList;
  pCache->cleanFp = cleanFp;
  pCache->tmrCtrl = tmrCtrl;
  pCache->lockedBy = calloc(sizeof(int64_t), maxSessions);
85
  taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
86 87 88 89

  pthread_mutex_init(&pCache->mutex, NULL);

  return pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
90 91
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
92 93
void rpcCloseConnCache(void *handle) {
  SConnCache *pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
94

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
95 96
  pCache = (SConnCache *)handle;
  if (pCache == NULL || pCache->maxSessions == 0) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
97

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
98
  pthread_mutex_lock(&pCache->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
99

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100 101 102 103
  taosTmrStopA(&(pCache->pTimer));

  if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool);

S
TD-1848  
Shengliang Guan 已提交
104 105 106
  tfree(pCache->connHashList);
  tfree(pCache->count);
  tfree(pCache->lockedBy);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
107 108 109 110 111 112 113

  pthread_mutex_unlock(&pCache->mutex);

  pthread_mutex_destroy(&pCache->mutex);

  memset(pCache, 0, sizeof(SConnCache));
  free(pCache);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
114 115
}

J
jtao1735 已提交
116
void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, int8_t connType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
117 118
  int         hash;
  SConnHash * pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
119
  SConnCache *pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
120 121 122

  uint64_t time = taosGetTimestampMs();

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
123 124
  pCache = (SConnCache *)handle;
  assert(pCache); 
125
  assert(data);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
126

J
jtao1735 已提交
127
  hash = rpcHashConn(pCache, fqdn, port, connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
128
  pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
129
  tstrncpy(pNode->fqdn, fqdn, sizeof(pNode->fqdn));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130
  pNode->port = port;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
131
  pNode->connType = connType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
132 133 134 135
  pNode->data = data;
  pNode->prev = NULL;
  pNode->time = time;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
136
  rpcLockCache(pCache->lockedBy+hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
137

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
138 139 140
  pNode->next = pCache->connHashList[hash];
  if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode;
  pCache->connHashList[hash] = pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
141

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
142 143
  pCache->count[hash]++;
  rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
144

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
145
  rpcUnlockCache(pCache->lockedBy+hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
146

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
147
  pCache->total++;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
148
  // tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
149

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
150
  return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
151 152
}

J
jtao1735 已提交
153
void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
154 155
  int         hash;
  SConnHash * pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
156
  SConnCache *pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
157 158
  void *      pData = NULL;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
159 160
  pCache = (SConnCache *)handle;
  assert(pCache); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
161 162 163

  uint64_t time = taosGetTimestampMs();

J
jtao1735 已提交
164
  hash = rpcHashConn(pCache, fqdn, port, connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
165
  rpcLockCache(pCache->lockedBy+hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
166

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
167
  pNode = pCache->connHashList[hash];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
168
  while (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
169 170
    if (time >= pCache->keepTimer + pNode->time) {
      rpcRemoveExpiredNodes(pCache, pNode, hash, time);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
171 172 173 174
      pNode = NULL;
      break;
    }

J
jtao1735 已提交
175
    if (strcmp(pNode->fqdn, fqdn) == 0 && pNode->port == port && pNode->connType == connType) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
176 177 178 179 180

    pNode = pNode->next;
  }

  if (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
181
    rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
182 183 184 185

    if (pNode->prev) {
      pNode->prev->next = pNode->next;
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
186
      pCache->connHashList[hash] = pNode->next;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
187 188 189 190 191 192 193
    }

    if (pNode->next) {
      pNode->next->prev = pNode->prev;
    }

    pData = pNode->data;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
194 195 196
    taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
    pCache->total--;
    pCache->count[hash]--;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
197 198
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
199
  rpcUnlockCache(pCache->lockedBy+hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
200 201

  if (pData) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
202
    //tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]);
H
hjxilinx 已提交
203
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
204
    //tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
205 206 207 208 209
  }

  return pData;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
210 211 212
static void rpcCleanConnCache(void *handle, void *tmrId) {
  int         hash;
  SConnHash * pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
213
  SConnCache *pCache;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
214

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
215 216 217
  pCache = (SConnCache *)handle;
  if (pCache == NULL || pCache->maxSessions == 0) return;
  if (pCache->pTimer != tmrId) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
218

陶建辉(Jeff)'s avatar
TD-1628  
陶建辉(Jeff) 已提交
219
  pthread_mutex_lock(&pCache->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
220
  uint64_t time = taosGetTimestampMs();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
221

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
222 223 224 225 226
  for (hash = 0; hash < pCache->maxSessions; ++hash) {
    rpcLockCache(pCache->lockedBy+hash);
    pNode = pCache->connHashList[hash];
    rpcRemoveExpiredNodes(pCache, pNode, hash, time);
    rpcUnlockCache(pCache->lockedBy+hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
227 228
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229
  // tTrace("timer, total connections in cache:%d", pCache->total);
230
  taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer);
陶建辉(Jeff)'s avatar
TD-1628  
陶建辉(Jeff) 已提交
231
  pthread_mutex_unlock(&pCache->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
232 233
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
234 235
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) {
  if (pNode == NULL || (time < pCache->keepTimer + pNode->time) ) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
236

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
237
  SConnHash *pPrev = pNode->prev, *pNext;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
238

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
239 240 241 242 243
  while (pNode) {
    (*pCache->cleanFp)(pNode->data);
    pNext = pNode->next;
    pCache->total--;
    pCache->count[hash]--;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244 245
    //tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode,
    //         pCache->count[hash]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
246 247 248
    taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
    pNode = pNext;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
249

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
250 251 252 253 254
  if (pPrev)
    pPrev->next = NULL;
  else
    pCache->connHashList[hash] = NULL;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
255

J
jtao1735 已提交
256
static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
257 258
  SConnCache *pCache = (SConnCache *)handle;
  int         hash = 0;
J
jtao1735 已提交
259 260 261 262 263 264
  char       *temp = fqdn;

  while (*temp) {
    hash += *temp;
    ++temp;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
265

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
266 267 268 269
  hash += port;
  hash += connType;

  hash = hash % pCache->maxSessions;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
270

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
271 272
  return hash;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
273

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
274
static void rpcLockCache(int64_t *lockedBy) {
S
TD-2616  
Shengliang Guan 已提交
275
  int64_t tid = taosGetSelfPthreadId();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
276 277 278 279 280 281 282
  int     i = 0;
  while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
    if (++i % 100 == 0) {
      sched_yield();
    }
  }
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
283

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
284
static void rpcUnlockCache(int64_t *lockedBy) {
S
TD-2616  
Shengliang Guan 已提交
285
  int64_t tid = taosGetSelfPthreadId();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
286 287 288
  if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
    assert(false);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
289
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
290