transComm.c 17.2 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifdef USE_UV

#include "transComm.h"

dengyihao's avatar
opt rpc  
dengyihao 已提交
19 20
#define BUFFER_CAP 4096

dengyihao's avatar
dengyihao 已提交
21 22 23
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;

static int32_t refMgt;
dengyihao's avatar
dengyihao 已提交
24
static int32_t instMgt;
dengyihao's avatar
dengyihao 已提交
25

dengyihao's avatar
dengyihao 已提交
26 27 28 29
int32_t transCompressMsg(char* msg, int32_t len) {
  int32_t        ret = 0;
  int            compHdr = sizeof(STransCompMsg);
  STransMsgHead* pHead = transHeadFromCont(msg);
dengyihao's avatar
dengyihao 已提交
30

dengyihao's avatar
dengyihao 已提交
31
  char* buf = taosMemoryMalloc(len + compHdr + 8);  // 8 extra bytes
dengyihao's avatar
dengyihao 已提交
32 33
  if (buf == NULL) {
    tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
dengyihao's avatar
dengyihao 已提交
34 35
    ret = len;
    return ret;
dengyihao's avatar
dengyihao 已提交
36 37
  }

dengyihao's avatar
dengyihao 已提交
38
  int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr);
dengyihao's avatar
dengyihao 已提交
39 40 41 42
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
dengyihao's avatar
dengyihao 已提交
43
  if (clen > 0 && clen < len - compHdr) {
dengyihao's avatar
dengyihao 已提交
44 45 46
    STransCompMsg* pComp = (STransCompMsg*)msg;
    pComp->reserved = 0;
    pComp->contLen = htonl(len);
dengyihao's avatar
dengyihao 已提交
47
    memcpy(msg + compHdr, buf, clen);
dengyihao's avatar
dengyihao 已提交
48 49

    tDebug("compress rpc msg, before:%d, after:%d", len, clen);
dengyihao's avatar
dengyihao 已提交
50 51
    ret = clen + compHdr;
    pHead->comp = 1;
dengyihao's avatar
dengyihao 已提交
52
  } else {
dengyihao's avatar
dengyihao 已提交
53 54
    ret = len;
    pHead->comp = 0;
dengyihao's avatar
dengyihao 已提交
55
  }
wafwerar's avatar
wafwerar 已提交
56
  taosMemoryFree(buf);
dengyihao's avatar
dengyihao 已提交
57
  return ret;
dengyihao's avatar
dengyihao 已提交
58
}
dengyihao's avatar
dengyihao 已提交
59 60 61 62
int32_t transDecompressMsg(char** msg, int32_t len) {
  STransMsgHead* pHead = (STransMsgHead*)(*msg);
  if (pHead->comp == 0) return 0;

dengyihao's avatar
dengyihao 已提交
63 64
  char* pCont = transContFromHead(pHead);

dengyihao's avatar
dengyihao 已提交
65 66 67 68 69
  STransCompMsg* pComp = (STransCompMsg*)pCont;
  int32_t        oriLen = htonl(pComp->contLen);

  char*          buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
  STransMsgHead* pNewHead = (STransMsgHead*)buf;
dengyihao's avatar
dengyihao 已提交
70 71
  int32_t        decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), pNewHead->content,
                                                 len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
dengyihao's avatar
dengyihao 已提交
72
  memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
dengyihao's avatar
dengyihao 已提交
73

dengyihao's avatar
dengyihao 已提交
74 75 76 77 78 79 80 81
  pNewHead->msgLen = htonl(oriLen + sizeof(STransMsgHead));

  taosMemoryFree(pHead);
  *msg = buf;
  if (decompLen != oriLen) {
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
82 83
}

dengyihao's avatar
dengyihao 已提交
84 85 86 87
void transFreeMsg(void* msg) {
  if (msg == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
88
  taosMemoryFree((char*)msg - sizeof(STransMsgHead));
dengyihao's avatar
dengyihao 已提交
89
}
dengyihao's avatar
dengyihao 已提交
90
/*
 * Format an IPv4 socket address as "ip:port" into `dst`.
 *
 * `sockname` is read as a struct sockaddr_in. `dst` is written with sprintf,
 * so the caller must supply a buffer large enough for the result.
 *
 * Returns the result of uv_ip4_name().
 */
int transSockInfo2Str(struct sockaddr* sockname, char* dst) {
  struct sockaddr_in in4 = *(struct sockaddr_in*)sockname;

  char ip[20] = {0};
  int  rc = uv_ip4_name(&in4, (char*)ip, sizeof(ip));

  sprintf(dst, "%s:%d", ip, ntohs(in4.sin_port));
  return rc;
}
dengyihao's avatar
dengyihao 已提交
98
/*
 * Initialise a connection read buffer with BUFFER_CAP zeroed bytes.
 *
 * `left` is set to -1 (which the other buffer helpers treat as "message length
 * not known yet"); `len`, `total` and `invalid` are cleared.
 *
 * Returns 0 on success. Returns -1 when the allocation fails, in which case
 * `buf->buf` is NULL and `buf->cap` is 0 so the recorded capacity never
 * exceeds the memory actually owned.
 */
int transInitBuffer(SConnBuffer* buf) {
  buf->left = -1;
  buf->len = 0;
  buf->total = 0;
  buf->invalid = 0;

  buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
  if (buf->buf == NULL) {
    buf->cap = 0;
    tError("failed to allocate memory for conn buffer, cap:%d", BUFFER_CAP);
    return -1;
  }

  buf->cap = BUFFER_CAP;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
107 108 109
/*
 * Release the memory held by a connection buffer and clear the pointer.
 * The remaining fields of `p` are left as they are. Always returns 0.
 */
int transDestroyBuffer(SConnBuffer* p) {
  char* mem = p->buf;
  p->buf = NULL;
  taosMemoryFree(mem);
  return 0;
}
dengyihao's avatar
opt rpc  
dengyihao 已提交
112

dengyihao's avatar
dengyihao 已提交
113
/*
 * Reset a connection buffer to its empty state, shrinking it back to
 * BUFFER_CAP bytes if it had grown beyond that.
 *
 * If the shrinking realloc fails, the existing (larger) buffer and its
 * capacity are kept, so the buffer stays usable instead of being lost.
 *
 * Always returns 0.
 */
int transClearBuffer(SConnBuffer* buf) {
  SConnBuffer* p = buf;
  if (p->cap > BUFFER_CAP) {
    // Go through a temporary so a failed realloc cannot overwrite p->buf.
    char* shrunk = taosMemoryRealloc(p->buf, BUFFER_CAP);
    if (shrunk != NULL) {
      p->buf = shrunk;
      p->cap = BUFFER_CAP;
    }
  }
  p->left = -1;
  p->len = 0;
  p->total = 0;
  p->invalid = 0;
  return 0;
}

/*
 * Copy one complete message out of the connection buffer.
 *
 * A message is only dumped when `left` is 0 and `total` is positive. If
 * `total` is at least sizeof(STransMsgHead) and the buffer is not flagged
 * invalid, `*buf` receives a newly allocated copy of the first `total` bytes
 * and the connection buffer is reset via transResetBuffer().
 *
 * Returns the message length on success. Returns -1 when no complete valid
 * message is available, when the copy cannot be allocated (`*buf` is set to
 * NULL and the connection buffer is left untouched), or when the reset fails.
 */
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
  static const int HEADSIZE = sizeof(STransMsgHead);

  SConnBuffer* p = connBuf;
  if (p->left != 0 || p->total <= 0) {
    return -1;
  }

  int total = p->total;
  if (total < HEADSIZE || p->invalid) {
    return -1;
  }

  char* copy = taosMemoryCalloc(1, total);
  if (copy == NULL) {
    // Previously the NULL result went straight into memcpy.
    tError("failed to allocate memory for rpc msg, len:%d", total);
    *buf = NULL;
    return -1;
  }
  memcpy(copy, p->buf, total);
  *buf = copy;

  if (transResetBuffer(connBuf) < 0) {
    return -1;
  }
  return total;
}

/*
 * Drop the first `total` bytes (the message just consumed) from the buffer.
 *
 * Any bytes received beyond that message are moved to the front and become
 * the new `len`; `left` goes back to -1 and `total` to 0.
 *
 * Returns 0 on success, or -1 (after ASSERTS) when `total` is larger than
 * `len`.
 */
int transResetBuffer(SConnBuffer* connBuf) {
  SConnBuffer* p = connBuf;

  if (p->total > p->len) {
    ASSERTS(0, "invalid read from sock buf");
    return -1;
  }

  int remain = p->len - p->total;
  if (remain > 0) {
    // the regions overlap, so memmove rather than memcpy
    memmove(p->buf, p->buf + p->total, remain);
  }

  p->left = -1;
  p->total = 0;
  p->len = remain;
  return 0;
}
/*
 * Point `uvBuf` at the free region of the connection buffer for the next read.
 *
 * Format of the data buffer:
 * |<--------------------------data from socket------------------------------->|
 * |<------STransMsgHead------->|<----userdata---->|<--auth data-->|<--user info-->|
 *
 * While `left` is -1, all remaining capacity is offered. Otherwise exactly
 * `left` bytes are offered, growing the buffer first when they do not fit.
 *
 * Returns 0 on success. If the buffer cannot be grown, the existing buffer
 * and capacity are kept, `uvBuf` is set to {NULL, 0} and -1 is returned.
 */
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
  SConnBuffer* p = connBuf;

  if (p->left == -1) {
    uvBuf->base = p->buf + p->len;
    uvBuf->len = p->cap - p->len;
    return 0;
  }

  if (p->left >= p->cap - p->len) {
    int   newCap = p->left + p->len;
    char* grown = taosMemoryRealloc(p->buf, newCap);
    if (grown == NULL) {
      // p->buf and p->cap are only updated on success, so the old block is not lost.
      tError("failed to grow conn buffer, cap:%d", newCap);
      uvBuf->base = NULL;
      uvBuf->len = 0;
      return -1;
    }
    p->buf = grown;
    p->cap = newCap;
  }

  uvBuf->base = p->buf + p->len;
  uvBuf->len = p->left;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
187 188
/*
 * Update the buffer's bookkeeping after a read and report whether a message
 * is ready to be handled.
 *
 * Nothing is updated until at least sizeof(STransMsgHead) bytes are buffered.
 * The first time that happens (`left` == -1) the head is decoded: `total` is
 * taken from msgLen and `invalid` from TRANS_NOVALID_PACKET on magicNum.
 * `left` is then set to the number of bytes still missing, or 0 when `len`
 * already covers `total`.
 *
 * Returns true when `left` is 0 or the buffer is flagged invalid.
 */
bool transReadComplete(SConnBuffer* connBuf) {
  SConnBuffer* p = connBuf;

  if (p->len >= sizeof(STransMsgHead)) {
    if (p->left == -1) {
      // copy the head out instead of casting the raw buffer
      STransMsgHead head;
      memcpy((char*)&head, connBuf->buf, sizeof(head));
      p->total = (int32_t)htonl(head.msgLen);
      p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum));
    }
    p->left = (p->total >= p->len) ? (p->total - p->len) : 0;
  }

  return p->left == 0 || p->invalid;
}
dengyihao's avatar
dengyihao 已提交
206

dengyihao's avatar
dengyihao 已提交
207
/*
 * Apply the transport's socket options to a TCP stream.
 *
 * Keepalive (delay 20) is enabled unless WINDOWS or DARWIN is defined; its
 * result is ignored. The stream is then switched to no-delay mode.
 *
 * Returns the result of uv_tcp_nodelay().
 */
int transSetConnOption(uv_tcp_t* stream) {
#if !defined(WINDOWS) && !defined(DARWIN)
  uv_tcp_keepalive(stream, 1, 20);
#endif
  int rc = uv_tcp_nodelay(stream, 1);
  return rc;
}

dengyihao's avatar
dengyihao 已提交
216
/*
 * Create a pool of `sz` uv_async_t handles on `loop`, all using callback `cb`.
 *
 * Each handle's `data` points at a newly allocated SAsyncItem holding `arg`
 * (as pThrd), an empty message queue and a mutex.
 *
 * Returns the pool, or NULL when an allocation or uv_async_init() fails. On a
 * failure after the handle array exists, whatever was built so far is handed
 * to transAsyncPoolDestroy().
 */
SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
  SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
  if (pool == NULL) {
    tError("failed to allocate memory for async pool");
    return NULL;
  }
  pool->nAsync = sz;

  pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
  if (pool->asyncs == NULL) {
    tError("failed to allocate memory for async pool");
    taosMemoryFree(pool);
    return NULL;
  }

  int i = 0, err = 0;
  for (i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);

    SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
    if (item == NULL) {
      // async->data is still NULL (zeroed array), so the destroy loop skips this slot
      tError("failed to allocate memory for async item");
      break;
    }
    item->pThrd = arg;
    QUEUE_INIT(&item->qmsg);
    taosThreadMutexInit(&item->mtx, NULL);

    // attach the item before init so the destroy path frees it on failure
    async->data = item;
    err = uv_async_init(loop, async, cb);
    if (err != 0) {
      tError("failed to init async, reason:%s", uv_err_name(err));
      break;
    }
  }

  if (i != pool->nAsync) {
    transAsyncPoolDestroy(pool);
    pool = NULL;
  }

  return pool;
}
dengyihao's avatar
dengyihao 已提交
245

dengyihao's avatar
dengyihao 已提交
246
/*
 * Free an async pool: destroy each item's mutex, free the item, then free the
 * handle array and the pool itself. Slots whose `data` is NULL are skipped.
 */
void transAsyncPoolDestroy(SAsyncPool* pool) {
  int slot = 0;
  while (slot < pool->nAsync) {
    SAsyncItem* item = pool->asyncs[slot].data;
    if (item != NULL) {
      taosThreadMutexDestroy(&item->mtx);
      taosMemoryFree(item);
    }
    slot++;
  }
  taosMemoryFree(pool->asyncs);
  taosMemoryFree(pool);
}
dengyihao's avatar
dengyihao 已提交
258 259 260 261 262 263 264 265
/*
 * Returns true when the message queue of every item in the pool is empty,
 * false as soon as one non-empty queue is found.
 */
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
  int slot = 0;
  while (slot < pool->nAsync) {
    SAsyncItem* item = pool->asyncs[slot].data;
    if (!QUEUE_IS_EMPTY(&item->qmsg)) {
      return false;
    }
    slot++;
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
266
/*
 * Queue `q` on one of the pool's async items and wake its handle.
 *
 * The target slot is `index % nAsync`; `index` is then advanced and wrapped
 * back to 0 once it has passed `nAsync * 2000`. The push is done under the
 * item's mutex.
 *
 * Returns -1 when the pool's `stop` flag is 1, otherwise the result of
 * uv_async_send().
 */
int transAsyncSend(SAsyncPool* pool, queue* q) {
  if (atomic_load_8(&pool->stop) == 1) {
    return -1;
  }

  const int slot = pool->index % pool->nAsync;

  // no need mutex here
  if (pool->index++ > pool->nAsync * 2000) {
    pool->index = 0;
  }

  uv_async_t* handle = &(pool->asyncs[slot]);
  SAsyncItem* target = handle->data;

  taosThreadMutexLock(&target->mtx);
  QUEUE_PUSH(&target->qmsg, q);
  taosThreadMutexUnlock(&target->mtx);

  return uv_async_send(handle);
}
dengyihao's avatar
dengyihao 已提交
284

dengyihao's avatar
dengyihao 已提交
285 286 287 288
/*
 * Create the argument hash table of a transport context: initial size 2,
 * the default hash function for TSDB_DATA_TYPE_UINT keys, HASH_NO_LOCK.
 */
void transCtxInit(STransCtx* ctx) {
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT);
  ctx->args = taosHashInit(2, hashFn, true, HASH_NO_LOCK);
}
dengyihao's avatar
dengyihao 已提交
289
/*
 * Release everything owned by a transport context.
 *
 * Every value in the hash table and the broken-link value are passed to
 * ctx->freeFunc, then the table is destroyed and `args` set to NULL.
 * Does nothing when `args` is already NULL.
 */
void transCtxCleanup(STransCtx* ctx) {
  if (ctx->args == NULL) {
    return;
  }

  for (STransCtxVal* entry = taosHashIterate(ctx->args, NULL); entry != NULL;
       entry = taosHashIterate(ctx->args, entry)) {
    ctx->freeFunc(entry->val);
  }
  ctx->freeFunc(ctx->brokenVal.val);

  taosHashCleanup(ctx->args);
  ctx->args = NULL;
}

/*
 * Move the arguments of `src` into `dst`.
 *
 * Nothing happens when `src` has no table or no freeFunc. When `dst` has no
 * table yet it simply adopts src's table, brokenVal and freeFunc. Otherwise
 * every entry of `src` is copied into dst's table under the same key and
 * src's table is destroyed.
 *
 * In both cases `src->args` is NULL afterwards, so a later
 * transCtxCleanup(src) returns early instead of touching a destroyed table.
 *
 * NOTE(review): an entry already present in `dst` under the same key is not
 * passed to freeFunc before being overwritten; confirm this is intended.
 */
void transCtxMerge(STransCtx* dst, STransCtx* src) {
  if (src->args == NULL || src->freeFunc == NULL) {
    return;
  }
  if (dst->args == NULL) {
    dst->args = src->args;
    dst->brokenVal = src->brokenVal;
    dst->freeFunc = src->freeFunc;
    src->args = NULL;
    return;
  }

  void*  key = NULL;
  size_t klen = 0;
  void*  iter = taosHashIterate(src->args, NULL);
  while (iter) {
    STransCtxVal* sVal = (STransCtxVal*)iter;
    key = taosHashGetKey(sVal, &klen);

    taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
    iter = taosHashIterate(src->args, iter);
  }
  taosHashCleanup(src->args);
  src->args = NULL;  // match the adopt branch: do not leave a dangling table pointer
}
/*
 * Look up `key` in the context's table and return a clone of its value,
 * produced by the entry's own `clone` callback.
 *
 * Returns NULL when the context has no table or the key is absent.
 */
void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
  STransCtxVal* entry = NULL;
  if (ctx->args != NULL) {
    entry = taosHashGet(ctx->args, (const void*)&key, sizeof(key));
  }
  if (entry == NULL) {
    return NULL;
  }

  void* copy = NULL;
  (*entry->clone)(entry->val, &copy);
  return copy;
}
dengyihao's avatar
dengyihao 已提交
343
/*
 * Return a clone of the context's broken-link value and store its message
 * type in `*msgType`.
 *
 * When brokenVal has no `clone` callback, NULL is returned and `*msgType` is
 * not written.
 */
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
  if (ctx->brokenVal.clone == NULL) {
    return NULL;
  }

  void* copy = NULL;
  (*ctx->brokenVal.clone)(ctx->brokenVal.val, &copy);
  *msgType = ctx->brokenVal.msgType;
  return copy;
}
dengyihao's avatar
dengyihao 已提交
354

dengyihao's avatar
dengyihao 已提交
355
/* Initialise `q` as an empty request queue. */
void transReqQueueInit(queue* q) {
  QUEUE_INIT(q);
}
dengyihao's avatar
dengyihao 已提交
359
/*
 * Allocate a STransReq, link it into `q` and return a pointer to its embedded
 * write request (`wreq`), whose `data` points back at the STransReq.
 *
 * Returns NULL, leaving `q` unchanged, when the allocation fails.
 */
void* transReqQueuePush(queue* q) {
  STransReq* req = taosMemoryCalloc(1, sizeof(STransReq));
  if (req == NULL) {
    tError("failed to allocate memory for write req");
    return NULL;
  }
  req->wreq.data = req;
  QUEUE_PUSH(q, &req->q);
  return &req->wreq;
}
void* transReqQueueRemove(void* arg) {
  void*       ret = NULL;
dengyihao's avatar
dengyihao 已提交
367
  uv_write_t* wreq = arg;
dengyihao's avatar
dengyihao 已提交
368

dengyihao's avatar
dengyihao 已提交
369 370 371
  STransReq* req = wreq ? wreq->data : NULL;
  if (req == NULL) return NULL;
  QUEUE_REMOVE(&req->q);
dengyihao's avatar
dengyihao 已提交
372

dengyihao's avatar
dengyihao 已提交
373 374
  ret = wreq && wreq->handle ? wreq->handle->data : NULL;
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
375 376 377 378 379 380 381 382 383 384 385 386

  return ret;
}
/* Unlink and free every STransReq still linked into `q`. */
void transReqQueueClear(queue* q) {
  for (;;) {
    if (QUEUE_IS_EMPTY(q)) {
      break;
    }
    queue* head = QUEUE_HEAD(q);
    QUEUE_REMOVE(head);
    taosMemoryFree(QUEUE_DATA(head, STransReq, q));
  }
}

wafwerar's avatar
wafwerar 已提交
387
/*
 * Initialise a pointer queue backed by an array (initial capacity 2) and
 * remember `freeFunc`, which transQueueClear() applies to each element.
 */
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
  queue->freeFunc = (void (*)(const void*))freeFunc;
  queue->q = taosArrayInit(2, sizeof(void*));
}
/*
 * Append `arg` to the queue.
 *
 * Returns true when the queue has no backing array, or when the queue holds
 * at most one element after the push; false when it now holds more than one.
 */
bool transQueuePush(STransQueue* queue, void* arg) {
  if (queue->q == NULL) {
    return true;
  }
  taosArrayPush(queue->q, &arg);
  return taosArrayGetSize(queue->q) <= 1;
}
/*
 * Remove and return the first element of the queue, or NULL when the queue
 * has no backing array or is empty.
 */
void* transQueuePop(STransQueue* queue) {
  void* front = NULL;
  if (queue->q != NULL && taosArrayGetSize(queue->q) != 0) {
    front = taosArrayGetP(queue->q, 0);
    taosArrayRemove(queue->q, 0);
  }
  return front;
}
dengyihao's avatar
dengyihao 已提交
409
/* Number of elements in the queue; 0 when it has no backing array. */
int32_t transQueueSize(STransQueue* queue) {
  return (queue->q == NULL) ? 0 : taosArrayGetSize(queue->q);
}
/*
 * Return the element at index `i` without removing it.
 *
 * Returns NULL when the queue has no backing array, is empty, or `i` is not
 * below the current size.
 */
void* transQueueGet(STransQueue* queue, int i) {
  if (queue->q == NULL) {
    return NULL;
  }
  if (taosArrayGetSize(queue->q) == 0 || i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }
  return taosArrayGetP(queue->q, i);
}

/*
 * Remove the element at index `i` from the queue and return it.
 *
 * Returns NULL when the queue has no backing array, is empty, or `i` is not
 * below the current size.
 */
void* transQueueRm(STransQueue* queue, int i) {
  if (queue->q == NULL) {
    return NULL;
  }
  if (taosArrayGetSize(queue->q) == 0 || i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }

  void* removed = taosArrayGetP(queue->q, i);
  taosArrayRemove(queue->q, i);
  return removed;
}
dengyihao's avatar
dengyihao 已提交
438

dengyihao's avatar
dengyihao 已提交
439
/* True when the queue has no backing array or holds no elements. */
bool transQueueEmpty(STransQueue* queue) {
  return queue->q == NULL || taosArrayGetSize(queue->q) == 0;
}
/*
 * Empty the queue. When a freeFunc was registered it is applied to every
 * element first; the backing array itself is kept.
 */
void transQueueClear(STransQueue* queue) {
  if (queue->freeFunc != NULL) {
    int idx = 0;
    while (idx < taosArrayGetSize(queue->q)) {
      queue->freeFunc(taosArrayGetP(queue->q, idx));
      idx++;
    }
  }
  taosArrayClear(queue->q);
}
/* Clear the queue (see transQueueClear) and then destroy its backing array. */
void transQueueDestroy(STransQueue* queue) {
  SArray* backing = queue->q;
  transQueueClear(queue);
  taosArrayDestroy(backing);
}
dengyihao's avatar
dengyihao 已提交
458

dengyihao's avatar
dengyihao 已提交
459
/*
 * Heap ordering callback for delay tasks.
 * Returns 0 when a's execTime is later than b's, 1 otherwise.
 */
static FORCE_INLINE int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
  SDelayTask* lhs = container_of(a, SDelayTask, node);
  SDelayTask* rhs = container_of(b, SDelayTask, node);
  return (lhs->execTime > rhs->execTime) ? 0 : 1;
}

dengyihao's avatar
dengyihao 已提交
469
/*
 * Timer callback of a delay queue.
 *
 * Runs and frees every task at the top of the heap whose execTime is not
 * later than the timestamp taken on entry. If the next task is still in the
 * future, the timer is re-armed for the remaining time.
 */
static void transDQTimeout(uv_timer_t* timer) {
  SDelayQueue* queue = timer->data;
  tTrace("timer %p timeout", timer);

  uint64_t remaining = 0;
  int64_t  now = taosGetTimestampMs();

  for (;;) {
    HeapNode* top = heapMin(queue->heap);
    if (top == NULL) {
      break;
    }

    SDelayTask* task = container_of(top, SDelayTask, node);
    if (!(task->execTime <= now)) {
      // earliest task is not due yet: remember how long to wait
      remaining = task->execTime - now;
      break;
    }

    heapRemove(queue->heap, top);
    task->func(task->arg);
    taosMemoryFree(task);
  }

  if (remaining != 0) {
    uv_timer_start(queue->timer, transDQTimeout, remaining, 0);
  }
}
dengyihao's avatar
dengyihao 已提交
492
/*
 * Create a delay queue bound to `loop`: a timer handle, a heap ordered by
 * timeCompare, and the SDelayQueue that ties them together. The timer's
 * `data` points back at the queue.
 *
 * On success `*queue` receives the new queue and 0 is returned. On any
 * failure everything allocated so far is released, `*queue` is set to NULL
 * and -1 is returned.
 */
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
  uv_timer_t*  timer = NULL;
  Heap*        heap = NULL;
  SDelayQueue* q = NULL;

  *queue = NULL;

  timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
  if (timer == NULL) goto _error;

  heap = heapCreate(timeCompare);
  if (heap == NULL) goto _error;

  q = taosMemoryCalloc(1, sizeof(SDelayQueue));
  if (q == NULL) goto _error;

  // Initialise the handle last: once it is registered with the loop it can no
  // longer simply be freed on an error path.
  if (uv_timer_init(loop, timer) != 0) goto _error;

  q->heap = heap;
  q->timer = timer;
  q->loop = loop;
  q->timer->data = q;

  *queue = q;
  return 0;

_error:
  tError("failed to create delay queue");
  if (heap != NULL) heapDestroy(heap);
  if (q != NULL) taosMemoryFree(q);
  if (timer != NULL) taosMemoryFree(timer);
  return -1;
}

dengyihao's avatar
dengyihao 已提交
508
/*
 * Destroy a delay queue and every task still pending in it.
 *
 * The timer memory is freed first. Each remaining task is removed from the
 * heap; its argument is passed to `freeFunc` (when given) and then freed,
 * followed by the task itself. Finally the heap and the queue are released.
 */
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
  taosMemoryFree(queue->timer);

  while (heapSize(queue->heap) > 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) {
      // Stop draining but still fall through to the cleanup below; returning
      // here used to leak the heap and the queue.
      break;
    }
    heapRemove(queue->heap, minNode);

    SDelayTask* task = container_of(minNode, SDelayTask, node);

    STaskArg* arg = task->arg;
    if (freeFunc) freeFunc(arg);
    taosMemoryFree(arg);

    taosMemoryFree(task);
  }
  heapDestroy(queue->heap);
  taosMemoryFree(queue);
}
dengyihao's avatar
dengyihao 已提交
529 530
/*
 * Cancel a scheduled task: stop the timer, remove the task from the heap
 * (when the heap is not empty) and free the task and its argument.
 *
 * If other tasks remain, the timer is re-armed for the earliest one: the
 * delay is the time left until its execTime, or 0 when it is already due.
 * This matches how transDQSched() computes its delay.
 */
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
  uv_timer_stop(queue->timer);

  if (heapSize(queue->heap) <= 0) {
    taosMemoryFree(task->arg);
    taosMemoryFree(task);
    return;
  }
  heapRemove(queue->heap, &task->node);

  taosMemoryFree(task->arg);
  taosMemoryFree(task);

  if (heapSize(queue->heap) != 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) return;

    uint64_t    now = taosGetTimestampMs();
    SDelayTask* next = container_of(minNode, SDelayTask, node);
    // Was `now > execTime ? now - execTime : 0`, which fired future tasks
    // immediately and delayed overdue ones.
    uint64_t timeout = next->execTime > now ? next->execTime - now : 0;

    uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
  }
}

/*
 * Schedule `func(arg)` to run `timeoutMs` milliseconds from now.
 *
 * The task is inserted into the queue's heap and the timer is (re)started.
 * When an already-queued task is due earlier than the new one, the timer is
 * armed for that earlier task instead (0 if it is already overdue).
 *
 * Returns the new task, or NULL when it cannot be allocated; in that case the
 * heap and the timer are left untouched.
 */
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
  uint64_t    now = taosGetTimestampMs();
  SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
  if (task == NULL) {
    tError("failed to allocate memory for delay task");
    return NULL;
  }
  task->func = func;
  task->arg = arg;
  task->execTime = now + timeoutMs;

  HeapNode* minNode = heapMin(queue->heap);
  if (minNode) {
    SDelayTask* minTask = container_of(minNode, SDelayTask, node);
    if (minTask->execTime < task->execTime) {
      timeoutMs = minTask->execTime <= now ? 0 : minTask->execTime - now;
    }
  }

  tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs);
  heapInsert(queue->heap, &task->node);
  uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
  return task;
}
dengyihao's avatar
dengyihao 已提交
574 575 576 577 578 579

/*
 * Trace-log an endpoint set as "epset:{0. fqdn:port, 1. fqdn:port}, inUse:N".
 * A NULL set is logged as "NULL epset".
 *
 * The text is built in a 512-byte stack buffer. If it does not fit, the
 * output is truncated: appending stops as soon as the buffer is full, so the
 * remaining-space argument to snprintf can never wrap around.
 */
void transPrintEpSet(SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    tTrace("NULL epset");
    return;
  }

  char         buf[512] = {0};
  const size_t cap = sizeof(buf);
  int          len = snprintf(buf, cap, "epset:{");

  for (int i = 0; i < pEpSet->numOfEps; i++) {
    // snprintf returns the length it wanted to write; once that reaches the
    // capacity the buffer is full and `cap - len` must not be computed.
    if (len < 0 || (size_t)len >= cap) break;

    const char* sep = (i == pEpSet->numOfEps - 1) ? "" : ", ";
    int         n = snprintf(buf + len, cap - len, "%d. %s:%d%s", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port, sep);
    if (n < 0) break;
    len += n;
  }

  if (len >= 0 && (size_t)len < cap) {
    snprintf(buf + len, cap - len, "}");
  }
  tTrace("%s, inUse:%d", buf, pEpSet->inUse);
}
592 593 594 595 596 597 598 599 600 601 602
/*
 * Compare two endpoint sets.
 *
 * They are equal when numOfEps and inUse match and every endpoint pair has
 * the same fqdn (compared over at most TSDB_FQDN_LEN bytes) and port.
 */
bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
  if (a->numOfEps != b->numOfEps) {
    return false;
  }
  if (a->inUse != b->inUse) {
    return false;
  }

  int idx = 0;
  while (idx < a->numOfEps) {
    bool sameHost = strncmp(a->eps[idx].fqdn, b->eps[idx].fqdn, TSDB_FQDN_LEN) == 0;
    bool samePort = a->eps[idx].port == b->eps[idx].port;
    if (!sameHost || !samePort) {
      return false;
    }
    idx++;
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
603

dengyihao's avatar
dengyihao 已提交
604
static void transInitEnv() {
dengyihao's avatar
dengyihao 已提交
605 606
  refMgt = transOpenRefMgt(50000, transDestoryExHandle);
  instMgt = taosOpenRef(50, rpcCloseImpl);
dengyihao's avatar
dengyihao 已提交
607 608
  uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
}
dengyihao's avatar
dengyihao 已提交
609
static void transDestroyEnv() {
dengyihao's avatar
dengyihao 已提交
610 611
  transCloseRefMgt(refMgt);
  transCloseRefMgt(instMgt);
dengyihao's avatar
dengyihao 已提交
612
}
dengyihao's avatar
dengyihao 已提交
613

dengyihao's avatar
dengyihao 已提交
614 615 616 617
/* Initialise the transport module by running transInitEnv() through taosThreadOnce(). */
void transInit() {
  taosThreadOnce(&transModuleInit, transInitEnv);
}
dengyihao's avatar
dengyihao 已提交
618 619 620 621

/* Returns the id stored in the file-level `refMgt`. */
int32_t transGetRefMgt() {
  return refMgt;
}
/* Returns the id stored in the file-level `instMgt`. */
int32_t transGetInstMgt() {
  return instMgt;
}

dengyihao's avatar
dengyihao 已提交
622 623 624 625
/* Tear down the transport module's environment (see transDestroyEnv). */
void transCleanup() {
  transDestroyEnv();
}
dengyihao's avatar
dengyihao 已提交
626
/*
 * Open a reference manager via taosOpenRef(), passing `size` and `func`
 * through unchanged. Returns whatever taosOpenRef() returns.
 */
int32_t transOpenRefMgt(int size, void (*func)(void*)) {
  int32_t mgt = taosOpenRef(size, func);
  return mgt;
}
dengyihao's avatar
dengyihao 已提交
630
/* Close the reference manager `mgt` via taosCloseRef(); its result is ignored. */
void transCloseRefMgt(int32_t mgt) {
  (void)taosCloseRef(mgt);
}
dengyihao's avatar
dengyihao 已提交
634
/*
 * Register the external handle `p` with reference manager `refMgt`.
 * Returns whatever taosAddRef() returns.
 */
int64_t transAddExHandle(int32_t refMgt, void* p) {
  int64_t refId = taosAddRef(refMgt, p);
  return refId;
}
dengyihao's avatar
dengyihao 已提交
638
/*
 * Remove the external handle `refId` from reference manager `refMgt`.
 * Returns whatever taosRemoveRef() returns.
 */
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) {
  int32_t code = taosRemoveRef(refMgt, refId);
  return code;
}

dengyihao's avatar
dengyihao 已提交
643
/*
 * Acquire the external handle `refId` from reference manager `refMgt`.
 * Returns the result of taosAcquireRef() as a void pointer.
 */
void* transAcquireExHandle(int32_t refMgt, int64_t refId) {
  void* handle = (void*)taosAcquireRef(refMgt, refId);
  return handle;
}

dengyihao's avatar
dengyihao 已提交
648
/*
 * Release the external handle `refId` of reference manager `refMgt`.
 * Returns whatever taosReleaseRef() returns.
 */
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) {
  int32_t code = taosReleaseRef(refMgt, refId);
  return code;
}
/*
 * Free an external handle. A NULL handle is ignored.
 * transInitEnv() registers this as the callback for `refMgt`.
 */
void transDestoryExHandle(void* handle) {
  if (handle != NULL) {
    taosMemoryFree(handle);
  }
}
dengyihao's avatar
dengyihao 已提交
658
#endif