transComm.c 16.8 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifdef USE_UV

#include "transComm.h"

dengyihao's avatar
opt rpc  
dengyihao 已提交
19 20
// Capacity in bytes that transInitBuffer() allocates for a connection buffer and that
// transClearBuffer() shrinks an enlarged buffer back to.
#define BUFFER_CAP 4096

dengyihao's avatar
dengyihao 已提交
21 22 23
// Once-control passed to taosThreadOnce() in transInit(); presumably guarantees that
// transInitEnv() runs a single time per process.
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;

// Ref-manager id assigned in transInitEnv() from transOpenRefMgt(); returned by transGetRefMgt().
static int32_t refMgt;
dengyihao's avatar
dengyihao 已提交
24
// Ref-manager id assigned in transInitEnv() from taosOpenRef(); returned by transGetInstMgt().
static int32_t instMgt;
dengyihao's avatar
dengyihao 已提交
25

dengyihao's avatar
dengyihao 已提交
26 27 28 29
int32_t transCompressMsg(char* msg, int32_t len) {
  int32_t        ret = 0;
  int            compHdr = sizeof(STransCompMsg);
  STransMsgHead* pHead = transHeadFromCont(msg);
dengyihao's avatar
dengyihao 已提交
30

dengyihao's avatar
dengyihao 已提交
31
  char* buf = taosMemoryMalloc(len + compHdr + 8);  // 8 extra bytes
dengyihao's avatar
dengyihao 已提交
32 33
  if (buf == NULL) {
    tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
dengyihao's avatar
dengyihao 已提交
34 35
    ret = len;
    return ret;
dengyihao's avatar
dengyihao 已提交
36 37
  }

dengyihao's avatar
dengyihao 已提交
38
  int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr);
dengyihao's avatar
dengyihao 已提交
39 40 41 42
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
dengyihao's avatar
dengyihao 已提交
43
  if (clen > 0 && clen < len - compHdr) {
dengyihao's avatar
dengyihao 已提交
44 45 46
    STransCompMsg* pComp = (STransCompMsg*)msg;
    pComp->reserved = 0;
    pComp->contLen = htonl(len);
dengyihao's avatar
dengyihao 已提交
47
    memcpy(msg + compHdr, buf, clen);
dengyihao's avatar
dengyihao 已提交
48 49

    tDebug("compress rpc msg, before:%d, after:%d", len, clen);
dengyihao's avatar
dengyihao 已提交
50 51
    ret = clen + compHdr;
    pHead->comp = 1;
dengyihao's avatar
dengyihao 已提交
52
  } else {
dengyihao's avatar
dengyihao 已提交
53 54
    ret = len;
    pHead->comp = 0;
dengyihao's avatar
dengyihao 已提交
55
  }
wafwerar's avatar
wafwerar 已提交
56
  taosMemoryFree(buf);
dengyihao's avatar
dengyihao 已提交
57
  return ret;
dengyihao's avatar
dengyihao 已提交
58
}
dengyihao's avatar
dengyihao 已提交
59 60 61 62
int32_t transDecompressMsg(char** msg, int32_t len) {
  STransMsgHead* pHead = (STransMsgHead*)(*msg);
  if (pHead->comp == 0) return 0;

dengyihao's avatar
dengyihao 已提交
63 64
  char* pCont = transContFromHead(pHead);

dengyihao's avatar
dengyihao 已提交
65 66 67 68 69
  STransCompMsg* pComp = (STransCompMsg*)pCont;
  int32_t        oriLen = htonl(pComp->contLen);

  char*          buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
  STransMsgHead* pNewHead = (STransMsgHead*)buf;
dengyihao's avatar
dengyihao 已提交
70 71
  int32_t        decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), pNewHead->content,
                                                 len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
dengyihao's avatar
dengyihao 已提交
72
  memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
dengyihao's avatar
dengyihao 已提交
73

dengyihao's avatar
dengyihao 已提交
74 75 76 77 78 79 80 81
  pNewHead->msgLen = htonl(oriLen + sizeof(STransMsgHead));

  taosMemoryFree(pHead);
  *msg = buf;
  if (decompLen != oriLen) {
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
82 83
}

dengyihao's avatar
dengyihao 已提交
84 85 86 87
void transFreeMsg(void* msg) {
  if (msg == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
88
  taosMemoryFree((char*)msg - sizeof(STransMsgHead));
dengyihao's avatar
dengyihao 已提交
89
}
dengyihao's avatar
dengyihao 已提交
90
/*
 * Format an IPv4 socket address as "ip:port" into dst.
 *
 * sockname is reinterpreted as struct sockaddr_in. dst has no size parameter, so the caller
 * must supply a buffer large enough for the text. Returns the result of uv_ip4_name(); the
 * string is written regardless of that result.
 */
int transSockInfo2Str(struct sockaddr* sockname, char* dst) {
  struct sockaddr_in in4 = *(struct sockaddr_in*)sockname;
  char               ip[20] = {0};

  int rc = uv_ip4_name(&in4, (char*)ip, sizeof(ip));
  sprintf(dst, "%s:%d", ip, ntohs(in4.sin_port));
  return rc;
}
dengyihao's avatar
dengyihao 已提交
98
/*
 * Initialise a connection buffer with BUFFER_CAP zeroed bytes.
 *
 * left is set to -1, which transAllocBuffer()/transReadComplete() treat as "message length
 * not parsed yet". Returns 0 on success, or -1 when the allocation fails; in that case cap
 * is 0 so the recorded capacity never exceeds the real one.
 */
int transInitBuffer(SConnBuffer* buf) {
  buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
  buf->cap = (buf->buf != NULL) ? BUFFER_CAP : 0;
  buf->left = -1;
  buf->len = 0;
  buf->total = 0;
  buf->invalid = 0;
  return (buf->buf != NULL) ? 0 : -1;
}
dengyihao's avatar
dengyihao 已提交
107 108 109
/*
 * Release the storage of a connection buffer and clear the pointer. The other fields of
 * the SConnBuffer are left as they are. Always returns 0.
 */
int transDestroyBuffer(SConnBuffer* p) {
  taosMemoryFree(p->buf);
  p->buf = NULL;

  return 0;
}
dengyihao's avatar
opt rpc  
dengyihao 已提交
112

dengyihao's avatar
dengyihao 已提交
113
/*
 * Reset a connection buffer to its empty state and shrink it back to BUFFER_CAP if it had
 * grown beyond that.
 *
 * If the shrinking realloc fails, the existing (larger) buffer and its capacity are kept,
 * so buf/cap always stay consistent. Always returns 0.
 */
int transClearBuffer(SConnBuffer* buf) {
  SConnBuffer* p = buf;
  if (p->cap > BUFFER_CAP) {
    // Go through a temporary so a failed realloc does not lose the original allocation.
    void* shrunk = taosMemoryRealloc(p->buf, BUFFER_CAP);
    if (shrunk != NULL) {
      p->buf = shrunk;
      p->cap = BUFFER_CAP;
    }
  }
  p->left = -1;
  p->len = 0;
  p->total = 0;
  p->invalid = 0;
  return 0;
}

/*
 * Copy one complete message out of the connection buffer.
 *
 * On success *buf receives a newly allocated copy of the first `total` bytes, the connection
 * buffer is reset with transResetBuffer(), and the message length is returned.
 * Returns -1, leaving *buf and the connection buffer untouched, when no complete message is
 * pending (left != 0 or total <= 0), when the message is shorter than a STransMsgHead, when
 * it was flagged invalid, or when the copy cannot be allocated.
 */
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
  static const int HEADSIZE = sizeof(STransMsgHead);

  SConnBuffer* p = connBuf;
  if (p->left != 0 || p->total <= 0) {
    return -1;
  }

  int total = p->total;
  if (total < HEADSIZE || p->invalid) {
    return -1;
  }

  char* copy = taosMemoryCalloc(1, total);
  if (copy == NULL) {
    return -1;
  }
  memcpy(copy, p->buf, total);
  *buf = copy;

  transResetBuffer(connBuf);
  return total;
}

/*
 * Drop the first `total` bytes (one consumed message) from the connection buffer and move
 * any bytes that follow to the front. left becomes -1 and total becomes 0 again.
 * total greater than len trips assert(0); nothing is modified in that case.
 * Always returns 0.
 */
int transResetBuffer(SConnBuffer* connBuf) {
  SConnBuffer* p = connBuf;

  if (p->total > p->len) {
    assert(0);
    return 0;
  }

  int remain = p->len - p->total;
  if (remain > 0) {
    // Bytes belonging to the next message were already read; keep them.
    memmove(p->buf, p->buf + p->total, remain);
  }
  p->left = -1;
  p->total = 0;
  p->len = remain;
  return 0;
}
/*
 * Point uvBuf at the free region of the connection buffer, for use as read destination.
 *
 * Format of the data buffer:
 * |<--------------------------data from socket------------------------------->|
 * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user
 * info--->|
 *
 * While the message length is unknown (left == -1) all remaining capacity is offered.
 * Otherwise exactly `left` bytes are offered, growing the buffer when they do not fit.
 * Returns 0 on success. If growing fails, the existing buffer and capacity are kept,
 * uvBuf is set to {NULL, 0} and -1 is returned.
 */
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
  SConnBuffer* p = connBuf;
  uvBuf->base = p->buf + p->len;

  if (p->left == -1) {
    uvBuf->len = p->cap - p->len;
    return 0;
  }

  if (p->left < p->cap - p->len) {
    uvBuf->len = p->left;
    return 0;
  }

  // Not enough room for the rest of the message: grow to exactly what is needed.
  int   newCap = p->left + p->len;
  void* grown = taosMemoryRealloc(p->buf, newCap);
  if (grown == NULL) {
    // A zero-length buffer makes libuv report the read as failed instead of writing anywhere.
    uvBuf->base = NULL;
    uvBuf->len = 0;
    return -1;
  }
  p->buf = grown;
  p->cap = newCap;
  uvBuf->base = p->buf + p->len;
  uvBuf->len = p->left;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
184 185
/*
 * Check whether a complete message has been read into the connection buffer.
 *
 * Once at least sizeof(STransMsgHead) bytes are buffered and the length is still unknown
 * (left == -1), the head is decoded to fill in total and invalid. left is then updated to
 * the number of bytes still missing, or 0 when more than one message worth of bytes is
 * present. Returns true when nothing is missing or when the packet was flagged invalid.
 */
bool transReadComplete(SConnBuffer* connBuf) {
  SConnBuffer* p = connBuf;

  if (p->len >= sizeof(STransMsgHead)) {
    if (p->left == -1) {
      // Copy the head out of the raw bytes before reading its fields.
      STransMsgHead head;
      memcpy((char*)&head, connBuf->buf, sizeof(head));
      p->total = (int32_t)htonl(head.msgLen);
      p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum));
    }
    p->left = (p->total >= p->len) ? p->total - p->len : 0;
  }

  return p->left == 0 || p->invalid;
}
dengyihao's avatar
dengyihao 已提交
203

dengyihao's avatar
dengyihao 已提交
204
/*
 * Apply socket options to a TCP stream: enables TCP_NODELAY through uv_tcp_nodelay() and
 * returns its result. Keepalive is not configured; the original call is kept below as a
 * comment.
 */
int transSetConnOption(uv_tcp_t* stream) {
  int rc = uv_tcp_nodelay(stream, 1);
  // int ret = uv_tcp_keepalive(stream, 5, 60);
  return rc;
}

dengyihao's avatar
dengyihao 已提交
209
/*
 * Create a pool of sz uv_async_t handles on loop, all using callback cb.
 *
 * Each handle's data points to its own SAsyncItem, which carries arg as pThrd, an empty
 * message queue and a mutex. Returns the pool, or NULL when any allocation fails; in that
 * case nothing has been registered with the loop and all memory is released.
 */
SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
  SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
  if (pool == NULL) {
    return NULL;
  }
  pool->nAsync = sz;
  pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
  if (pool->asyncs == NULL && pool->nAsync > 0) {
    taosMemoryFree(pool);
    return NULL;
  }

  // Pass 1: allocate every item before any handle is registered with the loop, so that a
  // failure can still be rolled back with plain frees.
  for (int i = 0; i < pool->nAsync; i++) {
    SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
    if (item == NULL) {
      for (int j = 0; j < i; j++) {
        SAsyncItem* prev = pool->asyncs[j].data;
        taosThreadMutexDestroy(&prev->mtx);
        taosMemoryFree(prev);
      }
      taosMemoryFree(pool->asyncs);
      taosMemoryFree(pool);
      return NULL;
    }
    item->pThrd = arg;
    QUEUE_INIT(&item->qmsg);
    taosThreadMutexInit(&item->mtx, NULL);
    pool->asyncs[i].data = item;
  }

  // Pass 2: register the handles. data is re-assigned after init so the result does not
  // depend on whether uv_async_init() preserves the field.
  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
    SAsyncItem* item = async->data;
    uv_async_init(loop, async, cb);
    async->data = item;
  }
  return pool;
}
dengyihao's avatar
dengyihao 已提交
226

dengyihao's avatar
dengyihao 已提交
227
/*
 * Destroy an async pool: for every handle, destroy the mutex of its SAsyncItem and free the
 * item, then free the handle array and the pool itself.
 * NOTE(review): the uv_async_t handles are not closed here; presumably the caller has
 * already closed them - confirm.
 */
void transAsyncPoolDestroy(SAsyncPool* pool) {
  for (int i = 0; i < pool->nAsync; i++) {
    SAsyncItem* item = pool->asyncs[i].data;
    taosThreadMutexDestroy(&item->mtx);
    taosMemoryFree(item);
  }

  taosMemoryFree(pool->asyncs);
  taosMemoryFree(pool);
}
dengyihao's avatar
dengyihao 已提交
238 239 240 241 242 243 244 245
/*
 * Return true when the message queue of every handle in the pool is empty.
 * The queues are inspected without taking the per-item mutex.
 */
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
  for (int i = 0; i < pool->nAsync; i++) {
    SAsyncItem* item = pool->asyncs[i].data;
    if (!QUEUE_IS_EMPTY(&item->qmsg)) {
      return false;
    }
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
246
/*
 * Queue q on one of the pool's handles (chosen round-robin via pool->index) and wake that
 * handle with uv_async_send().
 *
 * Returns -1 without queuing when the pool's stop flag is 1, otherwise the result of
 * uv_async_send().
 */
int transAsyncSend(SAsyncPool* pool, queue* q) {
  if (atomic_load_8(&pool->stop) == 1) {
    return -1;
  }

  int idx = pool->index % pool->nAsync;

  // no need mutex here
  // The counter is wrapped back to 0 once it has passed nAsync * 2000.
  if (pool->index++ > pool->nAsync * 2000) {
    pool->index = 0;
  }

  uv_async_t* async = &(pool->asyncs[idx]);
  SAsyncItem* item = async->data;

  // The queue itself is shared with the consumer, so pushes are serialised by the item mutex.
  taosThreadMutexLock(&item->mtx);
  QUEUE_PUSH(&item->qmsg, q);
  taosThreadMutexUnlock(&item->mtx);

  return uv_async_send(async);
}
dengyihao's avatar
dengyihao 已提交
264

dengyihao's avatar
dengyihao 已提交
265 266 267 268
/*
 * Initialise a transport context by creating its args hash table (initial capacity 2,
 * default hash function for TSDB_DATA_TYPE_UINT keys, HASH_NO_LOCK).
 */
void transCtxInit(STransCtx* ctx) {
  ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
}
dengyihao's avatar
dengyihao 已提交
269
/*
 * Release everything held by a transport context: ctx->freeFunc is applied to the val of
 * every entry in the args table and to brokenVal.val, then the table is destroyed and
 * ctx->args is set to NULL. Does nothing when ctx->args is already NULL.
 */
void transCtxCleanup(STransCtx* ctx) {
  if (ctx->args == NULL) {
    return;
  }

  for (STransCtxVal* it = taosHashIterate(ctx->args, NULL); it != NULL; it = taosHashIterate(ctx->args, it)) {
    ctx->freeFunc(it->val);
  }
  ctx->freeFunc(ctx->brokenVal.val);

  taosHashCleanup(ctx->args);
  ctx->args = NULL;
}

/*
 * Merge the arguments of src into dst.
 *
 * Nothing happens when src has no args table or no freeFunc. When dst has no table yet, it
 * takes over src's table, brokenVal and freeFunc. Otherwise every STransCtxVal of src is
 * copied by value into dst's table under the same key and src's table is destroyed.
 * In both cases src->args is NULL afterwards, so a later transCtxCleanup(src) is a no-op
 * instead of touching a destroyed table.
 */
void transCtxMerge(STransCtx* dst, STransCtx* src) {
  if (src->args == NULL || src->freeFunc == NULL) {
    return;
  }
  if (dst->args == NULL) {
    dst->args = src->args;
    dst->brokenVal = src->brokenVal;
    dst->freeFunc = src->freeFunc;
    src->args = NULL;
    return;
  }

  void*  key = NULL;
  size_t klen = 0;
  void*  iter = taosHashIterate(src->args, NULL);
  while (iter) {
    STransCtxVal* sVal = (STransCtxVal*)iter;
    key = taosHashGetKey(sVal, &klen);

    // NOTE(review): a value already stored in dst under the same key is not passed to
    // freeFunc before being replaced; the code doing so was commented out upstream:
    // STransCtxVal* dVal = taosHashGet(dst->args, key, klen);
    // if (dVal) {
    //   dst->freeFunc(dVal->val);
    // }
    taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
    iter = taosHashIterate(src->args, iter);
  }
  taosHashCleanup(src->args);
  src->args = NULL;
}
/*
 * Look up key in the context's args table and return a clone of the stored value, produced
 * by the entry's own clone callback. Returns NULL when the context has no table or the key
 * is not present.
 */
void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
  if (ctx->args == NULL) {
    return NULL;
  }

  STransCtxVal* cVal = taosHashGet(ctx->args, (const void*)&key, sizeof(key));
  if (cVal != NULL) {
    void* ret = NULL;
    (*cVal->clone)(cVal->val, &ret);
    return ret;
  }
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
323
/*
 * Return a clone of the context's brokenVal and store its msgType in *msgType.
 * When brokenVal has no clone callback, NULL is returned and *msgType is not written.
 */
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
  if (ctx->brokenVal.clone == NULL) {
    return NULL;
  }

  void* ret = NULL;
  (*ctx->brokenVal.clone)(ctx->brokenVal.val, &ret);
  *msgType = ctx->brokenVal.msgType;
  return ret;
}
dengyihao's avatar
dengyihao 已提交
334

dengyihao's avatar
dengyihao 已提交
335
/*
 * Initialise q as an empty request queue.
 */
void transReqQueueInit(queue* q) {
  QUEUE_INIT(q);
}
dengyihao's avatar
dengyihao 已提交
339
/*
 * Allocate a STransReq, append it to q and return a pointer to its embedded wreq member.
 * wreq.data points back at the owning STransReq so transReqQueueRemove() can find it.
 * Returns NULL, leaving q unchanged, when the allocation fails.
 */
void* transReqQueuePush(queue* q) {
  STransReq* req = taosMemoryCalloc(1, sizeof(STransReq));
  if (req == NULL) {
    return NULL;
  }
  req->wreq.data = req;
  QUEUE_PUSH(q, &req->q);
  return &req->wreq;
}
void* transReqQueueRemove(void* arg) {
  void*       ret = NULL;
dengyihao's avatar
dengyihao 已提交
347
  uv_write_t* wreq = arg;
dengyihao's avatar
dengyihao 已提交
348

dengyihao's avatar
dengyihao 已提交
349 350 351
  STransReq* req = wreq ? wreq->data : NULL;
  if (req == NULL) return NULL;
  QUEUE_REMOVE(&req->q);
dengyihao's avatar
dengyihao 已提交
352

dengyihao's avatar
dengyihao 已提交
353 354
  ret = wreq && wreq->handle ? wreq->handle->data : NULL;
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
355 356 357 358 359 360 361 362 363 364 365 366

  return ret;
}
/*
 * Remove every STransReq from q and free it, leaving q empty.
 */
void transReqQueueClear(queue* q) {
  for (;;) {
    if (QUEUE_IS_EMPTY(q)) {
      break;
    }
    queue* head = QUEUE_HEAD(q);
    QUEUE_REMOVE(head);
    STransReq* req = QUEUE_DATA(head, STransReq, q);
    taosMemoryFree(req);
  }
}

wafwerar's avatar
wafwerar 已提交
367
/*
 * Initialise a STransQueue: creates the backing array of pointers (initial capacity 2) and
 * records freeFunc, which transQueueClear() applies to the remaining elements.
 */
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
  queue->freeFunc = (void (*)(const void*))freeFunc;
  queue->q = taosArrayInit(2, sizeof(void*));
}
/*
 * Append arg to the queue.
 * Returns true when arg is the only element afterwards (or when the queue has no backing
 * array, in which case nothing is stored); returns false when other elements are present.
 */
bool transQueuePush(STransQueue* queue, void* arg) {
  if (queue->q == NULL) {
    return true;
  }

  taosArrayPush(queue->q, &arg);
  return taosArrayGetSize(queue->q) <= 1;
}
/*
 * Remove and return the first element of the queue, or NULL when the queue has no backing
 * array or is empty.
 */
void* transQueuePop(STransQueue* queue) {
  if (queue->q != NULL && taosArrayGetSize(queue->q) != 0) {
    void* first = taosArrayGetP(queue->q, 0);
    taosArrayRemove(queue->q, 0);
    return first;
  }
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
389
/*
 * Return the number of elements in the queue; 0 when the queue has no backing array.
 */
int32_t transQueueSize(STransQueue* queue) {
  if (queue->q != NULL) {
    return taosArrayGetSize(queue->q);
  }
  return 0;
}
/*
 * Return the element at index i without removing it, or NULL when the queue has no backing
 * array, is empty, or i is not below the current size.
 */
void* transQueueGet(STransQueue* queue, int i) {
  if (queue->q == NULL || taosArrayGetSize(queue->q) == 0 || i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }
  return taosArrayGetP(queue->q, i);
}

/*
 * Remove and return the element at index i, or return NULL (removing nothing) when the
 * queue has no backing array, is empty, or i is not below the current size.
 */
void* transQueueRm(STransQueue* queue, int i) {
  if (queue->q == NULL || taosArrayGetSize(queue->q) == 0 || i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }

  void* removed = taosArrayGetP(queue->q, i);
  taosArrayRemove(queue->q, i);
  return removed;
}
dengyihao's avatar
dengyihao 已提交
418

dengyihao's avatar
dengyihao 已提交
419
/*
 * Return true when the queue holds no elements or has no backing array.
 */
bool transQueueEmpty(STransQueue* queue) {
  return queue->q == NULL || taosArrayGetSize(queue->q) == 0;
}
/*
 * Empty the queue. When a freeFunc was registered it is called on every element first;
 * the backing array itself is kept and only cleared.
 */
void transQueueClear(STransQueue* queue) {
  if (queue->freeFunc != NULL) {
    for (int idx = 0; idx < taosArrayGetSize(queue->q); idx++) {
      void* elem = taosArrayGetP(queue->q, idx);
      queue->freeFunc(elem);
    }
  }

  taosArrayClear(queue->q);
}
/*
 * Clear the queue with transQueueClear() and destroy its backing array.
 * queue->q is not reset afterwards, so the queue must not be used again without a new
 * transQueueInit().
 */
void transQueueDestroy(STransQueue* queue) {
  transQueueClear(queue);

  taosArrayDestroy(queue->q);
}
dengyihao's avatar
dengyihao 已提交
438

dengyihao's avatar
dengyihao 已提交
439
/*
 * Heap ordering callback for delay tasks: returns 0 when a's execTime is later than b's,
 * and 1 otherwise (including equal times).
 */
static FORCE_INLINE int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
  SDelayTask* lhs = container_of(a, SDelayTask, node);
  SDelayTask* rhs = container_of(b, SDelayTask, node);
  return (lhs->execTime > rhs->execTime) ? 0 : 1;
}

dengyihao's avatar
dengyihao 已提交
449
/*
 * Timer callback of a delay queue (timer->data is the SDelayQueue).
 *
 * Runs and frees every task whose execTime is not later than the current time, taking them
 * from the top of the heap. If a task that is not yet due remains on top, the timer is
 * re-armed for the time left until that task; otherwise the timer is not restarted.
 */
static void transDQTimeout(uv_timer_t* timer) {
  SDelayQueue* queue = timer->data;
  tTrace("timer %p timeout", timer);

  uint64_t timeout = 0;
  int64_t  current = taosGetTimestampMs();
  for (;;) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) break;

    SDelayTask* task = container_of(minNode, SDelayTask, node);
    if (task->execTime > current) {
      // Earliest task is still in the future: remember how long to wait and stop.
      timeout = task->execTime - current;
      break;
    }

    heapRemove(queue->heap, minNode);
    task->func(task->arg);
    taosMemoryFree(task);
  }

  if (timeout != 0) {
    uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
  }
}
dengyihao's avatar
dengyihao 已提交
472
/*
 * Create a delay queue bound to loop: a uv timer plus a heap ordered by timeCompare.
 *
 * On success stores the new queue in *queue and returns 0. When any allocation fails,
 * everything allocated so far is released, *queue is set to NULL and -1 is returned.
 * The timer is registered with the loop only after all allocations have succeeded, so the
 * failure path never has to undo a handle registration.
 */
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
  uv_timer_t*  timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
  Heap*        heap = (timer != NULL) ? heapCreate(timeCompare) : NULL;
  SDelayQueue* q = (heap != NULL) ? taosMemoryCalloc(1, sizeof(SDelayQueue)) : NULL;

  if (q == NULL) {
    if (heap != NULL) heapDestroy(heap);
    taosMemoryFree(timer);
    *queue = NULL;
    return -1;
  }

  uv_timer_init(loop, timer);

  q->heap = heap;
  q->timer = timer;
  q->loop = loop;
  q->timer->data = q;

  *queue = q;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
488
/*
 * Destroy a delay queue and every task still pending in it.
 *
 * For each pending task, freeFunc (if given) is called on the task argument, then the
 * argument and the task are freed. Finally the heap and the queue are released.
 * NOTE(review): the timer memory is freed directly, without uv_close(); presumably the
 * loop is no longer running at this point - confirm against the callers.
 */
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
  taosMemoryFree(queue->timer);

  while (heapSize(queue->heap) > 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) {
      // Leave the loop rather than the function, so the heap and queue are still released.
      break;
    }
    heapRemove(queue->heap, minNode);

    SDelayTask* task = container_of(minNode, SDelayTask, node);

    STaskArg* arg = task->arg;
    if (freeFunc) freeFunc(arg);
    taosMemoryFree(arg);

    taosMemoryFree(task);
  }
  heapDestroy(queue->heap);
  taosMemoryFree(queue);
}
dengyihao's avatar
dengyihao 已提交
509 510
/*
 * Cancel a scheduled task: stop the timer, remove the task from the heap (when the heap is
 * not empty), and free the task together with its argument.
 *
 * If other tasks remain, the timer is re-armed for the earliest one: the delay is the time
 * left until its execTime, or 0 when it is already due. This matches how transDQSched()
 * computes the delay for an earlier pending task.
 */
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
  uv_timer_stop(queue->timer);

  if (heapSize(queue->heap) <= 0) {
    taosMemoryFree(task->arg);
    taosMemoryFree(task);
    return;
  }
  heapRemove(queue->heap, &task->node);

  taosMemoryFree(task->arg);
  taosMemoryFree(task);

  if (heapSize(queue->heap) != 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) return;

    uint64_t    now = taosGetTimestampMs();
    SDelayTask* next = container_of(minNode, SDelayTask, node);
    uint64_t    timeout = next->execTime > now ? next->execTime - now : 0;

    uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
  }
}

/*
 * Schedule func(arg) to run timeoutMs milliseconds from now on the delay queue.
 *
 * The timer is (re)started for whichever comes first: the new task, or a task already in
 * the heap with an earlier execTime. Returns the new task, which can be passed to
 * transDQCancel(), or NULL when the task cannot be allocated (nothing is scheduled and the
 * timer is left as it was).
 */
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
  uint64_t    now = taosGetTimestampMs();
  SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
  if (task == NULL) {
    return NULL;
  }
  task->func = func;
  task->arg = arg;
  task->execTime = now + timeoutMs;

  HeapNode* minNode = heapMin(queue->heap);
  if (minNode) {
    SDelayTask* minTask = container_of(minNode, SDelayTask, node);
    if (minTask->execTime < task->execTime) {
      // An earlier task is pending: the timer has to fire for that one first.
      timeoutMs = minTask->execTime <= now ? 0 : minTask->execTime - now;
    }
  }

  tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs);
  heapInsert(queue->heap, &task->node);
  uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
  return task;
}
dengyihao's avatar
dengyihao 已提交
554 555 556 557 558 559

/*
 * Trace-log an endpoint set as "epset:{0. fqdn:port, 1. fqdn:port}, inUse:N".
 * A NULL pEpSet is logged as "NULL epset".
 *
 * The text is built in a fixed 512-byte buffer. If it does not fit, the output is truncated
 * and building stops; the write offset never moves past the end of the buffer.
 */
void transPrintEpSet(SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    tTrace("NULL epset");
    return;
  }

  char buf[512] = {0};
  int  cap = (int)sizeof(buf);
  int  len = snprintf(buf, sizeof(buf), "epset:{");

  for (int i = 0; i < pEpSet->numOfEps; i++) {
    const char* sep = (i == pEpSet->numOfEps - 1) ? "" : ", ";
    int n = snprintf(buf + len, cap - len, "%d. %s:%d%s", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port, sep);
    if (n < 0 || n >= cap - len) {
      // snprintf reports the length it wanted, not the length it wrote; clamp on truncation.
      len = cap - 1;
      break;
    }
    len += n;
  }
  if (len < cap - 1) {
    snprintf(buf + len, cap - len, "}");
  }
  tTrace("%s, inUse:%d", buf, pEpSet->inUse);
}
572 573 574 575 576 577 578 579 580 581 582
/*
 * Compare two endpoint sets. They are equal when numOfEps and inUse match and every
 * endpoint has the same fqdn (compared over at most TSDB_FQDN_LEN bytes) and port at the
 * same index.
 */
bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
  if (a->numOfEps != b->numOfEps) return false;
  if (a->inUse != b->inUse) return false;

  for (int i = 0; i < a->numOfEps; i++) {
    if (strncmp(a->eps[i].fqdn, b->eps[i].fqdn, TSDB_FQDN_LEN) != 0) return false;
    if (a->eps[i].port != b->eps[i].port) return false;
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
583

dengyihao's avatar
dengyihao 已提交
584
static void transInitEnv() {
dengyihao's avatar
dengyihao 已提交
585 586
  refMgt = transOpenRefMgt(50000, transDestoryExHandle);
  instMgt = taosOpenRef(50, rpcCloseImpl);
dengyihao's avatar
dengyihao 已提交
587 588
  uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
}
dengyihao's avatar
dengyihao 已提交
589
static void transDestroyEnv() {
dengyihao's avatar
dengyihao 已提交
590 591
  transCloseRefMgt(refMgt);
  transCloseRefMgt(instMgt);
dengyihao's avatar
dengyihao 已提交
592
}
dengyihao's avatar
dengyihao 已提交
593

dengyihao's avatar
dengyihao 已提交
594 595 596 597
/*
 * Initialise the transport module. The actual setup (transInitEnv) is routed through
 * taosThreadOnce() with the transModuleInit control.
 */
void transInit() {
  taosThreadOnce(&transModuleInit, transInitEnv);
}
dengyihao's avatar
dengyihao 已提交
598 599 600 601

/* Return the ref-manager id stored in refMgt by transInitEnv(). */
int32_t transGetRefMgt() {
  return refMgt;
}
/* Return the ref-manager id stored in instMgt by transInitEnv(). */
int32_t transGetInstMgt() {
  return instMgt;
}

dengyihao's avatar
dengyihao 已提交
602 603 604 605
/*
 * Tear down the transport module by closing the ref managers (see transDestroyEnv).
 */
void transCleanup() {
  transDestroyEnv();
}
dengyihao's avatar
dengyihao 已提交
606
/*
 * Open a ref manager through taosOpenRef(size, func) and return its result.
 */
int32_t transOpenRefMgt(int size, void (*func)(void*)) {
  // added into once later
  int32_t mgt = taosOpenRef(size, func);
  return mgt;
}
dengyihao's avatar
dengyihao 已提交
610
/*
 * Close the ref manager identified by mgt through taosCloseRef().
 */
void transCloseRefMgt(int32_t mgt) {
  taosCloseRef(mgt);
}
dengyihao's avatar
dengyihao 已提交
614
/*
 * Register p with the given ref manager through taosAddRef() and return its result
 * (used as refId by the other ex-handle functions).
 */
int64_t transAddExHandle(int32_t refMgt, void* p) {
  int64_t refId = taosAddRef(refMgt, p);
  return refId;
}
dengyihao's avatar
dengyihao 已提交
618
/*
 * Remove the handle refId from the given ref manager through taosRemoveRef() and return
 * its result.
 */
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) {
  int32_t code = taosRemoveRef(refMgt, refId);
  return code;
}

dengyihao's avatar
dengyihao 已提交
623
/*
 * Acquire the handle refId from the given ref manager through taosAcquireRef() and return
 * the result as a void pointer.
 */
void* transAcquireExHandle(int32_t refMgt, int64_t refId) {
  void* handle = (void*)taosAcquireRef(refMgt, refId);
  return handle;
}

dengyihao's avatar
dengyihao 已提交
628
/*
 * Release the handle refId of the given ref manager through taosReleaseRef() and return
 * its result.
 */
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) {
  int32_t code = taosReleaseRef(refMgt, refId);
  return code;
}
/*
 * Release callback registered for refMgt in transInitEnv(): frees the handle memory.
 * A NULL handle is ignored.
 */
void transDestoryExHandle(void* handle) {
  if (handle != NULL) {
    taosMemoryFree(handle);
  }
}
dengyihao's avatar
dengyihao 已提交
638
#endif