transComm.c 14.1 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifdef USE_UV

#include "transComm.h"

dengyihao's avatar
dengyihao 已提交
19 20 21
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;

static int32_t refMgt;
dengyihao's avatar
dengyihao 已提交
22

dengyihao's avatar
dengyihao 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
  T_MD5_CTX context;
  int       ret = -1;

  tMD5Init(&context);
  tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
  tMD5Update(&context, (uint8_t*)pMsg, msgLen);
  tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
  tMD5Final(&context);

  if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;

  return ret;
}
dengyihao's avatar
dengyihao 已提交
37

dengyihao's avatar
dengyihao 已提交
38 39 40 41 42 43 44 45 46 47 48
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
  T_MD5_CTX context;

  tMD5Init(&context);
  tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
  tMD5Update(&context, (uint8_t*)pMsg, msgLen);
  tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
  tMD5Final(&context);

  memcpy(pAuth, context.digest, sizeof(context.digest));
}
dengyihao's avatar
dengyihao 已提交
49

dengyihao's avatar
dengyihao 已提交
50
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
dengyihao's avatar
dengyihao 已提交
51
  return false;
dengyihao's avatar
dengyihao 已提交
52 53 54 55 56 57 58
  // SRpcHead* pHead = rpcHeadFromCont(pCont);
  bool succ = false;
  int  overhead = sizeof(STransCompMsg);
  if (!NEEDTO_COMPRESSS_MSG(len)) {
    return succ;
  }

wafwerar's avatar
wafwerar 已提交
59
  char* buf = taosMemoryMalloc(len + overhead + 8);  // 8 extra bytes
dengyihao's avatar
dengyihao 已提交
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
  if (buf == NULL) {
    tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
    *flen = len;
    return succ;
  }

  int32_t clen = LZ4_compress_default(msg, buf, len, len + overhead);
  tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, overhead);
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
  if (clen > 0 && clen < len - overhead) {
    STransCompMsg* pComp = (STransCompMsg*)msg;
    pComp->reserved = 0;
    pComp->contLen = htonl(len);
    memcpy(msg + overhead, buf, clen);

    tDebug("compress rpc msg, before:%d, after:%d", len, clen);
    *flen = clen + overhead;
    succ = true;
  } else {
    *flen = len;
    succ = false;
  }
wafwerar's avatar
wafwerar 已提交
85
  taosMemoryFree(buf);
dengyihao's avatar
dengyihao 已提交
86 87 88 89 90 91 92 93 94 95 96 97
  return succ;
}
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) {
  // impl later
  return false;
  STransCompMsg* pComp = (STransCompMsg*)msg;

  int overhead = sizeof(STransCompMsg);
  int clen = 0;
  return false;
}

dengyihao's avatar
dengyihao 已提交
98 99 100 101
void transFreeMsg(void* msg) {
  if (msg == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
102
  taosMemoryFree((char*)msg - sizeof(STransMsgHead));
dengyihao's avatar
dengyihao 已提交
103
}
dengyihao's avatar
dengyihao 已提交
104 105 106 107 108 109 110

int transInitBuffer(SConnBuffer* buf) {
  transClearBuffer(buf);
  return 0;
}
int transClearBuffer(SConnBuffer* buf) {
  memset(buf, 0, sizeof(*buf));
dengyihao's avatar
fix bug  
dengyihao 已提交
111
  buf->total = -1;
dengyihao's avatar
dengyihao 已提交
112 113 114 115 116 117
  return 0;
}
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
  /*
   * formate of data buffer:
   * |<--------------------------data from socket------------------------------->|
dengyihao's avatar
dengyihao 已提交
118 119
   * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user
   * info--->|
dengyihao's avatar
dengyihao 已提交
120
   */
dengyihao's avatar
fix bug  
dengyihao 已提交
121
  static const int CAPACITY = sizeof(STransMsgHead);
dengyihao's avatar
dengyihao 已提交
122 123 124

  SConnBuffer* p = connBuf;
  if (p->cap == 0) {
wafwerar's avatar
wafwerar 已提交
125
    p->buf = (char*)taosMemoryCalloc(CAPACITY, sizeof(char));
dengyihao's avatar
dengyihao 已提交
126 127
    p->len = 0;
    p->cap = CAPACITY;
dengyihao's avatar
dengyihao 已提交
128
    p->total = -1;
dengyihao's avatar
dengyihao 已提交
129 130 131

    uvBuf->base = p->buf;
    uvBuf->len = CAPACITY;
dengyihao's avatar
dengyihao 已提交
132 133 134
  } else if (p->total == -1 && p->len < CAPACITY) {
    uvBuf->base = p->buf + p->len;
    uvBuf->len = CAPACITY - p->len;
dengyihao's avatar
dengyihao 已提交
135
  } else {
dengyihao's avatar
dengyihao 已提交
136
    p->cap = p->total;
wafwerar's avatar
wafwerar 已提交
137
    p->buf = taosMemoryRealloc(p->buf, p->cap);
dengyihao's avatar
dengyihao 已提交
138
    tTrace("internal malloc mem: %p, size: %d", p->buf, p->cap);
dengyihao's avatar
dengyihao 已提交
139

dengyihao's avatar
dengyihao 已提交
140 141 142 143 144
    uvBuf->base = p->buf + p->len;
    uvBuf->len = p->cap - p->len;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
145 146 147 148 149 150 151 152 153 154 155 156 157
// check whether already read complete
bool transReadComplete(SConnBuffer* connBuf) {
  if (connBuf->total == -1 && connBuf->len >= sizeof(STransMsgHead)) {
    STransMsgHead head;
    memcpy((char*)&head, connBuf->buf, sizeof(head));
    int32_t msgLen = (int32_t)htonl(head.msgLen);
    connBuf->total = msgLen;
  }
  if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) {
    return true;
  }
  return false;
}
dengyihao's avatar
dengyihao 已提交
158
int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) { return 0; }
dengyihao's avatar
dengyihao 已提交
159

dengyihao's avatar
dengyihao 已提交
160
int transUnpackMsg(STransMsgHead* msgHead) { return 0; }
dengyihao's avatar
dengyihao 已提交
161 162
int transDestroyBuffer(SConnBuffer* buf) {
  if (buf->cap > 0) {
wafwerar's avatar
wafwerar 已提交
163
    taosMemoryFreeClear(buf->buf);
dengyihao's avatar
dengyihao 已提交
164 165
  }
  transClearBuffer(buf);
166 167

  return 0;
dengyihao's avatar
dengyihao 已提交
168
}
dengyihao's avatar
dengyihao 已提交
169

dengyihao's avatar
dengyihao 已提交
170 171 172 173 174 175
int transSetConnOption(uv_tcp_t* stream) {
  uv_tcp_nodelay(stream, 1);
  int ret = uv_tcp_keepalive(stream, 5, 5);
  return ret;
}

dengyihao's avatar
dengyihao 已提交
176
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
wafwerar's avatar
wafwerar 已提交
177
  SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
dengyihao's avatar
dengyihao 已提交
178 179
  pool->index = 0;
  pool->nAsync = sz;
wafwerar's avatar
wafwerar 已提交
180
  pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
dengyihao's avatar
dengyihao 已提交
181 182 183 184

  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
    uv_async_init(loop, async, cb);
dengyihao's avatar
dengyihao 已提交
185

wafwerar's avatar
wafwerar 已提交
186
    SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
dengyihao's avatar
dengyihao 已提交
187 188
    item->pThrd = arg;
    QUEUE_INIT(&item->qmsg);
wafwerar's avatar
wafwerar 已提交
189
    taosThreadMutexInit(&item->mtx, NULL);
dengyihao's avatar
dengyihao 已提交
190 191

    async->data = item;
dengyihao's avatar
dengyihao 已提交
192 193 194
  }
  return pool;
}
dengyihao's avatar
dengyihao 已提交
195

dengyihao's avatar
dengyihao 已提交
196 197 198
void transDestroyAsyncPool(SAsyncPool* pool) {
  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
dengyihao's avatar
dengyihao 已提交
199
    // uv_close((uv_handle_t*)async, NULL);
dengyihao's avatar
dengyihao 已提交
200
    SAsyncItem* item = async->data;
wafwerar's avatar
wafwerar 已提交
201
    taosThreadMutexDestroy(&item->mtx);
wafwerar's avatar
wafwerar 已提交
202
    taosMemoryFree(item);
dengyihao's avatar
dengyihao 已提交
203
  }
wafwerar's avatar
wafwerar 已提交
204 205
  taosMemoryFree(pool->asyncs);
  taosMemoryFree(pool);
dengyihao's avatar
dengyihao 已提交
206
}
dengyihao's avatar
dengyihao 已提交
207
int transAsyncSend(SAsyncPool* pool, queue* q) {
dengyihao's avatar
dengyihao 已提交
208 209 210 211 212 213
  int idx = pool->index;
  idx = idx % pool->nAsync;
  // no need mutex here
  if (pool->index++ > pool->nAsync) {
    pool->index = 0;
  }
dengyihao's avatar
dengyihao 已提交
214 215 216 217
  uv_async_t* async = &(pool->asyncs[idx]);
  SAsyncItem* item = async->data;

  int64_t st = taosGetTimestampUs();
wafwerar's avatar
wafwerar 已提交
218
  taosThreadMutexLock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
219
  QUEUE_PUSH(&item->qmsg, q);
wafwerar's avatar
wafwerar 已提交
220
  taosThreadMutexUnlock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
221 222 223 224 225
  int64_t el = taosGetTimestampUs() - st;
  if (el > 50) {
    // tInfo("lock and unlock cost: %d", (int)el);
  }
  return uv_async_send(async);
dengyihao's avatar
dengyihao 已提交
226
}
dengyihao's avatar
dengyihao 已提交
227

dengyihao's avatar
dengyihao 已提交
228 229 230 231
void transCtxInit(STransCtx* ctx) {
  // init transCtx
  ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
}
dengyihao's avatar
dengyihao 已提交
232
void transCtxCleanup(STransCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
233 234 235 236 237 238
  if (ctx->args == NULL) {
    return;
  }

  STransCtxVal* iter = taosHashIterate(ctx->args, NULL);
  while (iter) {
dengyihao's avatar
dengyihao 已提交
239
    ctx->freeFunc(iter->val);
dengyihao's avatar
dengyihao 已提交
240 241
    iter = taosHashIterate(ctx->args, iter);
  }
dengyihao's avatar
dengyihao 已提交
242

dengyihao's avatar
dengyihao 已提交
243
  taosHashCleanup(ctx->args);
U
ubuntu 已提交
244
  ctx->args = NULL;
dengyihao's avatar
dengyihao 已提交
245 246 247 248 249
}

void transCtxMerge(STransCtx* dst, STransCtx* src) {
  if (dst->args == NULL) {
    dst->args = src->args;
dengyihao's avatar
dengyihao 已提交
250
    dst->brokenVal = src->brokenVal;
dengyihao's avatar
dengyihao 已提交
251
    dst->freeFunc = src->freeFunc;
dengyihao's avatar
dengyihao 已提交
252 253 254 255 256 257 258 259 260 261 262 263
    src->args = NULL;
    return;
  }
  void*  key = NULL;
  size_t klen = 0;
  void*  iter = taosHashIterate(src->args, NULL);
  while (iter) {
    STransCtxVal* sVal = (STransCtxVal*)iter;
    key = taosHashGetKey(sVal, &klen);

    STransCtxVal* dVal = taosHashGet(dst->args, key, klen);
    if (dVal) {
dengyihao's avatar
dengyihao 已提交
264
      dst->freeFunc(dVal->val);
dengyihao's avatar
dengyihao 已提交
265 266 267 268 269 270 271 272 273 274 275 276 277 278
    }
    taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
    iter = taosHashIterate(src->args, iter);
  }
  taosHashCleanup(src->args);
}
void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
  if (ctx->args == NULL) {
    return NULL;
  }
  STransCtxVal* cVal = taosHashGet(ctx->args, (const void*)&key, sizeof(key));
  if (cVal == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
279
  void* ret = NULL;
D
dapan1121 已提交
280 281
  (*cVal->clone)(cVal->val, &ret);
  return ret;
dengyihao's avatar
dengyihao 已提交
282
}
dengyihao's avatar
dengyihao 已提交
283
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
dengyihao's avatar
dengyihao 已提交
284
  void* ret = NULL;
dengyihao's avatar
dengyihao 已提交
285
  if (ctx->brokenVal.clone == NULL) {
dengyihao's avatar
dengyihao 已提交
286
    return ret;
dengyihao's avatar
dengyihao 已提交
287
  }
D
dapan1121 已提交
288
  (*ctx->brokenVal.clone)(ctx->brokenVal.val, &ret);
dengyihao's avatar
dengyihao 已提交
289 290 291

  *msgType = ctx->brokenVal.msgType;

D
dapan1121 已提交
292
  return ret;
dengyihao's avatar
dengyihao 已提交
293
}
dengyihao's avatar
dengyihao 已提交
294

wafwerar's avatar
wafwerar 已提交
295
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
dengyihao's avatar
dengyihao 已提交
296
  queue->q = taosArrayInit(2, sizeof(void*));
wafwerar's avatar
wafwerar 已提交
297
  queue->freeFunc = (void (*)(const void*))freeFunc;
dengyihao's avatar
dengyihao 已提交
298 299
}
bool transQueuePush(STransQueue* queue, void* arg) {
dengyihao's avatar
dengyihao 已提交
300 301 302
  if (queue->q == NULL) {
    return true;
  }
dengyihao's avatar
dengyihao 已提交
303 304 305 306 307 308 309
  taosArrayPush(queue->q, &arg);
  if (taosArrayGetSize(queue->q) > 1) {
    return false;
  }
  return true;
}
void* transQueuePop(STransQueue* queue) {
dengyihao's avatar
dengyihao 已提交
310
  if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
dengyihao's avatar
dengyihao 已提交
311 312 313 314 315 316
    return NULL;
  }
  void* ptr = taosArrayGetP(queue->q, 0);
  taosArrayRemove(queue->q, 0);
  return ptr;
}
dengyihao's avatar
dengyihao 已提交
317
int32_t transQueueSize(STransQueue* queue) {
dengyihao's avatar
dengyihao 已提交
318 319 320
  if (queue->q == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
321 322 323
  return taosArrayGetSize(queue->q);
}
void* transQueueGet(STransQueue* queue, int i) {
dengyihao's avatar
dengyihao 已提交
324
  if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
dengyihao's avatar
dengyihao 已提交
325 326 327 328 329
    return NULL;
  }
  if (i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
330

dengyihao's avatar
dengyihao 已提交
331 332 333 334 335
  void* ptr = taosArrayGetP(queue->q, i);
  return ptr;
}

void* transQueueRm(STransQueue* queue, int i) {
dengyihao's avatar
dengyihao 已提交
336
  if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
dengyihao's avatar
dengyihao 已提交
337 338
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
339 340 341 342 343
  if (i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }
  void* ptr = taosArrayGetP(queue->q, i);
  taosArrayRemove(queue->q, i);
dengyihao's avatar
dengyihao 已提交
344 345
  return ptr;
}
dengyihao's avatar
dengyihao 已提交
346

dengyihao's avatar
dengyihao 已提交
347
bool transQueueEmpty(STransQueue* queue) {
dengyihao's avatar
dengyihao 已提交
348 349 350
  if (queue->q == NULL) {
    return true;
  }
dengyihao's avatar
dengyihao 已提交
351 352 353
  return taosArrayGetSize(queue->q) == 0;
}
void transQueueClear(STransQueue* queue) {
wafwerar's avatar
wafwerar 已提交
354
  if (queue->freeFunc != NULL) {
dengyihao's avatar
dengyihao 已提交
355 356
    for (int i = 0; i < taosArrayGetSize(queue->q); i++) {
      void* p = taosArrayGetP(queue->q, i);
wafwerar's avatar
wafwerar 已提交
357
      queue->freeFunc(p);
dengyihao's avatar
dengyihao 已提交
358 359 360 361 362 363 364 365
    }
  }
  taosArrayClear(queue->q);
}
void transQueueDestroy(STransQueue* queue) {
  transQueueClear(queue);
  taosArrayDestroy(queue->q);
}
dengyihao's avatar
dengyihao 已提交
366 367 368 369 370

static int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
  SDelayTask* arg1 = container_of(a, SDelayTask, node);
  SDelayTask* arg2 = container_of(b, SDelayTask, node);
  if (arg1->execTime > arg2->execTime) {
dengyihao's avatar
dengyihao 已提交
371
    return 0;
dengyihao's avatar
dengyihao 已提交
372 373 374 375 376
  } else {
    return 1;
  }
}

dengyihao's avatar
dengyihao 已提交
377
static void transDQTimeout(uv_timer_t* timer) {
dengyihao's avatar
dengyihao 已提交
378
  SDelayQueue* queue = timer->data;
dengyihao's avatar
dengyihao 已提交
379 380
  tTrace("timer %p timeout", timer);
  uint64_t timeout = 0;
dengyihao's avatar
dengyihao 已提交
381
  int64_t  current = taosGetTimestampMs();
dengyihao's avatar
dengyihao 已提交
382 383 384 385
  do {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) break;
    SDelayTask* task = container_of(minNode, SDelayTask, node);
dengyihao's avatar
dengyihao 已提交
386
    if (task->execTime <= current) {
dengyihao's avatar
dengyihao 已提交
387 388 389 390 391
      heapRemove(queue->heap, minNode);
      task->func(task->arg);
      taosMemoryFree(task);
      timeout = 0;
    } else {
dengyihao's avatar
dengyihao 已提交
392
      timeout = task->execTime - current;
dengyihao's avatar
dengyihao 已提交
393 394 395 396
      break;
    }
  } while (1);
  if (timeout != 0) {
dengyihao's avatar
dengyihao 已提交
397
    uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
dengyihao's avatar
dengyihao 已提交
398 399
  }
}
dengyihao's avatar
dengyihao 已提交
400
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
dengyihao's avatar
dengyihao 已提交
401 402 403 404 405 406 407 408 409 410 411 412 413 414 415
  uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
  uv_timer_init(loop, timer);

  Heap* heap = heapCreate(timeCompare);

  SDelayQueue* q = taosMemoryCalloc(1, sizeof(SDelayQueue));
  q->heap = heap;
  q->timer = timer;
  q->loop = loop;
  q->timer->data = q;

  *queue = q;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
416
void transDQDestroy(SDelayQueue* queue) {
dengyihao's avatar
dengyihao 已提交
417
  taosMemoryFree(queue->timer);
dengyihao's avatar
dengyihao 已提交
418 419 420 421 422 423 424 425 426 427 428

  while (heapSize(queue->heap) > 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) {
      return;
    }
    heapRemove(queue->heap, minNode);

    SDelayTask* task = container_of(minNode, SDelayTask, node);
    taosMemoryFree(task);
  }
dengyihao's avatar
dengyihao 已提交
429 430 431 432
  heapDestroy(queue->heap);
  taosMemoryFree(queue);
}

dengyihao's avatar
dengyihao 已提交
433
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
dengyihao's avatar
dengyihao 已提交
434
  uint64_t    now = taosGetTimestampMs();
dengyihao's avatar
dengyihao 已提交
435 436 437
  SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
  task->func = func;
  task->arg = arg;
dengyihao's avatar
dengyihao 已提交
438 439 440 441 442 443
  task->execTime = now + timeoutMs;

  HeapNode* minNode = heapMin(queue->heap);
  if (minNode) {
    SDelayTask* minTask = container_of(minNode, SDelayTask, node);
    if (minTask->execTime < task->execTime) {
dengyihao's avatar
dengyihao 已提交
444
      timeoutMs = minTask->execTime <= now ? 0 : minTask->execTime - now;
dengyihao's avatar
dengyihao 已提交
445 446
    }
  }
dengyihao's avatar
dengyihao 已提交
447

dengyihao's avatar
dengyihao 已提交
448
  tTrace("timer %p put task into delay queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs);
dengyihao's avatar
dengyihao 已提交
449
  heapInsert(queue->heap, &task->node);
dengyihao's avatar
dengyihao 已提交
450
  uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
dengyihao's avatar
dengyihao 已提交
451 452
  return 0;
}
dengyihao's avatar
dengyihao 已提交
453 454 455 456 457 458

void transPrintEpSet(SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    tTrace("NULL epset");
    return;
  }
dengyihao's avatar
dengyihao 已提交
459
  char buf[512] = {0};
dengyihao's avatar
dengyihao 已提交
460
  int  len = snprintf(buf, sizeof(buf), "epset:{");
dengyihao's avatar
dengyihao 已提交
461
  for (int i = 0; i < pEpSet->numOfEps; i++) {
dengyihao's avatar
dengyihao 已提交
462
    if (i == pEpSet->numOfEps - 1) {
dengyihao's avatar
dengyihao 已提交
463
      len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
dengyihao's avatar
dengyihao 已提交
464 465 466
    } else {
      len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d, ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
    }
dengyihao's avatar
dengyihao 已提交
467
  }
dengyihao's avatar
dengyihao 已提交
468
  len += snprintf(buf + len, sizeof(buf) - len, "}");
dengyihao's avatar
dengyihao 已提交
469
  tTrace("%s, inUse:%d", buf, pEpSet->inUse);
dengyihao's avatar
dengyihao 已提交
470
}
471 472 473 474 475 476 477 478 479 480 481
bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
  if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) {
    return false;
  }
  for (int i = 0; i < a->numOfEps; i++) {
    if (strncmp(a->eps[i].fqdn, b->eps[i].fqdn, TSDB_FQDN_LEN) != 0 || a->eps[i].port != b->eps[i].port) {
      return false;
    }
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
482

dengyihao's avatar
dengyihao 已提交
483 484
static void transInitEnv() {
  refMgt = transOpenExHandleMgt(50000);
dengyihao's avatar
dengyihao 已提交
485 486
  uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
}
dengyihao's avatar
dengyihao 已提交
487 488 489 490 491 492 493 494 495 496 497 498
static void transDestroyEnv() {
  // close ref
  transCloseExHandleMgt(refMgt);
}
void transInit() {
  // init env
  taosThreadOnce(&transModuleInit, transInitEnv);
}
void transCleanup() {
  // clean env
  transDestroyEnv();
}
dengyihao's avatar
dengyihao 已提交
499 500 501 502
int32_t transOpenExHandleMgt(int size) {
  // added into once later
  return taosOpenRef(size, transDestoryExHandle);
}
dengyihao's avatar
dengyihao 已提交
503
void transCloseExHandleMgt() {
dengyihao's avatar
dengyihao 已提交
504
  // close ref
dengyihao's avatar
dengyihao 已提交
505
  taosCloseRef(refMgt);
dengyihao's avatar
dengyihao 已提交
506
}
dengyihao's avatar
dengyihao 已提交
507
int64_t transAddExHandle(void* p) {
dengyihao's avatar
dengyihao 已提交
508
  // acquire extern handle
dengyihao's avatar
dengyihao 已提交
509
  return taosAddRef(refMgt, p);
dengyihao's avatar
dengyihao 已提交
510
}
dengyihao's avatar
dengyihao 已提交
511
int32_t transRemoveExHandle(int64_t refId) {
dengyihao's avatar
dengyihao 已提交
512
  // acquire extern handle
dengyihao's avatar
dengyihao 已提交
513
  return taosRemoveRef(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
514 515
}

dengyihao's avatar
dengyihao 已提交
516
SExHandle* transAcquireExHandle(int64_t refId) {
dengyihao's avatar
dengyihao 已提交
517
  // acquire extern handle
dengyihao's avatar
dengyihao 已提交
518
  return (SExHandle*)taosAcquireRef(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
519 520
}

dengyihao's avatar
dengyihao 已提交
521
int32_t transReleaseExHandle(int64_t refId) {
dengyihao's avatar
dengyihao 已提交
522
  // release extern handle
dengyihao's avatar
dengyihao 已提交
523
  return taosReleaseRef(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
524 525 526 527 528 529 530
}
void transDestoryExHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  taosMemoryFree(handle);
}
dengyihao's avatar
dengyihao 已提交
531
#endif