transComm.c 17.1 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifdef USE_UV

#include "transComm.h"

dengyihao's avatar
opt rpc  
dengyihao 已提交
19 20
#define BUFFER_CAP 4096

dengyihao's avatar
dengyihao 已提交
21 22 23
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;

static int32_t refMgt;
dengyihao's avatar
dengyihao 已提交
24
static int32_t instMgt;
dengyihao's avatar
dengyihao 已提交
25

dengyihao's avatar
dengyihao 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38 39
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
  T_MD5_CTX context;
  int       ret = -1;

  tMD5Init(&context);
  tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
  tMD5Update(&context, (uint8_t*)pMsg, msgLen);
  tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
  tMD5Final(&context);

  if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;

  return ret;
}
dengyihao's avatar
dengyihao 已提交
40

dengyihao's avatar
dengyihao 已提交
41 42 43 44 45 46 47 48 49 50 51
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
  T_MD5_CTX context;

  tMD5Init(&context);
  tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
  tMD5Update(&context, (uint8_t*)pMsg, msgLen);
  tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
  tMD5Final(&context);

  memcpy(pAuth, context.digest, sizeof(context.digest));
}
dengyihao's avatar
dengyihao 已提交
52

dengyihao's avatar
dengyihao 已提交
53
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
dengyihao's avatar
dengyihao 已提交
54
  return false;
dengyihao's avatar
dengyihao 已提交
55 56 57 58 59 60 61
  // SRpcHead* pHead = rpcHeadFromCont(pCont);
  bool succ = false;
  int  overhead = sizeof(STransCompMsg);
  if (!NEEDTO_COMPRESSS_MSG(len)) {
    return succ;
  }

wafwerar's avatar
wafwerar 已提交
62
  char* buf = taosMemoryMalloc(len + overhead + 8);  // 8 extra bytes
dengyihao's avatar
dengyihao 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
  if (buf == NULL) {
    tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
    *flen = len;
    return succ;
  }

  int32_t clen = LZ4_compress_default(msg, buf, len, len + overhead);
  tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, overhead);
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
  if (clen > 0 && clen < len - overhead) {
    STransCompMsg* pComp = (STransCompMsg*)msg;
    pComp->reserved = 0;
    pComp->contLen = htonl(len);
    memcpy(msg + overhead, buf, clen);

    tDebug("compress rpc msg, before:%d, after:%d", len, clen);
    *flen = clen + overhead;
    succ = true;
  } else {
    *flen = len;
    succ = false;
  }
wafwerar's avatar
wafwerar 已提交
88
  taosMemoryFree(buf);
dengyihao's avatar
dengyihao 已提交
89 90 91 92 93 94 95 96 97 98 99 100
  return succ;
}
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) {
  // impl later
  return false;
  STransCompMsg* pComp = (STransCompMsg*)msg;

  int overhead = sizeof(STransCompMsg);
  int clen = 0;
  return false;
}

dengyihao's avatar
dengyihao 已提交
101 102 103 104
void transFreeMsg(void* msg) {
  if (msg == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
105
  taosMemoryFree((char*)msg - sizeof(STransMsgHead));
dengyihao's avatar
dengyihao 已提交
106
}
dengyihao's avatar
dengyihao 已提交
107 108
int transGetSockDebugInfo(struct sockaddr* sockname, char* dst) {
  struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
dengyihao's avatar
dengyihao 已提交
109

dengyihao's avatar
dengyihao 已提交
110 111 112 113 114
  char buf[20] = {0};
  int  r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
  sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
  return r;
}
dengyihao's avatar
dengyihao 已提交
115
int transInitBuffer(SConnBuffer* buf) {
dengyihao's avatar
opt rpc  
dengyihao 已提交
116 117 118 119 120 121 122 123 124
  buf->cap = BUFFER_CAP;
  buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
  buf->left = -1;
  buf->len = 0;
  buf->total = 0;
  return 0;
}
int transDestroyBuffer(SConnBuffer* buf) {
  taosMemoryFree(buf->buf);
dengyihao's avatar
dengyihao 已提交
125 126
  return 0;
}
dengyihao's avatar
opt rpc  
dengyihao 已提交
127

dengyihao's avatar
dengyihao 已提交
128
int transClearBuffer(SConnBuffer* buf) {
dengyihao's avatar
opt rpc  
dengyihao 已提交
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
  SConnBuffer* p = buf;
  if (p->cap > BUFFER_CAP) {
    p->cap = BUFFER_CAP;
    p->buf = taosMemoryRealloc(p->buf, BUFFER_CAP);
  }
  p->left = -1;
  p->len = 0;
  p->total = 0;
  return 0;
}

int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
  SConnBuffer* p = connBuf;
  if (p->left != 0) {
    return -1;
  }
  int total = connBuf->total;
  *buf = taosMemoryCalloc(1, total);
  memcpy(*buf, p->buf, total);

  transResetBuffer(connBuf);
  return total;
}

int transResetBuffer(SConnBuffer* connBuf) {
  SConnBuffer* p = connBuf;
  if (p->total <= p->len) {
    int left = p->len - p->total;
    memmove(p->buf, p->buf + p->total, left);
    p->left = -1;
    p->total = 0;
    p->len = left;
  } else {
    p->left = -1;
    p->total = 0;
    p->len = 0;
  }
dengyihao's avatar
dengyihao 已提交
166 167 168 169 170 171
  return 0;
}
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
  /*
   * formate of data buffer:
   * |<--------------------------data from socket------------------------------->|
dengyihao's avatar
dengyihao 已提交
172 173
   * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user
   * info--->|
dengyihao's avatar
dengyihao 已提交
174 175
   */
  SConnBuffer* p = connBuf;
dengyihao's avatar
dengyihao 已提交
176

dengyihao's avatar
opt rpc  
dengyihao 已提交
177 178
  uvBuf->base = p->buf + p->len;
  if (p->left == -1) {
dengyihao's avatar
dengyihao 已提交
179
    uvBuf->len = p->cap - p->len;
dengyihao's avatar
opt rpc  
dengyihao 已提交
180 181 182 183 184 185 186 187
  } else {
    if (p->left < p->cap - p->len) {
      uvBuf->len = p->left;
    } else {
      p->buf = taosMemoryRealloc(p->buf, p->left + p->len);
      uvBuf->base = p->buf + p->len;
      uvBuf->len = p->left;
    }
dengyihao's avatar
dengyihao 已提交
188 189 190
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
191 192
// check whether already read complete
bool transReadComplete(SConnBuffer* connBuf) {
dengyihao's avatar
opt rpc  
dengyihao 已提交
193 194 195 196 197 198 199 200 201 202 203 204 205
  SConnBuffer* p = connBuf;
  if (p->len >= sizeof(STransMsgHead)) {
    if (p->left == -1) {
      STransMsgHead head;
      memcpy((char*)&head, connBuf->buf, sizeof(head));
      int32_t msgLen = (int32_t)htonl(head.msgLen);
      p->total = msgLen;
    }
    if (p->total >= p->len) {
      p->left = p->total - p->len;
    } else {
      p->left = 0;
    }
dengyihao's avatar
dengyihao 已提交
206
  }
dengyihao's avatar
opt rpc  
dengyihao 已提交
207
  return p->left == 0 ? true : false;
dengyihao's avatar
dengyihao 已提交
208
}
dengyihao's avatar
dengyihao 已提交
209

dengyihao's avatar
dengyihao 已提交
210 211 212 213 214 215
int transSetConnOption(uv_tcp_t* stream) {
  uv_tcp_nodelay(stream, 1);
  int ret = uv_tcp_keepalive(stream, 5, 5);
  return ret;
}

dengyihao's avatar
dengyihao 已提交
216
SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
wafwerar's avatar
wafwerar 已提交
217
  SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
dengyihao's avatar
dengyihao 已提交
218
  pool->nAsync = sz;
wafwerar's avatar
wafwerar 已提交
219
  pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
dengyihao's avatar
dengyihao 已提交
220 221 222 223

  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
    uv_async_init(loop, async, cb);
dengyihao's avatar
dengyihao 已提交
224

wafwerar's avatar
wafwerar 已提交
225
    SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
dengyihao's avatar
dengyihao 已提交
226 227
    item->pThrd = arg;
    QUEUE_INIT(&item->qmsg);
wafwerar's avatar
wafwerar 已提交
228
    taosThreadMutexInit(&item->mtx, NULL);
dengyihao's avatar
dengyihao 已提交
229 230

    async->data = item;
dengyihao's avatar
dengyihao 已提交
231 232 233
  }
  return pool;
}
dengyihao's avatar
dengyihao 已提交
234

dengyihao's avatar
dengyihao 已提交
235
void transAsyncPoolDestroy(SAsyncPool* pool) {
dengyihao's avatar
dengyihao 已提交
236 237
  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
dengyihao's avatar
dengyihao 已提交
238
    // uv_close((uv_handle_t*)async, NULL);
dengyihao's avatar
dengyihao 已提交
239
    SAsyncItem* item = async->data;
wafwerar's avatar
wafwerar 已提交
240
    taosThreadMutexDestroy(&item->mtx);
wafwerar's avatar
wafwerar 已提交
241
    taosMemoryFree(item);
dengyihao's avatar
dengyihao 已提交
242
  }
wafwerar's avatar
wafwerar 已提交
243 244
  taosMemoryFree(pool->asyncs);
  taosMemoryFree(pool);
dengyihao's avatar
dengyihao 已提交
245
}
dengyihao's avatar
dengyihao 已提交
246 247 248 249 250 251 252 253
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
    SAsyncItem* item = async->data;
    if (!QUEUE_IS_EMPTY(&item->qmsg)) return false;
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
254
int transAsyncSend(SAsyncPool* pool, queue* q) {
dengyihao's avatar
dengyihao 已提交
255 256 257
  if (atomic_load_8(&pool->stop) == 1) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
258 259 260 261 262 263
  int idx = pool->index;
  idx = idx % pool->nAsync;
  // no need mutex here
  if (pool->index++ > pool->nAsync) {
    pool->index = 0;
  }
dengyihao's avatar
dengyihao 已提交
264 265 266 267
  uv_async_t* async = &(pool->asyncs[idx]);
  SAsyncItem* item = async->data;

  int64_t st = taosGetTimestampUs();
wafwerar's avatar
wafwerar 已提交
268
  taosThreadMutexLock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
269
  QUEUE_PUSH(&item->qmsg, q);
wafwerar's avatar
wafwerar 已提交
270
  taosThreadMutexUnlock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
271 272
  int64_t el = taosGetTimestampUs() - st;
  if (el > 50) {
S
Shengliang Guan 已提交
273
    // tInfo("lock and unlock cost:%d", (int)el);
dengyihao's avatar
dengyihao 已提交
274 275
  }
  return uv_async_send(async);
dengyihao's avatar
dengyihao 已提交
276
}
dengyihao's avatar
dengyihao 已提交
277

dengyihao's avatar
dengyihao 已提交
278 279 280 281
void transCtxInit(STransCtx* ctx) {
  // init transCtx
  ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
}
dengyihao's avatar
dengyihao 已提交
282
void transCtxCleanup(STransCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
283 284 285 286 287 288
  if (ctx->args == NULL) {
    return;
  }

  STransCtxVal* iter = taosHashIterate(ctx->args, NULL);
  while (iter) {
dengyihao's avatar
dengyihao 已提交
289
    ctx->freeFunc(iter->val);
dengyihao's avatar
dengyihao 已提交
290 291
    iter = taosHashIterate(ctx->args, iter);
  }
dengyihao's avatar
dengyihao 已提交
292
  ctx->freeFunc(ctx->brokenVal.val);
dengyihao's avatar
dengyihao 已提交
293
  taosHashCleanup(ctx->args);
U
ubuntu 已提交
294
  ctx->args = NULL;
dengyihao's avatar
dengyihao 已提交
295 296 297 298 299
}

void transCtxMerge(STransCtx* dst, STransCtx* src) {
  if (dst->args == NULL) {
    dst->args = src->args;
dengyihao's avatar
dengyihao 已提交
300
    dst->brokenVal = src->brokenVal;
dengyihao's avatar
dengyihao 已提交
301
    dst->freeFunc = src->freeFunc;
dengyihao's avatar
dengyihao 已提交
302 303 304 305 306 307 308 309 310 311 312 313
    src->args = NULL;
    return;
  }
  void*  key = NULL;
  size_t klen = 0;
  void*  iter = taosHashIterate(src->args, NULL);
  while (iter) {
    STransCtxVal* sVal = (STransCtxVal*)iter;
    key = taosHashGetKey(sVal, &klen);

    STransCtxVal* dVal = taosHashGet(dst->args, key, klen);
    if (dVal) {
dengyihao's avatar
dengyihao 已提交
314
      dst->freeFunc(dVal->val);
dengyihao's avatar
dengyihao 已提交
315 316 317 318 319 320 321 322 323 324 325 326 327 328
    }
    taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
    iter = taosHashIterate(src->args, iter);
  }
  taosHashCleanup(src->args);
}
void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
  if (ctx->args == NULL) {
    return NULL;
  }
  STransCtxVal* cVal = taosHashGet(ctx->args, (const void*)&key, sizeof(key));
  if (cVal == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
329
  void* ret = NULL;
D
dapan1121 已提交
330 331
  (*cVal->clone)(cVal->val, &ret);
  return ret;
dengyihao's avatar
dengyihao 已提交
332
}
dengyihao's avatar
dengyihao 已提交
333
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
dengyihao's avatar
dengyihao 已提交
334
  void* ret = NULL;
dengyihao's avatar
dengyihao 已提交
335
  if (ctx->brokenVal.clone == NULL) {
dengyihao's avatar
dengyihao 已提交
336
    return ret;
dengyihao's avatar
dengyihao 已提交
337
  }
D
dapan1121 已提交
338
  (*ctx->brokenVal.clone)(ctx->brokenVal.val, &ret);
dengyihao's avatar
dengyihao 已提交
339 340 341

  *msgType = ctx->brokenVal.msgType;

D
dapan1121 已提交
342
  return ret;
dengyihao's avatar
dengyihao 已提交
343
}
dengyihao's avatar
dengyihao 已提交
344

dengyihao's avatar
dengyihao 已提交
345
void transReqQueueInit(queue* q) {
dengyihao's avatar
dengyihao 已提交
346
  // init req queue
dengyihao's avatar
dengyihao 已提交
347 348
  QUEUE_INIT(q);
}
dengyihao's avatar
dengyihao 已提交
349
void* transReqQueuePush(queue* q) {
dengyihao's avatar
dengyihao 已提交
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386
  uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
  STransReq*  wreq = taosMemoryCalloc(1, sizeof(STransReq));
  wreq->data = req;
  req->data = wreq;
  QUEUE_PUSH(q, &wreq->q);
  return req;
}
void* transReqQueueRemove(void* arg) {
  void*       ret = NULL;
  uv_write_t* req = arg;
  STransReq*  wreq = req && req->data ? req->data : NULL;

  assert(wreq->data == req);
  if (wreq == NULL || wreq->data == NULL) {
    taosMemoryFree(wreq->data);
    taosMemoryFree(wreq);
    return req;
  }

  QUEUE_REMOVE(&wreq->q);

  ret = req && req->handle ? req->handle->data : NULL;
  taosMemoryFree(wreq->data);
  taosMemoryFree(wreq);

  return ret;
}
void transReqQueueClear(queue* q) {
  while (!QUEUE_IS_EMPTY(q)) {
    queue* h = QUEUE_HEAD(q);
    QUEUE_REMOVE(h);
    STransReq* req = QUEUE_DATA(h, STransReq, q);
    taosMemoryFree(req->data);
    taosMemoryFree(req);
  }
}

wafwerar's avatar
wafwerar 已提交
387
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
dengyihao's avatar
dengyihao 已提交
388
  queue->q = taosArrayInit(2, sizeof(void*));
wafwerar's avatar
wafwerar 已提交
389
  queue->freeFunc = (void (*)(const void*))freeFunc;
dengyihao's avatar
dengyihao 已提交
390 391
}
bool transQueuePush(STransQueue* queue, void* arg) {
dengyihao's avatar
dengyihao 已提交
392 393 394
  if (queue->q == NULL) {
    return true;
  }
dengyihao's avatar
dengyihao 已提交
395 396 397 398 399 400 401
  taosArrayPush(queue->q, &arg);
  if (taosArrayGetSize(queue->q) > 1) {
    return false;
  }
  return true;
}
void* transQueuePop(STransQueue* queue) {
dengyihao's avatar
dengyihao 已提交
402
  if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
dengyihao's avatar
dengyihao 已提交
403 404 405 406 407 408
    return NULL;
  }
  void* ptr = taosArrayGetP(queue->q, 0);
  taosArrayRemove(queue->q, 0);
  return ptr;
}
dengyihao's avatar
dengyihao 已提交
409
int32_t transQueueSize(STransQueue* queue) {
dengyihao's avatar
dengyihao 已提交
410 411 412
  if (queue->q == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
413 414 415
  return taosArrayGetSize(queue->q);
}
void* transQueueGet(STransQueue* queue, int i) {
dengyihao's avatar
dengyihao 已提交
416
  if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
dengyihao's avatar
dengyihao 已提交
417 418 419 420 421
    return NULL;
  }
  if (i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
422

dengyihao's avatar
dengyihao 已提交
423 424 425 426 427
  void* ptr = taosArrayGetP(queue->q, i);
  return ptr;
}

void* transQueueRm(STransQueue* queue, int i) {
dengyihao's avatar
dengyihao 已提交
428
  if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
dengyihao's avatar
dengyihao 已提交
429 430
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
431 432 433 434 435
  if (i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }
  void* ptr = taosArrayGetP(queue->q, i);
  taosArrayRemove(queue->q, i);
dengyihao's avatar
dengyihao 已提交
436 437
  return ptr;
}
dengyihao's avatar
dengyihao 已提交
438

dengyihao's avatar
dengyihao 已提交
439
bool transQueueEmpty(STransQueue* queue) {
dengyihao's avatar
dengyihao 已提交
440 441 442
  if (queue->q == NULL) {
    return true;
  }
dengyihao's avatar
dengyihao 已提交
443 444 445
  return taosArrayGetSize(queue->q) == 0;
}
void transQueueClear(STransQueue* queue) {
wafwerar's avatar
wafwerar 已提交
446
  if (queue->freeFunc != NULL) {
dengyihao's avatar
dengyihao 已提交
447 448
    for (int i = 0; i < taosArrayGetSize(queue->q); i++) {
      void* p = taosArrayGetP(queue->q, i);
wafwerar's avatar
wafwerar 已提交
449
      queue->freeFunc(p);
dengyihao's avatar
dengyihao 已提交
450 451 452 453 454 455 456 457
    }
  }
  taosArrayClear(queue->q);
}
void transQueueDestroy(STransQueue* queue) {
  transQueueClear(queue);
  taosArrayDestroy(queue->q);
}
dengyihao's avatar
dengyihao 已提交
458 459 460 461 462

static int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
  SDelayTask* arg1 = container_of(a, SDelayTask, node);
  SDelayTask* arg2 = container_of(b, SDelayTask, node);
  if (arg1->execTime > arg2->execTime) {
dengyihao's avatar
dengyihao 已提交
463
    return 0;
dengyihao's avatar
dengyihao 已提交
464 465 466 467 468
  } else {
    return 1;
  }
}

dengyihao's avatar
dengyihao 已提交
469
static void transDQTimeout(uv_timer_t* timer) {
dengyihao's avatar
dengyihao 已提交
470
  SDelayQueue* queue = timer->data;
dengyihao's avatar
dengyihao 已提交
471 472
  tTrace("timer %p timeout", timer);
  uint64_t timeout = 0;
dengyihao's avatar
dengyihao 已提交
473
  int64_t  current = taosGetTimestampMs();
dengyihao's avatar
dengyihao 已提交
474 475 476 477
  do {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) break;
    SDelayTask* task = container_of(minNode, SDelayTask, node);
dengyihao's avatar
dengyihao 已提交
478
    if (task->execTime <= current) {
dengyihao's avatar
dengyihao 已提交
479 480 481 482 483
      heapRemove(queue->heap, minNode);
      task->func(task->arg);
      taosMemoryFree(task);
      timeout = 0;
    } else {
dengyihao's avatar
dengyihao 已提交
484
      timeout = task->execTime - current;
dengyihao's avatar
dengyihao 已提交
485 486 487 488
      break;
    }
  } while (1);
  if (timeout != 0) {
dengyihao's avatar
dengyihao 已提交
489
    uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
dengyihao's avatar
dengyihao 已提交
490 491
  }
}
dengyihao's avatar
dengyihao 已提交
492
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
dengyihao's avatar
dengyihao 已提交
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507
  uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
  uv_timer_init(loop, timer);

  Heap* heap = heapCreate(timeCompare);

  SDelayQueue* q = taosMemoryCalloc(1, sizeof(SDelayQueue));
  q->heap = heap;
  q->timer = timer;
  q->loop = loop;
  q->timer->data = q;

  *queue = q;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
508
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
dengyihao's avatar
dengyihao 已提交
509
  taosMemoryFree(queue->timer);
dengyihao's avatar
dengyihao 已提交
510 511 512 513 514 515 516 517 518

  while (heapSize(queue->heap) > 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) {
      return;
    }
    heapRemove(queue->heap, minNode);

    SDelayTask* task = container_of(minNode, SDelayTask, node);
dengyihao's avatar
dengyihao 已提交
519 520

    STaskArg* arg = task->arg;
dengyihao's avatar
dengyihao 已提交
521
    if (freeFunc) freeFunc(arg->param1);
dengyihao's avatar
dengyihao 已提交
522 523
    taosMemoryFree(arg);

dengyihao's avatar
dengyihao 已提交
524 525
    taosMemoryFree(task);
  }
dengyihao's avatar
dengyihao 已提交
526 527 528
  heapDestroy(queue->heap);
  taosMemoryFree(queue);
}
dengyihao's avatar
dengyihao 已提交
529 530
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
  uv_timer_stop(queue->timer);
dengyihao's avatar
dengyihao 已提交
531

dengyihao's avatar
dengyihao 已提交
532 533 534 535 536
  if (heapSize(queue->heap) <= 0) {
    taosMemoryFree(task->arg);
    taosMemoryFree(task);
    return;
  }
dengyihao's avatar
dengyihao 已提交
537 538
  heapRemove(queue->heap, &task->node);

dengyihao's avatar
dengyihao 已提交
539 540 541
  taosMemoryFree(task->arg);
  taosMemoryFree(task);

dengyihao's avatar
dengyihao 已提交
542 543 544 545 546 547 548 549 550 551 552 553 554
  if (heapSize(queue->heap) != 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode != NULL) return;

    uint64_t    now = taosGetTimestampMs();
    SDelayTask* task = container_of(minNode, SDelayTask, node);
    uint64_t    timeout = now > task->execTime ? now - task->execTime : 0;

    uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
  }
}

SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
dengyihao's avatar
dengyihao 已提交
555
  uint64_t    now = taosGetTimestampMs();
dengyihao's avatar
dengyihao 已提交
556 557 558
  SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
  task->func = func;
  task->arg = arg;
dengyihao's avatar
dengyihao 已提交
559 560 561 562 563 564
  task->execTime = now + timeoutMs;

  HeapNode* minNode = heapMin(queue->heap);
  if (minNode) {
    SDelayTask* minTask = container_of(minNode, SDelayTask, node);
    if (minTask->execTime < task->execTime) {
dengyihao's avatar
dengyihao 已提交
565
      timeoutMs = minTask->execTime <= now ? 0 : minTask->execTime - now;
dengyihao's avatar
dengyihao 已提交
566 567
    }
  }
dengyihao's avatar
dengyihao 已提交
568

S
Shengliang Guan 已提交
569
  tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs);
dengyihao's avatar
dengyihao 已提交
570
  heapInsert(queue->heap, &task->node);
dengyihao's avatar
dengyihao 已提交
571
  uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
dengyihao's avatar
dengyihao 已提交
572
  return task;
dengyihao's avatar
dengyihao 已提交
573
}
dengyihao's avatar
dengyihao 已提交
574 575 576 577 578 579

void transPrintEpSet(SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    tTrace("NULL epset");
    return;
  }
dengyihao's avatar
dengyihao 已提交
580
  char buf[512] = {0};
dengyihao's avatar
dengyihao 已提交
581
  int  len = snprintf(buf, sizeof(buf), "epset:{");
dengyihao's avatar
dengyihao 已提交
582
  for (int i = 0; i < pEpSet->numOfEps; i++) {
dengyihao's avatar
dengyihao 已提交
583
    if (i == pEpSet->numOfEps - 1) {
dengyihao's avatar
dengyihao 已提交
584
      len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
dengyihao's avatar
dengyihao 已提交
585 586 587
    } else {
      len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d, ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
    }
dengyihao's avatar
dengyihao 已提交
588
  }
dengyihao's avatar
dengyihao 已提交
589
  len += snprintf(buf + len, sizeof(buf) - len, "}");
dengyihao's avatar
dengyihao 已提交
590
  tTrace("%s, inUse:%d", buf, pEpSet->inUse);
dengyihao's avatar
dengyihao 已提交
591
}
592 593 594 595 596 597 598 599 600 601 602
bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
  if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) {
    return false;
  }
  for (int i = 0; i < a->numOfEps; i++) {
    if (strncmp(a->eps[i].fqdn, b->eps[i].fqdn, TSDB_FQDN_LEN) != 0 || a->eps[i].port != b->eps[i].port) {
      return false;
    }
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
603

dengyihao's avatar
dengyihao 已提交
604
static void transInitEnv() {
dengyihao's avatar
dengyihao 已提交
605 606
  refMgt = transOpenRefMgt(50000, transDestoryExHandle);
  instMgt = taosOpenRef(50, rpcCloseImpl);
dengyihao's avatar
dengyihao 已提交
607 608
  uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
}
dengyihao's avatar
dengyihao 已提交
609
static void transDestroyEnv() {
dengyihao's avatar
dengyihao 已提交
610 611
  transCloseRefMgt(refMgt);
  transCloseRefMgt(instMgt);
dengyihao's avatar
dengyihao 已提交
612
}
dengyihao's avatar
dengyihao 已提交
613

dengyihao's avatar
dengyihao 已提交
614 615 616 617
void transInit() {
  // init env
  taosThreadOnce(&transModuleInit, transInitEnv);
}
dengyihao's avatar
dengyihao 已提交
618 619 620 621

int32_t transGetRefMgt() { return refMgt; }
int32_t transGetInstMgt() { return instMgt; }

dengyihao's avatar
dengyihao 已提交
622 623 624 625
void transCleanup() {
  // clean env
  transDestroyEnv();
}
dengyihao's avatar
dengyihao 已提交
626
int32_t transOpenRefMgt(int size, void (*func)(void*)) {
dengyihao's avatar
dengyihao 已提交
627
  // added into once later
dengyihao's avatar
dengyihao 已提交
628
  return taosOpenRef(size, func);
dengyihao's avatar
dengyihao 已提交
629
}
dengyihao's avatar
dengyihao 已提交
630
void transCloseRefMgt(int32_t mgt) {
dengyihao's avatar
dengyihao 已提交
631
  // close ref
dengyihao's avatar
dengyihao 已提交
632
  taosCloseRef(mgt);
dengyihao's avatar
dengyihao 已提交
633
}
dengyihao's avatar
dengyihao 已提交
634
int64_t transAddExHandle(int32_t refMgt, void* p) {
dengyihao's avatar
dengyihao 已提交
635
  // acquire extern handle
dengyihao's avatar
dengyihao 已提交
636
  return taosAddRef(refMgt, p);
dengyihao's avatar
dengyihao 已提交
637
}
dengyihao's avatar
dengyihao 已提交
638
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) {
dengyihao's avatar
dengyihao 已提交
639
  // acquire extern handle
dengyihao's avatar
dengyihao 已提交
640
  return taosRemoveRef(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
641 642
}

dengyihao's avatar
dengyihao 已提交
643
void* transAcquireExHandle(int32_t refMgt, int64_t refId) {
dengyihao's avatar
dengyihao 已提交
644
  // acquire extern handle
dengyihao's avatar
dengyihao 已提交
645
  return (void*)taosAcquireRef(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
646 647
}

dengyihao's avatar
dengyihao 已提交
648
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) {
dengyihao's avatar
dengyihao 已提交
649
  // release extern handle
dengyihao's avatar
dengyihao 已提交
650
  return taosReleaseRef(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
651 652 653 654 655 656 657
}
void transDestoryExHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  taosMemoryFree(handle);
}
dengyihao's avatar
dengyihao 已提交
658
#endif