transComm.c 17.2 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifdef USE_UV

#include "transComm.h"

dengyihao's avatar
opt rpc  
dengyihao 已提交
19 20
#define BUFFER_CAP 4096

dengyihao's avatar
dengyihao 已提交
21 22 23
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;

static int32_t refMgt;
dengyihao's avatar
dengyihao 已提交
24
static int32_t instMgt;
dengyihao's avatar
dengyihao 已提交
25

dengyihao's avatar
dengyihao 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38 39
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
  T_MD5_CTX context;
  int       ret = -1;

  tMD5Init(&context);
  tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
  tMD5Update(&context, (uint8_t*)pMsg, msgLen);
  tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
  tMD5Final(&context);

  if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;

  return ret;
}
dengyihao's avatar
dengyihao 已提交
40

dengyihao's avatar
dengyihao 已提交
41 42 43 44 45 46 47 48 49 50 51
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
  T_MD5_CTX context;

  tMD5Init(&context);
  tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
  tMD5Update(&context, (uint8_t*)pMsg, msgLen);
  tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
  tMD5Final(&context);

  memcpy(pAuth, context.digest, sizeof(context.digest));
}
dengyihao's avatar
dengyihao 已提交
52

dengyihao's avatar
dengyihao 已提交
53
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
dengyihao's avatar
dengyihao 已提交
54
  return false;
dengyihao's avatar
dengyihao 已提交
55 56 57 58 59 60 61
  // SRpcHead* pHead = rpcHeadFromCont(pCont);
  bool succ = false;
  int  overhead = sizeof(STransCompMsg);
  if (!NEEDTO_COMPRESSS_MSG(len)) {
    return succ;
  }

wafwerar's avatar
wafwerar 已提交
62
  char* buf = taosMemoryMalloc(len + overhead + 8);  // 8 extra bytes
dengyihao's avatar
dengyihao 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
  if (buf == NULL) {
    tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
    *flen = len;
    return succ;
  }

  int32_t clen = LZ4_compress_default(msg, buf, len, len + overhead);
  tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, overhead);
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
  if (clen > 0 && clen < len - overhead) {
    STransCompMsg* pComp = (STransCompMsg*)msg;
    pComp->reserved = 0;
    pComp->contLen = htonl(len);
    memcpy(msg + overhead, buf, clen);

    tDebug("compress rpc msg, before:%d, after:%d", len, clen);
    *flen = clen + overhead;
    succ = true;
  } else {
    *flen = len;
    succ = false;
  }
wafwerar's avatar
wafwerar 已提交
88
  taosMemoryFree(buf);
dengyihao's avatar
dengyihao 已提交
89 90 91 92 93 94 95 96 97 98 99 100
  return succ;
}
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) {
  // impl later
  return false;
  STransCompMsg* pComp = (STransCompMsg*)msg;

  int overhead = sizeof(STransCompMsg);
  int clen = 0;
  return false;
}

dengyihao's avatar
dengyihao 已提交
101 102 103 104
void transFreeMsg(void* msg) {
  if (msg == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
105
  taosMemoryFree((char*)msg - sizeof(STransMsgHead));
dengyihao's avatar
dengyihao 已提交
106
}
dengyihao's avatar
dengyihao 已提交
107 108
int transGetSockDebugInfo(struct sockaddr* sockname, char* dst) {
  struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
dengyihao's avatar
dengyihao 已提交
109

dengyihao's avatar
dengyihao 已提交
110 111 112 113 114
  char buf[20] = {0};
  int  r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
  sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
  return r;
}
dengyihao's avatar
dengyihao 已提交
115
int transInitBuffer(SConnBuffer* buf) {
dengyihao's avatar
opt rpc  
dengyihao 已提交
116 117 118 119 120 121 122 123 124
  buf->cap = BUFFER_CAP;
  buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
  buf->left = -1;
  buf->len = 0;
  buf->total = 0;
  return 0;
}
int transDestroyBuffer(SConnBuffer* buf) {
  taosMemoryFree(buf->buf);
dengyihao's avatar
dengyihao 已提交
125 126
  return 0;
}
dengyihao's avatar
opt rpc  
dengyihao 已提交
127

dengyihao's avatar
dengyihao 已提交
128
int transClearBuffer(SConnBuffer* buf) {
dengyihao's avatar
opt rpc  
dengyihao 已提交
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
  SConnBuffer* p = buf;
  if (p->cap > BUFFER_CAP) {
    p->cap = BUFFER_CAP;
    p->buf = taosMemoryRealloc(p->buf, BUFFER_CAP);
  }
  p->left = -1;
  p->len = 0;
  p->total = 0;
  return 0;
}

int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
  SConnBuffer* p = connBuf;
  if (p->left != 0) {
    return -1;
  }
  int total = connBuf->total;
  *buf = taosMemoryCalloc(1, total);
  memcpy(*buf, p->buf, total);

  transResetBuffer(connBuf);
  return total;
}

int transResetBuffer(SConnBuffer* connBuf) {
  SConnBuffer* p = connBuf;
dengyihao's avatar
dengyihao 已提交
155
  if (p->total < p->len) {
dengyihao's avatar
opt rpc  
dengyihao 已提交
156 157 158 159 160
    int left = p->len - p->total;
    memmove(p->buf, p->buf + p->total, left);
    p->left = -1;
    p->total = 0;
    p->len = left;
dengyihao's avatar
dengyihao 已提交
161
  } else if (p->total == p->len) {
dengyihao's avatar
opt rpc  
dengyihao 已提交
162 163 164
    p->left = -1;
    p->total = 0;
    p->len = 0;
dengyihao's avatar
dengyihao 已提交
165 166
  } else {
    assert(0);
dengyihao's avatar
opt rpc  
dengyihao 已提交
167
  }
dengyihao's avatar
dengyihao 已提交
168 169 170 171 172 173
  return 0;
}
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
  /*
   * formate of data buffer:
   * |<--------------------------data from socket------------------------------->|
dengyihao's avatar
dengyihao 已提交
174 175
   * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user
   * info--->|
dengyihao's avatar
dengyihao 已提交
176 177
   */
  SConnBuffer* p = connBuf;
dengyihao's avatar
dengyihao 已提交
178

dengyihao's avatar
opt rpc  
dengyihao 已提交
179 180
  uvBuf->base = p->buf + p->len;
  if (p->left == -1) {
dengyihao's avatar
dengyihao 已提交
181
    uvBuf->len = p->cap - p->len;
dengyihao's avatar
opt rpc  
dengyihao 已提交
182 183 184 185 186 187 188 189
  } else {
    if (p->left < p->cap - p->len) {
      uvBuf->len = p->left;
    } else {
      p->buf = taosMemoryRealloc(p->buf, p->left + p->len);
      uvBuf->base = p->buf + p->len;
      uvBuf->len = p->left;
    }
dengyihao's avatar
dengyihao 已提交
190 191 192
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
193 194
// check whether already read complete
bool transReadComplete(SConnBuffer* connBuf) {
dengyihao's avatar
opt rpc  
dengyihao 已提交
195 196 197 198 199 200 201 202 203 204 205 206 207
  SConnBuffer* p = connBuf;
  if (p->len >= sizeof(STransMsgHead)) {
    if (p->left == -1) {
      STransMsgHead head;
      memcpy((char*)&head, connBuf->buf, sizeof(head));
      int32_t msgLen = (int32_t)htonl(head.msgLen);
      p->total = msgLen;
    }
    if (p->total >= p->len) {
      p->left = p->total - p->len;
    } else {
      p->left = 0;
    }
dengyihao's avatar
dengyihao 已提交
208
  }
dengyihao's avatar
opt rpc  
dengyihao 已提交
209
  return p->left == 0 ? true : false;
dengyihao's avatar
dengyihao 已提交
210
}
dengyihao's avatar
dengyihao 已提交
211

dengyihao's avatar
dengyihao 已提交
212 213 214 215 216 217
int transSetConnOption(uv_tcp_t* stream) {
  uv_tcp_nodelay(stream, 1);
  int ret = uv_tcp_keepalive(stream, 5, 5);
  return ret;
}

dengyihao's avatar
dengyihao 已提交
218
SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
wafwerar's avatar
wafwerar 已提交
219
  SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
dengyihao's avatar
dengyihao 已提交
220
  pool->nAsync = sz;
wafwerar's avatar
wafwerar 已提交
221
  pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
dengyihao's avatar
dengyihao 已提交
222 223 224 225

  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
    uv_async_init(loop, async, cb);
dengyihao's avatar
dengyihao 已提交
226

wafwerar's avatar
wafwerar 已提交
227
    SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
dengyihao's avatar
dengyihao 已提交
228 229
    item->pThrd = arg;
    QUEUE_INIT(&item->qmsg);
wafwerar's avatar
wafwerar 已提交
230
    taosThreadMutexInit(&item->mtx, NULL);
dengyihao's avatar
dengyihao 已提交
231 232

    async->data = item;
dengyihao's avatar
dengyihao 已提交
233 234 235
  }
  return pool;
}
dengyihao's avatar
dengyihao 已提交
236

dengyihao's avatar
dengyihao 已提交
237
void transAsyncPoolDestroy(SAsyncPool* pool) {
dengyihao's avatar
dengyihao 已提交
238 239
  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
dengyihao's avatar
dengyihao 已提交
240
    // uv_close((uv_handle_t*)async, NULL);
dengyihao's avatar
dengyihao 已提交
241
    SAsyncItem* item = async->data;
wafwerar's avatar
wafwerar 已提交
242
    taosThreadMutexDestroy(&item->mtx);
wafwerar's avatar
wafwerar 已提交
243
    taosMemoryFree(item);
dengyihao's avatar
dengyihao 已提交
244
  }
wafwerar's avatar
wafwerar 已提交
245 246
  taosMemoryFree(pool->asyncs);
  taosMemoryFree(pool);
dengyihao's avatar
dengyihao 已提交
247
}
dengyihao's avatar
dengyihao 已提交
248 249 250 251 252 253 254 255
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
    SAsyncItem* item = async->data;
    if (!QUEUE_IS_EMPTY(&item->qmsg)) return false;
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
256
int transAsyncSend(SAsyncPool* pool, queue* q) {
dengyihao's avatar
dengyihao 已提交
257 258 259
  if (atomic_load_8(&pool->stop) == 1) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
260 261 262 263 264 265
  int idx = pool->index;
  idx = idx % pool->nAsync;
  // no need mutex here
  if (pool->index++ > pool->nAsync) {
    pool->index = 0;
  }
dengyihao's avatar
dengyihao 已提交
266 267 268 269
  uv_async_t* async = &(pool->asyncs[idx]);
  SAsyncItem* item = async->data;

  int64_t st = taosGetTimestampUs();
wafwerar's avatar
wafwerar 已提交
270
  taosThreadMutexLock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
271
  QUEUE_PUSH(&item->qmsg, q);
wafwerar's avatar
wafwerar 已提交
272
  taosThreadMutexUnlock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
273 274
  int64_t el = taosGetTimestampUs() - st;
  if (el > 50) {
S
Shengliang Guan 已提交
275
    // tInfo("lock and unlock cost:%d", (int)el);
dengyihao's avatar
dengyihao 已提交
276 277
  }
  return uv_async_send(async);
dengyihao's avatar
dengyihao 已提交
278
}
dengyihao's avatar
dengyihao 已提交
279

dengyihao's avatar
dengyihao 已提交
280 281 282 283
void transCtxInit(STransCtx* ctx) {
  // init transCtx
  ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
}
dengyihao's avatar
dengyihao 已提交
284
void transCtxCleanup(STransCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
285 286 287 288 289 290
  if (ctx->args == NULL) {
    return;
  }

  STransCtxVal* iter = taosHashIterate(ctx->args, NULL);
  while (iter) {
dengyihao's avatar
dengyihao 已提交
291
    ctx->freeFunc(iter->val);
dengyihao's avatar
dengyihao 已提交
292 293
    iter = taosHashIterate(ctx->args, iter);
  }
dengyihao's avatar
dengyihao 已提交
294
  ctx->freeFunc(ctx->brokenVal.val);
dengyihao's avatar
dengyihao 已提交
295
  taosHashCleanup(ctx->args);
U
ubuntu 已提交
296
  ctx->args = NULL;
dengyihao's avatar
dengyihao 已提交
297 298 299 300 301
}

void transCtxMerge(STransCtx* dst, STransCtx* src) {
  if (dst->args == NULL) {
    dst->args = src->args;
dengyihao's avatar
dengyihao 已提交
302
    dst->brokenVal = src->brokenVal;
dengyihao's avatar
dengyihao 已提交
303
    dst->freeFunc = src->freeFunc;
dengyihao's avatar
dengyihao 已提交
304 305 306 307 308 309 310 311 312 313 314 315
    src->args = NULL;
    return;
  }
  void*  key = NULL;
  size_t klen = 0;
  void*  iter = taosHashIterate(src->args, NULL);
  while (iter) {
    STransCtxVal* sVal = (STransCtxVal*)iter;
    key = taosHashGetKey(sVal, &klen);

    STransCtxVal* dVal = taosHashGet(dst->args, key, klen);
    if (dVal) {
dengyihao's avatar
dengyihao 已提交
316
      dst->freeFunc(dVal->val);
dengyihao's avatar
dengyihao 已提交
317 318 319 320 321 322 323 324 325 326 327 328 329 330
    }
    taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
    iter = taosHashIterate(src->args, iter);
  }
  taosHashCleanup(src->args);
}
void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
  if (ctx->args == NULL) {
    return NULL;
  }
  STransCtxVal* cVal = taosHashGet(ctx->args, (const void*)&key, sizeof(key));
  if (cVal == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
331
  void* ret = NULL;
D
dapan1121 已提交
332 333
  (*cVal->clone)(cVal->val, &ret);
  return ret;
dengyihao's avatar
dengyihao 已提交
334
}
dengyihao's avatar
dengyihao 已提交
335
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
dengyihao's avatar
dengyihao 已提交
336
  void* ret = NULL;
dengyihao's avatar
dengyihao 已提交
337
  if (ctx->brokenVal.clone == NULL) {
dengyihao's avatar
dengyihao 已提交
338
    return ret;
dengyihao's avatar
dengyihao 已提交
339
  }
D
dapan1121 已提交
340
  (*ctx->brokenVal.clone)(ctx->brokenVal.val, &ret);
dengyihao's avatar
dengyihao 已提交
341 342 343

  *msgType = ctx->brokenVal.msgType;

D
dapan1121 已提交
344
  return ret;
dengyihao's avatar
dengyihao 已提交
345
}
dengyihao's avatar
dengyihao 已提交
346

dengyihao's avatar
dengyihao 已提交
347
void transReqQueueInit(queue* q) {
dengyihao's avatar
dengyihao 已提交
348
  // init req queue
dengyihao's avatar
dengyihao 已提交
349 350
  QUEUE_INIT(q);
}
dengyihao's avatar
dengyihao 已提交
351
void* transReqQueuePush(queue* q) {
dengyihao's avatar
dengyihao 已提交
352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388
  uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
  STransReq*  wreq = taosMemoryCalloc(1, sizeof(STransReq));
  wreq->data = req;
  req->data = wreq;
  QUEUE_PUSH(q, &wreq->q);
  return req;
}
void* transReqQueueRemove(void* arg) {
  void*       ret = NULL;
  uv_write_t* req = arg;
  STransReq*  wreq = req && req->data ? req->data : NULL;

  assert(wreq->data == req);
  if (wreq == NULL || wreq->data == NULL) {
    taosMemoryFree(wreq->data);
    taosMemoryFree(wreq);
    return req;
  }

  QUEUE_REMOVE(&wreq->q);

  ret = req && req->handle ? req->handle->data : NULL;
  taosMemoryFree(wreq->data);
  taosMemoryFree(wreq);

  return ret;
}
void transReqQueueClear(queue* q) {
  while (!QUEUE_IS_EMPTY(q)) {
    queue* h = QUEUE_HEAD(q);
    QUEUE_REMOVE(h);
    STransReq* req = QUEUE_DATA(h, STransReq, q);
    taosMemoryFree(req->data);
    taosMemoryFree(req);
  }
}

wafwerar's avatar
wafwerar 已提交
389
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
dengyihao's avatar
dengyihao 已提交
390
  queue->q = taosArrayInit(2, sizeof(void*));
wafwerar's avatar
wafwerar 已提交
391
  queue->freeFunc = (void (*)(const void*))freeFunc;
dengyihao's avatar
dengyihao 已提交
392 393
}
bool transQueuePush(STransQueue* queue, void* arg) {
dengyihao's avatar
dengyihao 已提交
394 395 396
  if (queue->q == NULL) {
    return true;
  }
dengyihao's avatar
dengyihao 已提交
397 398 399 400 401 402 403
  taosArrayPush(queue->q, &arg);
  if (taosArrayGetSize(queue->q) > 1) {
    return false;
  }
  return true;
}
void* transQueuePop(STransQueue* queue) {
dengyihao's avatar
dengyihao 已提交
404
  if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
dengyihao's avatar
dengyihao 已提交
405 406 407 408 409 410
    return NULL;
  }
  void* ptr = taosArrayGetP(queue->q, 0);
  taosArrayRemove(queue->q, 0);
  return ptr;
}
dengyihao's avatar
dengyihao 已提交
411
int32_t transQueueSize(STransQueue* queue) {
dengyihao's avatar
dengyihao 已提交
412 413 414
  if (queue->q == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
415 416 417
  return taosArrayGetSize(queue->q);
}
void* transQueueGet(STransQueue* queue, int i) {
dengyihao's avatar
dengyihao 已提交
418
  if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
dengyihao's avatar
dengyihao 已提交
419 420 421 422 423
    return NULL;
  }
  if (i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
424

dengyihao's avatar
dengyihao 已提交
425 426 427 428 429
  void* ptr = taosArrayGetP(queue->q, i);
  return ptr;
}

void* transQueueRm(STransQueue* queue, int i) {
dengyihao's avatar
dengyihao 已提交
430
  if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
dengyihao's avatar
dengyihao 已提交
431 432
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
433 434 435 436 437
  if (i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }
  void* ptr = taosArrayGetP(queue->q, i);
  taosArrayRemove(queue->q, i);
dengyihao's avatar
dengyihao 已提交
438 439
  return ptr;
}
dengyihao's avatar
dengyihao 已提交
440

dengyihao's avatar
dengyihao 已提交
441
bool transQueueEmpty(STransQueue* queue) {
dengyihao's avatar
dengyihao 已提交
442 443 444
  if (queue->q == NULL) {
    return true;
  }
dengyihao's avatar
dengyihao 已提交
445 446 447
  return taosArrayGetSize(queue->q) == 0;
}
void transQueueClear(STransQueue* queue) {
wafwerar's avatar
wafwerar 已提交
448
  if (queue->freeFunc != NULL) {
dengyihao's avatar
dengyihao 已提交
449 450
    for (int i = 0; i < taosArrayGetSize(queue->q); i++) {
      void* p = taosArrayGetP(queue->q, i);
wafwerar's avatar
wafwerar 已提交
451
      queue->freeFunc(p);
dengyihao's avatar
dengyihao 已提交
452 453 454 455 456 457 458 459
    }
  }
  taosArrayClear(queue->q);
}
void transQueueDestroy(STransQueue* queue) {
  transQueueClear(queue);
  taosArrayDestroy(queue->q);
}
dengyihao's avatar
dengyihao 已提交
460 461 462 463 464

static int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
  SDelayTask* arg1 = container_of(a, SDelayTask, node);
  SDelayTask* arg2 = container_of(b, SDelayTask, node);
  if (arg1->execTime > arg2->execTime) {
dengyihao's avatar
dengyihao 已提交
465
    return 0;
dengyihao's avatar
dengyihao 已提交
466 467 468 469 470
  } else {
    return 1;
  }
}

dengyihao's avatar
dengyihao 已提交
471
static void transDQTimeout(uv_timer_t* timer) {
dengyihao's avatar
dengyihao 已提交
472
  SDelayQueue* queue = timer->data;
dengyihao's avatar
dengyihao 已提交
473 474
  tTrace("timer %p timeout", timer);
  uint64_t timeout = 0;
dengyihao's avatar
dengyihao 已提交
475
  int64_t  current = taosGetTimestampMs();
dengyihao's avatar
dengyihao 已提交
476 477 478 479
  do {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) break;
    SDelayTask* task = container_of(minNode, SDelayTask, node);
dengyihao's avatar
dengyihao 已提交
480
    if (task->execTime <= current) {
dengyihao's avatar
dengyihao 已提交
481 482 483 484 485
      heapRemove(queue->heap, minNode);
      task->func(task->arg);
      taosMemoryFree(task);
      timeout = 0;
    } else {
dengyihao's avatar
dengyihao 已提交
486
      timeout = task->execTime - current;
dengyihao's avatar
dengyihao 已提交
487 488 489 490
      break;
    }
  } while (1);
  if (timeout != 0) {
dengyihao's avatar
dengyihao 已提交
491
    uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
dengyihao's avatar
dengyihao 已提交
492 493
  }
}
dengyihao's avatar
dengyihao 已提交
494
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
dengyihao's avatar
dengyihao 已提交
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509
  uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
  uv_timer_init(loop, timer);

  Heap* heap = heapCreate(timeCompare);

  SDelayQueue* q = taosMemoryCalloc(1, sizeof(SDelayQueue));
  q->heap = heap;
  q->timer = timer;
  q->loop = loop;
  q->timer->data = q;

  *queue = q;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
510
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
dengyihao's avatar
dengyihao 已提交
511
  taosMemoryFree(queue->timer);
dengyihao's avatar
dengyihao 已提交
512 513 514 515 516 517 518 519 520

  while (heapSize(queue->heap) > 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) {
      return;
    }
    heapRemove(queue->heap, minNode);

    SDelayTask* task = container_of(minNode, SDelayTask, node);
dengyihao's avatar
dengyihao 已提交
521 522

    STaskArg* arg = task->arg;
dengyihao's avatar
dengyihao 已提交
523
    if (freeFunc) freeFunc(arg->param1);
dengyihao's avatar
dengyihao 已提交
524 525
    taosMemoryFree(arg);

dengyihao's avatar
dengyihao 已提交
526 527
    taosMemoryFree(task);
  }
dengyihao's avatar
dengyihao 已提交
528 529 530
  heapDestroy(queue->heap);
  taosMemoryFree(queue);
}
dengyihao's avatar
dengyihao 已提交
531 532
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
  uv_timer_stop(queue->timer);
dengyihao's avatar
dengyihao 已提交
533

dengyihao's avatar
dengyihao 已提交
534 535 536 537 538
  if (heapSize(queue->heap) <= 0) {
    taosMemoryFree(task->arg);
    taosMemoryFree(task);
    return;
  }
dengyihao's avatar
dengyihao 已提交
539 540
  heapRemove(queue->heap, &task->node);

dengyihao's avatar
dengyihao 已提交
541 542 543
  taosMemoryFree(task->arg);
  taosMemoryFree(task);

dengyihao's avatar
dengyihao 已提交
544 545 546 547 548 549 550 551 552 553 554 555 556
  if (heapSize(queue->heap) != 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode != NULL) return;

    uint64_t    now = taosGetTimestampMs();
    SDelayTask* task = container_of(minNode, SDelayTask, node);
    uint64_t    timeout = now > task->execTime ? now - task->execTime : 0;

    uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
  }
}

SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
dengyihao's avatar
dengyihao 已提交
557
  uint64_t    now = taosGetTimestampMs();
dengyihao's avatar
dengyihao 已提交
558 559 560
  SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
  task->func = func;
  task->arg = arg;
dengyihao's avatar
dengyihao 已提交
561 562 563 564 565 566
  task->execTime = now + timeoutMs;

  HeapNode* minNode = heapMin(queue->heap);
  if (minNode) {
    SDelayTask* minTask = container_of(minNode, SDelayTask, node);
    if (minTask->execTime < task->execTime) {
dengyihao's avatar
dengyihao 已提交
567
      timeoutMs = minTask->execTime <= now ? 0 : minTask->execTime - now;
dengyihao's avatar
dengyihao 已提交
568 569
    }
  }
dengyihao's avatar
dengyihao 已提交
570

S
Shengliang Guan 已提交
571
  tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs);
dengyihao's avatar
dengyihao 已提交
572
  heapInsert(queue->heap, &task->node);
dengyihao's avatar
dengyihao 已提交
573
  uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
dengyihao's avatar
dengyihao 已提交
574
  return task;
dengyihao's avatar
dengyihao 已提交
575
}
dengyihao's avatar
dengyihao 已提交
576 577 578 579 580 581

void transPrintEpSet(SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    tTrace("NULL epset");
    return;
  }
dengyihao's avatar
dengyihao 已提交
582
  char buf[512] = {0};
dengyihao's avatar
dengyihao 已提交
583
  int  len = snprintf(buf, sizeof(buf), "epset:{");
dengyihao's avatar
dengyihao 已提交
584
  for (int i = 0; i < pEpSet->numOfEps; i++) {
dengyihao's avatar
dengyihao 已提交
585
    if (i == pEpSet->numOfEps - 1) {
dengyihao's avatar
dengyihao 已提交
586
      len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
dengyihao's avatar
dengyihao 已提交
587 588 589
    } else {
      len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d, ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
    }
dengyihao's avatar
dengyihao 已提交
590
  }
dengyihao's avatar
dengyihao 已提交
591
  len += snprintf(buf + len, sizeof(buf) - len, "}");
dengyihao's avatar
dengyihao 已提交
592
  tTrace("%s, inUse:%d", buf, pEpSet->inUse);
dengyihao's avatar
dengyihao 已提交
593
}
594 595 596 597 598 599 600 601 602 603 604
bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
  if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) {
    return false;
  }
  for (int i = 0; i < a->numOfEps; i++) {
    if (strncmp(a->eps[i].fqdn, b->eps[i].fqdn, TSDB_FQDN_LEN) != 0 || a->eps[i].port != b->eps[i].port) {
      return false;
    }
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
605

dengyihao's avatar
dengyihao 已提交
606
static void transInitEnv() {
dengyihao's avatar
dengyihao 已提交
607 608
  refMgt = transOpenRefMgt(50000, transDestoryExHandle);
  instMgt = taosOpenRef(50, rpcCloseImpl);
dengyihao's avatar
dengyihao 已提交
609 610
  uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
}
dengyihao's avatar
dengyihao 已提交
611
static void transDestroyEnv() {
dengyihao's avatar
dengyihao 已提交
612 613
  transCloseRefMgt(refMgt);
  transCloseRefMgt(instMgt);
dengyihao's avatar
dengyihao 已提交
614
}
dengyihao's avatar
dengyihao 已提交
615

dengyihao's avatar
dengyihao 已提交
616 617 618 619
void transInit() {
  // init env
  taosThreadOnce(&transModuleInit, transInitEnv);
}
dengyihao's avatar
dengyihao 已提交
620 621 622 623

int32_t transGetRefMgt() { return refMgt; }
int32_t transGetInstMgt() { return instMgt; }

dengyihao's avatar
dengyihao 已提交
624 625 626 627
void transCleanup() {
  // clean env
  transDestroyEnv();
}
dengyihao's avatar
dengyihao 已提交
628
int32_t transOpenRefMgt(int size, void (*func)(void*)) {
dengyihao's avatar
dengyihao 已提交
629
  // added into once later
dengyihao's avatar
dengyihao 已提交
630
  return taosOpenRef(size, func);
dengyihao's avatar
dengyihao 已提交
631
}
dengyihao's avatar
dengyihao 已提交
632
void transCloseRefMgt(int32_t mgt) {
dengyihao's avatar
dengyihao 已提交
633
  // close ref
dengyihao's avatar
dengyihao 已提交
634
  taosCloseRef(mgt);
dengyihao's avatar
dengyihao 已提交
635
}
dengyihao's avatar
dengyihao 已提交
636
int64_t transAddExHandle(int32_t refMgt, void* p) {
dengyihao's avatar
dengyihao 已提交
637
  // acquire extern handle
dengyihao's avatar
dengyihao 已提交
638
  return taosAddRef(refMgt, p);
dengyihao's avatar
dengyihao 已提交
639
}
dengyihao's avatar
dengyihao 已提交
640
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) {
dengyihao's avatar
dengyihao 已提交
641
  // acquire extern handle
dengyihao's avatar
dengyihao 已提交
642
  return taosRemoveRef(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
643 644
}

dengyihao's avatar
dengyihao 已提交
645
void* transAcquireExHandle(int32_t refMgt, int64_t refId) {
dengyihao's avatar
dengyihao 已提交
646
  // acquire extern handle
dengyihao's avatar
dengyihao 已提交
647
  return (void*)taosAcquireRef(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
648 649
}

dengyihao's avatar
dengyihao 已提交
650
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) {
dengyihao's avatar
dengyihao 已提交
651
  // release extern handle
dengyihao's avatar
dengyihao 已提交
652
  return taosReleaseRef(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
653 654 655 656 657 658 659
}
void transDestoryExHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  taosMemoryFree(handle);
}
dengyihao's avatar
dengyihao 已提交
660
#endif