transComm.c 17.1 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifdef USE_UV

#include "transComm.h"

dengyihao's avatar
opt rpc  
dengyihao 已提交
19 20
// Size in bytes of a freshly initialised connection read buffer (see transInitBuffer);
// transClearBuffer shrinks buffers that have grown beyond this back down to it.
#define BUFFER_CAP 4096

dengyihao's avatar
dengyihao 已提交
21 22 23
// Once-control handed to taosThreadOnce() by transInit() so transInitEnv() runs a single time.
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;

// Reference-manager id obtained from transOpenRefMgt() in transInitEnv(); exposed via transGetRefMgt().
static int32_t refMgt;
dengyihao's avatar
dengyihao 已提交
24
// Reference-manager id obtained from taosOpenRef() in transInitEnv(); exposed via transGetInstMgt().
static int32_t instMgt;
dengyihao's avatar
dengyihao 已提交
25

dengyihao's avatar
dengyihao 已提交
26 27 28 29
int32_t transCompressMsg(char* msg, int32_t len) {
  int32_t        ret = 0;
  int            compHdr = sizeof(STransCompMsg);
  STransMsgHead* pHead = transHeadFromCont(msg);
dengyihao's avatar
dengyihao 已提交
30

dengyihao's avatar
dengyihao 已提交
31
  char* buf = taosMemoryMalloc(len + compHdr + 8);  // 8 extra bytes
dengyihao's avatar
dengyihao 已提交
32 33
  if (buf == NULL) {
    tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
dengyihao's avatar
dengyihao 已提交
34 35
    ret = len;
    return ret;
dengyihao's avatar
dengyihao 已提交
36 37
  }

dengyihao's avatar
dengyihao 已提交
38
  int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr);
dengyihao's avatar
dengyihao 已提交
39 40 41 42
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
dengyihao's avatar
dengyihao 已提交
43
  if (clen > 0 && clen < len - compHdr) {
dengyihao's avatar
dengyihao 已提交
44 45 46
    STransCompMsg* pComp = (STransCompMsg*)msg;
    pComp->reserved = 0;
    pComp->contLen = htonl(len);
dengyihao's avatar
dengyihao 已提交
47
    memcpy(msg + compHdr, buf, clen);
dengyihao's avatar
dengyihao 已提交
48 49

    tDebug("compress rpc msg, before:%d, after:%d", len, clen);
dengyihao's avatar
dengyihao 已提交
50 51
    ret = clen + compHdr;
    pHead->comp = 1;
dengyihao's avatar
dengyihao 已提交
52
  } else {
dengyihao's avatar
dengyihao 已提交
53 54
    ret = len;
    pHead->comp = 0;
dengyihao's avatar
dengyihao 已提交
55
  }
wafwerar's avatar
wafwerar 已提交
56
  taosMemoryFree(buf);
dengyihao's avatar
dengyihao 已提交
57
  return ret;
dengyihao's avatar
dengyihao 已提交
58
}
dengyihao's avatar
dengyihao 已提交
59 60 61 62
int32_t transDecompressMsg(char** msg, int32_t len) {
  STransMsgHead* pHead = (STransMsgHead*)(*msg);
  if (pHead->comp == 0) return 0;

dengyihao's avatar
dengyihao 已提交
63 64
  char* pCont = transContFromHead(pHead);

dengyihao's avatar
dengyihao 已提交
65 66 67 68 69
  STransCompMsg* pComp = (STransCompMsg*)pCont;
  int32_t        oriLen = htonl(pComp->contLen);

  char*          buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
  STransMsgHead* pNewHead = (STransMsgHead*)buf;
dengyihao's avatar
dengyihao 已提交
70 71
  int32_t        decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), pNewHead->content,
                                                 len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
dengyihao's avatar
dengyihao 已提交
72
  memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
dengyihao's avatar
dengyihao 已提交
73

dengyihao's avatar
dengyihao 已提交
74 75 76 77 78 79 80 81
  pNewHead->msgLen = htonl(oriLen + sizeof(STransMsgHead));

  taosMemoryFree(pHead);
  *msg = buf;
  if (decompLen != oriLen) {
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
82 83
}

dengyihao's avatar
dengyihao 已提交
84 85 86 87
void transFreeMsg(void* msg) {
  if (msg == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
88
  taosMemoryFree((char*)msg - sizeof(STransMsgHead));
dengyihao's avatar
dengyihao 已提交
89
}
dengyihao's avatar
dengyihao 已提交
90
/*
 * Format an IPv4 socket address as "ip:port" into `dst`.
 * `sockname` is read as a struct sockaddr_in. The size of `dst` is not known
 * here, so the caller must supply a buffer large enough for the result.
 * Returns the result of uv_ip4_name().
 */
int transSockInfo2Str(struct sockaddr* sockname, char* dst) {
  struct sockaddr_in in4 = *(struct sockaddr_in*)sockname;
  char               ip[20] = {0};

  int rc = uv_ip4_name(&in4, (char*)ip, sizeof(ip));
  sprintf(dst, "%s:%d", ip, ntohs(in4.sin_port));
  return rc;
}
dengyihao's avatar
dengyihao 已提交
98
/*
 * Initialise a connection read buffer with BUFFER_CAP bytes of zeroed storage.
 * `left` starts at -1, `len`, `total` and `invalid` at 0.
 * Returns 0 on success. Returns -1 when the storage cannot be allocated; in
 * that case `buf->buf` is NULL and `buf->cap` is 0 so the capacity never
 * claims memory that does not exist.
 */
int transInitBuffer(SConnBuffer* buf) {
  buf->left = -1;
  buf->len = 0;
  buf->total = 0;
  buf->invalid = 0;

  buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
  if (buf->buf == NULL) {
    buf->cap = 0;
    return -1;
  }
  buf->cap = BUFFER_CAP;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
107 108 109
/*
 * Release the storage owned by a connection buffer and clear the pointer.
 * Only `buf` is touched; the other fields keep their values. Always returns 0.
 */
int transDestroyBuffer(SConnBuffer* p) {
  void* storage = p->buf;
  p->buf = NULL;
  taosMemoryFree(storage);
  return 0;
}
dengyihao's avatar
opt rpc  
dengyihao 已提交
112

dengyihao's avatar
dengyihao 已提交
113
/*
 * Reset a connection buffer to its empty state and, if it has grown beyond
 * BUFFER_CAP, try to shrink it back to BUFFER_CAP.
 * If the shrinking realloc fails the existing (larger) storage and capacity
 * are kept, so the buffer stays usable. Always returns 0.
 */
int transClearBuffer(SConnBuffer* buf) {
  SConnBuffer* p = buf;
  if (p->cap > BUFFER_CAP) {
    // Never assign realloc's result straight to p->buf: on failure that would
    // leak the old block and leave a NULL buffer with a non-zero capacity.
    void* shrunk = taosMemoryRealloc(p->buf, BUFFER_CAP);
    if (shrunk != NULL) {
      p->buf = shrunk;
      p->cap = BUFFER_CAP;
    }
  }
  p->left = -1;
  p->len = 0;
  p->total = 0;
  p->invalid = 0;
  return 0;
}

/*
 * Copy one complete message out of the connection buffer.
 *
 * A message is available when `left` is 0 and `total` is positive. The first
 * `total` bytes are copied into a newly allocated block stored in `*buf`
 * (owned by the caller), and the connection buffer is advanced with
 * transResetBuffer().
 *
 * Returns the message size on success. Returns -1 when no complete message is
 * available, the message is shorter than a head, the buffer is flagged
 * invalid, the copy cannot be allocated, or the reset fails. On every failure
 * `*buf` is left untouched and nothing is leaked.
 */
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
  static const int HEADSIZE = sizeof(STransMsgHead);

  SConnBuffer* p = connBuf;
  if (p->left != 0 || p->total <= 0) {
    return -1;
  }

  int total = p->total;
  if (total < HEADSIZE || p->invalid) {
    return -1;
  }

  char* copy = taosMemoryCalloc(1, total);
  if (copy == NULL) {
    tError("failed to allocate memory to dump rpc msg, len:%d", total);
    return -1;
  }
  memcpy(copy, p->buf, total);

  if (transResetBuffer(connBuf) < 0) {
    taosMemoryFree(copy);
    return -1;
  }

  *buf = copy;
  return total;
}

/*
 * Drop the first `total` bytes (one consumed message) from the buffer.
 * Any bytes received beyond the message are moved to the front and `len` is
 * set to their count; `left` becomes -1 and `total` 0.
 * Returns 0 on success. If `total` exceeds `len` the state is inconsistent:
 * the assertion fires and -1 is returned without modifying the buffer.
 */
int transResetBuffer(SConnBuffer* connBuf) {
  SConnBuffer* p = connBuf;

  if (p->total > p->len) {
    ASSERTS(0, "invalid read from sock buf");
    return -1;
  }

  int remain = p->len - p->total;
  if (remain > 0) {
    // bytes of the next message that were read together with this one
    memmove(p->buf, p->buf + p->total, remain);
  }
  p->left = -1;
  p->total = 0;
  p->len = remain;
  return 0;
}
/*
 * Fill `uvBuf` with the region of the connection buffer that the next socket
 * read should write into.
 *
 * format of data buffer:
 * |<--------------------------data from socket------------------------------->|
 * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user info--->|
 *
 * When the message size is not known yet (`left` == -1) all free space after
 * the bytes already received is offered. Otherwise exactly `left` bytes are
 * offered, growing the buffer first when the free space is not larger than
 * `left`.
 *
 * Returns 0 on success. Returns -1 when the buffer cannot be grown; the
 * existing storage and capacity are kept and `uvBuf` is set to a NULL base
 * with length 0.
 */
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
  SConnBuffer* p = connBuf;
  uvBuf->base = p->buf + p->len;

  if (p->left == -1) {
    uvBuf->len = p->cap - p->len;
    return 0;
  }

  if (p->left >= p->cap - p->len) {
    int   newCap = p->left + p->len;
    void* grown = taosMemoryRealloc(p->buf, newCap);
    if (grown == NULL) {
      // keep p->buf / p->cap consistent; hand libuv an empty buffer
      tError("failed to grow read buffer to %d bytes", newCap);
      uvBuf->base = NULL;
      uvBuf->len = 0;
      return -1;
    }
    p->buf = grown;
    p->cap = newCap;
    uvBuf->base = p->buf + p->len;
  }
  uvBuf->len = p->left;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
187 188
/*
 * Check whether a complete message has been read into the buffer.
 *
 * Once at least a full STransMsgHead has been received and the message size
 * is still unknown (`left` == -1), the head is decoded: `total` is set from
 * `msgLen` and `invalid` from the magic-number check. `left` is then updated
 * to the number of bytes still missing (0 when `len` already covers `total`).
 *
 * Returns true when nothing is left to read or the packet is flagged invalid.
 */
bool transReadComplete(SConnBuffer* connBuf) {
  SConnBuffer* p = connBuf;

  if (p->len >= sizeof(STransMsgHead)) {
    if (p->left == -1) {
      // copy the head out of the byte buffer before reading its fields
      STransMsgHead hdr;
      memcpy((char*)&hdr, connBuf->buf, sizeof(hdr));
      p->total = (int32_t)htonl(hdr.msgLen);
      p->invalid = TRANS_NOVALID_PACKET(htonl(hdr.magicNum));
    }
    p->left = (p->total < p->len) ? 0 : p->total - p->len;
  }

  return p->left == 0 || p->invalid;
}
dengyihao's avatar
dengyihao 已提交
206

dengyihao's avatar
dengyihao 已提交
207
/*
 * Apply the transport's socket options to a TCP stream: enables TCP_NODELAY.
 * TCP keepalive is not configured here.
 * Returns the result of uv_tcp_nodelay().
 */
int transSetConnOption(uv_tcp_t* stream) {
  int rc = uv_tcp_nodelay(stream, 1);
  return rc;
}

dengyihao's avatar
dengyihao 已提交
212
/*
 * Create a pool of `sz` uv_async_t handles on `loop`, all using callback `cb`.
 * Each handle's `data` points at a freshly allocated SAsyncItem that carries
 * `arg` (as pThrd), an empty message queue and a mutex.
 *
 * Returns the pool, or NULL when any allocation or uv_async_init() fails.
 * NOTE(review): on a mid-loop failure the handles that were already
 * initialised are released through transAsyncPoolDestroy() without
 * uv_close(); confirm that is acceptable for the callers' loop lifetime.
 */
SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
  SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
  if (pool == NULL) {
    tError("failed to init async, reason:out of memory");
    return NULL;
  }

  pool->nAsync = sz;
  pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
  if (pool->asyncs == NULL && pool->nAsync > 0) {
    tError("failed to init async, reason:out of memory");
    taosMemoryFree(pool);
    return NULL;
  }

  int i = 0, err = 0;
  for (i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);

    SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
    if (item == NULL) {
      tError("failed to init async, reason:out of memory");
      break;
    }
    item->pThrd = arg;
    QUEUE_INIT(&item->qmsg);
    taosThreadMutexInit(&item->mtx, NULL);

    // attach before init so the destroy path below can release the item
    async->data = item;
    err = uv_async_init(loop, async, cb);
    if (err != 0) {
      tError("failed to init async, reason:%s", uv_err_name(err));
      break;
    }
  }

  if (i != pool->nAsync) {
    // slots that were never reached still have data == NULL (zeroed storage)
    transAsyncPoolDestroy(pool);
    pool = NULL;
  }

  return pool;
}
dengyihao's avatar
dengyihao 已提交
241

dengyihao's avatar
dengyihao 已提交
242
/*
 * Release an async pool: destroy each item's mutex, free the items, the
 * handle array and the pool itself. Slots without an item are skipped.
 * A NULL pool, or a pool whose handle array is NULL, is tolerated.
 */
void transAsyncPoolDestroy(SAsyncPool* pool) {
  if (pool == NULL) {
    return;
  }

  if (pool->asyncs != NULL) {
    for (int i = 0; i < pool->nAsync; i++) {
      uv_async_t* async = &(pool->asyncs[i]);
      SAsyncItem* item = async->data;
      if (item == NULL) continue;

      taosThreadMutexDestroy(&item->mtx);
      taosMemoryFree(item);
    }
  }
  taosMemoryFree(pool->asyncs);
  taosMemoryFree(pool);
}
dengyihao's avatar
dengyihao 已提交
254 255 256 257 258 259 260 261
/*
 * Report whether every item queue in the pool is empty.
 * Returns false as soon as one non-empty queue is found. The queues are
 * inspected without taking the items' mutexes.
 */
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
  int i = 0;
  while (i < pool->nAsync) {
    SAsyncItem* item = pool->asyncs[i].data;
    if (!QUEUE_IS_EMPTY(&item->qmsg)) {
      return false;
    }
    i++;
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
262
/*
 * Queue `q` on one of the pool's async items and wake its handle.
 * The target slot is `index % nAsync`; `index` is then advanced and wrapped
 * back to 0 once it exceeds nAsync * 2000.
 * Returns -1 when the pool is stopped, otherwise the result of uv_async_send().
 */
int transAsyncSend(SAsyncPool* pool, queue* q) {
  if (atomic_load_8(&pool->stop) == 1) {
    return -1;
  }

  int slot = pool->index % pool->nAsync;

  // the slot counter is advanced without holding a lock
  if (pool->index++ > pool->nAsync * 2000) {
    pool->index = 0;
  }

  uv_async_t* handle = &(pool->asyncs[slot]);
  SAsyncItem* item = handle->data;

  // only the queue insertion is protected by the item's mutex
  taosThreadMutexLock(&item->mtx);
  QUEUE_PUSH(&item->qmsg, q);
  taosThreadMutexUnlock(&item->mtx);

  int rc = uv_async_send(handle);
  return rc;
}
dengyihao's avatar
dengyihao 已提交
280

dengyihao's avatar
dengyihao 已提交
281 282 283 284
/*
 * Initialise a transport context by creating its `args` hash table:
 * initial capacity 2, default hash function for TSDB_DATA_TYPE_UINT keys,
 * HASH_NO_LOCK locking mode.
 */
void transCtxInit(STransCtx* ctx) {
  ctx->args = taosHashInit(2,
                           taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT),
                           true,
                           HASH_NO_LOCK);
}
dengyihao's avatar
dengyihao 已提交
285
/*
 * Release everything held by a transport context: every value stored in
 * `args` and the broken-link value are passed to `freeFunc`, then the hash
 * table is destroyed and `args` is set to NULL.
 * Does nothing when `args` is already NULL. When `freeFunc` is NULL (a state
 * transCtxMerge() explicitly allows for) the values are not released but the
 * table is still destroyed.
 */
void transCtxCleanup(STransCtx* ctx) {
  if (ctx->args == NULL) {
    return;
  }

  if (ctx->freeFunc != NULL) {
    STransCtxVal* iter = taosHashIterate(ctx->args, NULL);
    while (iter) {
      ctx->freeFunc(iter->val);
      iter = taosHashIterate(ctx->args, iter);
    }
    ctx->freeFunc(ctx->brokenVal.val);
  }

  taosHashCleanup(ctx->args);
  ctx->args = NULL;
}

/*
 * Move the contents of `src` into `dst`.
 *
 * Nothing happens when `src` has no table or no free function. When `dst`
 * has no table yet it simply takes over `src`'s table, broken-link value and
 * free function. Otherwise every entry of `src` is copied into `dst`'s table
 * under the same key and `src`'s table is destroyed.
 *
 * In both cases `src->args` is NULL afterwards, so a later
 * transCtxCleanup(src) is a no-op instead of touching a destroyed table.
 *
 * NOTE(review): when a key already exists in `dst`, the previous value is
 * presumably replaced without being passed to `freeFunc` - confirm whether
 * that can leak.
 */
void transCtxMerge(STransCtx* dst, STransCtx* src) {
  if (src->args == NULL || src->freeFunc == NULL) {
    return;
  }
  if (dst->args == NULL) {
    dst->args = src->args;
    dst->brokenVal = src->brokenVal;
    dst->freeFunc = src->freeFunc;
    src->args = NULL;
    return;
  }

  void*  key = NULL;
  size_t klen = 0;
  void*  iter = taosHashIterate(src->args, NULL);
  while (iter) {
    STransCtxVal* sVal = (STransCtxVal*)iter;
    key = taosHashGetKey(sVal, &klen);

    taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
    iter = taosHashIterate(src->args, iter);
  }
  taosHashCleanup(src->args);
  src->args = NULL;  // do not leave a dangling table pointer behind
}
/*
 * Look up `key` in the context's table and return a clone of the stored
 * value, produced by the entry's own `clone` callback.
 * Returns NULL when the context has no table or the key is absent.
 */
void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
  if (ctx->args == NULL) {
    return NULL;
  }

  STransCtxVal* entry = taosHashGet(ctx->args, (const void*)&key, sizeof(key));
  if (entry != NULL) {
    void* copy = NULL;
    (*entry->clone)(entry->val, &copy);
    return copy;
  }
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
339
/*
 * Return a clone of the context's broken-link value and store its message
 * type in `*msgType`.
 * When no clone callback is set, NULL is returned and `*msgType` is not
 * written.
 */
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
  if (ctx->brokenVal.clone == NULL) {
    return NULL;
  }

  void* copy = NULL;
  (*ctx->brokenVal.clone)(ctx->brokenVal.val, &copy);
  *msgType = ctx->brokenVal.msgType;
  return copy;
}
dengyihao's avatar
dengyihao 已提交
350

dengyihao's avatar
dengyihao 已提交
351
/*
 * Initialise `q` as an empty request queue.
 */
void transReqQueueInit(queue* q) {
  QUEUE_INIT(q);
}
dengyihao's avatar
dengyihao 已提交
355
/*
 * Allocate a new request, link it into `q`, and return a pointer to its
 * embedded write request (`wreq`). The write request's `data` field points
 * back at the owning STransReq so transReqQueueRemove() can find it.
 * Returns NULL, leaving the queue unchanged, when the allocation fails.
 */
void* transReqQueuePush(queue* q) {
  STransReq* req = taosMemoryCalloc(1, sizeof(STransReq));
  if (req == NULL) {
    return NULL;
  }
  req->wreq.data = req;
  QUEUE_PUSH(q, &req->q);
  return &req->wreq;
}
void* transReqQueueRemove(void* arg) {
  void*       ret = NULL;
dengyihao's avatar
dengyihao 已提交
363
  uv_write_t* wreq = arg;
dengyihao's avatar
dengyihao 已提交
364

dengyihao's avatar
dengyihao 已提交
365 366 367
  STransReq* req = wreq ? wreq->data : NULL;
  if (req == NULL) return NULL;
  QUEUE_REMOVE(&req->q);
dengyihao's avatar
dengyihao 已提交
368

dengyihao's avatar
dengyihao 已提交
369 370
  ret = wreq && wreq->handle ? wreq->handle->data : NULL;
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
371 372 373 374 375 376 377 378 379 380 381 382

  return ret;
}
/*
 * Remove every request from `q` and free it.
 */
void transReqQueueClear(queue* q) {
  for (;;) {
    if (QUEUE_IS_EMPTY(q)) {
      break;
    }
    queue* node = QUEUE_HEAD(q);
    QUEUE_REMOVE(node);

    STransReq* owner = QUEUE_DATA(node, STransReq, q);
    taosMemoryFree(owner);
  }
}

wafwerar's avatar
wafwerar 已提交
383
/*
 * Initialise a transport queue: an array of pointers with initial capacity 2,
 * plus the callback used by transQueueClear() to release queued elements
 * (may be NULL).
 */
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
  queue->freeFunc = (void (*)(const void*))freeFunc;
  queue->q = taosArrayInit(2, sizeof(void*));
}
/*
 * Append `arg` to the queue.
 * Returns true when the queue holds at most one element afterwards (or has
 * no backing array at all), false when other elements were already queued.
 */
bool transQueuePush(STransQueue* queue, void* arg) {
  if (queue->q != NULL) {
    taosArrayPush(queue->q, &arg);
    return !(taosArrayGetSize(queue->q) > 1);
  }
  return true;
}
/*
 * Remove and return the first element of the queue.
 * Returns NULL when the queue has no backing array or is empty.
 */
void* transQueuePop(STransQueue* queue) {
  if (queue->q != NULL && taosArrayGetSize(queue->q) != 0) {
    void* head = taosArrayGetP(queue->q, 0);
    taosArrayRemove(queue->q, 0);
    return head;
  }
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
405
/*
 * Return the number of queued elements, or 0 when the queue has no backing
 * array.
 */
int32_t transQueueSize(STransQueue* queue) {
  if (queue->q != NULL) {
    return taosArrayGetSize(queue->q);
  }
  return 0;
}
/*
 * Return the element at position `i` without removing it.
 * Returns NULL when the queue has no backing array, is empty, or `i` is not
 * below the current size.
 */
void* transQueueGet(STransQueue* queue, int i) {
  if (queue->q == NULL) {
    return NULL;
  }
  if (taosArrayGetSize(queue->q) == 0) {
    return NULL;
  }
  if (i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }

  return taosArrayGetP(queue->q, i);
}

/*
 * Remove the element at position `i` and return it.
 * Returns NULL, leaving the queue unchanged, when the queue has no backing
 * array, is empty, or `i` is not below the current size.
 */
void* transQueueRm(STransQueue* queue, int i) {
  if (queue->q == NULL) {
    return NULL;
  }
  if (taosArrayGetSize(queue->q) == 0) {
    return NULL;
  }
  if (i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }

  void* removed = taosArrayGetP(queue->q, i);
  taosArrayRemove(queue->q, i);
  return removed;
}
dengyihao's avatar
dengyihao 已提交
434

dengyihao's avatar
dengyihao 已提交
435
/*
 * Report whether the queue holds no elements. A queue without a backing
 * array counts as empty.
 */
bool transQueueEmpty(STransQueue* queue) {
  if (queue->q != NULL) {
    return taosArrayGetSize(queue->q) == 0;
  }
  return true;
}
/*
 * Empty the queue. When a free callback was registered it is invoked on
 * every queued element first; the array itself is then cleared but kept.
 */
void transQueueClear(STransQueue* queue) {
  if (queue->freeFunc != NULL) {
    int i = 0;
    while (i < taosArrayGetSize(queue->q)) {
      void* elem = taosArrayGetP(queue->q, i);
      queue->freeFunc(elem);
      i++;
    }
  }
  taosArrayClear(queue->q);
}
  taosArrayDestroy(queue->q);
}
dengyihao's avatar
dengyihao 已提交
454

dengyihao's avatar
dengyihao 已提交
455
/*
 * Heap ordering callback for delayed tasks.
 * Returns 0 when task `a` is due strictly later than task `b`, 1 otherwise.
 */
static FORCE_INLINE int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
  SDelayTask* lhs = container_of(a, SDelayTask, node);
  SDelayTask* rhs = container_of(b, SDelayTask, node);
  return (lhs->execTime > rhs->execTime) ? 0 : 1;
}

dengyihao's avatar
dengyihao 已提交
465
/*
 * Timer callback of a delay queue (`timer->data` is the SDelayQueue).
 * Runs and frees every task whose execution time has been reached, in heap
 * order. If a task that is not yet due remains at the top of the heap, the
 * timer is re-armed for the time left until that task is due.
 */
static void transDQTimeout(uv_timer_t* timer) {
  SDelayQueue* dq = timer->data;
  tTrace("timer %p timeout", timer);

  uint64_t wait = 0;
  int64_t  nowMs = taosGetTimestampMs();

  for (HeapNode* top = heapMin(dq->heap); top != NULL; top = heapMin(dq->heap)) {
    SDelayTask* task = container_of(top, SDelayTask, node);
    if (!(task->execTime <= nowMs)) {
      // earliest remaining task is still in the future
      wait = task->execTime - nowMs;
      break;
    }
    heapRemove(dq->heap, top);
    task->func(task->arg);
    taosMemoryFree(task);
  }

  if (wait != 0) {
    uv_timer_start(dq->timer, transDQTimeout, wait, 0);
  }
}
dengyihao's avatar
dengyihao 已提交
488
/*
 * Create a delay queue bound to `loop`: a timer handle, a heap ordered by
 * timeCompare(), and the queue object tying them together (the timer's
 * `data` points back at the queue).
 *
 * Returns 0 and stores the queue in `*queue` on success. Returns -1 and
 * stores NULL in `*queue` when an allocation fails; nothing is leaked. The
 * timer is only initialised on the loop after every allocation succeeded, so
 * the failure path never frees a handle the loop already knows about.
 */
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
  uv_timer_t*  timer = NULL;
  Heap*        heap = NULL;
  SDelayQueue* q = NULL;

  timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
  if (timer == NULL) goto _error;

  heap = heapCreate(timeCompare);
  if (heap == NULL) goto _error;

  q = taosMemoryCalloc(1, sizeof(SDelayQueue));
  if (q == NULL) goto _error;

  uv_timer_init(loop, timer);

  q->heap = heap;
  q->timer = timer;
  q->loop = loop;
  q->timer->data = q;

  *queue = q;
  return 0;

_error:
  tError("failed to create delay queue, reason:out of memory");
  if (heap != NULL) heapDestroy(heap);
  taosMemoryFree(timer);
  *queue = NULL;
  return -1;
}

dengyihao's avatar
dengyihao 已提交
504
/*
 * Destroy a delay queue. The timer is freed, every pending task is removed
 * from the heap and released (its argument is passed to `freeFunc` when one
 * is given, then freed), and finally the heap and the queue are destroyed.
 *
 * The heap and queue are released even if the heap unexpectedly reports a
 * non-zero size but yields no minimum node.
 *
 * NOTE(review): the timer handle is freed directly rather than through
 * uv_close(); confirm the caller has already stopped/closed it.
 */
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
  taosMemoryFree(queue->timer);

  while (heapSize(queue->heap) > 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) {
      // stop draining, but still fall through to release heap and queue
      break;
    }
    heapRemove(queue->heap, minNode);

    SDelayTask* task = container_of(minNode, SDelayTask, node);

    STaskArg* arg = task->arg;
    if (freeFunc) freeFunc(arg);
    taosMemoryFree(arg);

    taosMemoryFree(task);
  }
  heapDestroy(queue->heap);
  taosMemoryFree(queue);
}
dengyihao's avatar
dengyihao 已提交
525 526
/*
 * Cancel a scheduled task: stop the timer, remove the task from the heap
 * (when the heap is not empty), and free the task together with its
 * argument. If other tasks remain, the timer is re-armed for the earliest
 * one: the delay is the time left until it is due, or 0 when it is already
 * overdue - the same rule transDQSched() uses.
 */
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
  uv_timer_stop(queue->timer);

  if (heapSize(queue->heap) > 0) {
    heapRemove(queue->heap, &task->node);
  }
  taosMemoryFree(task->arg);
  taosMemoryFree(task);

  if (heapSize(queue->heap) == 0) {
    return;
  }
  HeapNode* minNode = heapMin(queue->heap);
  if (minNode == NULL) return;

  uint64_t    now = taosGetTimestampMs();
  SDelayTask* next = container_of(minNode, SDelayTask, node);
  // time remaining until the next task; never wait for one that is overdue
  uint64_t timeout = next->execTime > now ? next->execTime - now : 0;

  uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
}

/*
 * Schedule `func(arg)` to run `timeoutMs` milliseconds from now.
 * The task is inserted into the queue's heap and the timer is (re)started:
 * when an already queued task is due earlier than the new one, the timer is
 * armed for that task's remaining time (0 if overdue), otherwise for
 * `timeoutMs`.
 *
 * Returns the new task, which can later be passed to transDQCancel().
 * Returns NULL, leaving heap and timer untouched, when the task cannot be
 * allocated.
 */
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
  uint64_t    now = taosGetTimestampMs();
  SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
  if (task == NULL) {
    tError("timer %p failed to put task into delay queue, reason:out of memory", queue->timer);
    return NULL;
  }
  task->func = func;
  task->arg = arg;
  task->execTime = now + timeoutMs;

  HeapNode* minNode = heapMin(queue->heap);
  if (minNode) {
    SDelayTask* minTask = container_of(minNode, SDelayTask, node);
    if (minTask->execTime < task->execTime) {
      timeoutMs = minTask->execTime <= now ? 0 : minTask->execTime - now;
    }
  }

  tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs);
  heapInsert(queue->heap, &task->node);
  uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
  return task;
}
dengyihao's avatar
dengyihao 已提交
570 571 572 573 574 575

/*
 * Trace-log an endpoint set as "epset:{0. host:port, 1. host:port}, inUse:N".
 * A NULL set is logged as "NULL epset".
 *
 * The text is assembled in a fixed 512-byte buffer. snprintf() returns the
 * length the output would have had without truncation, so the running length
 * is checked after every append; once the buffer is full, formatting stops
 * and the (truncated) text is logged instead of writing past the buffer.
 */
void transPrintEpSet(SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    tTrace("NULL epset");
    return;
  }

  char buf[512] = {0};
  int  cap = (int)sizeof(buf);
  int  len = snprintf(buf, sizeof(buf), "epset:{");

  for (int i = 0; i < pEpSet->numOfEps && len < cap; i++) {
    // no separator after the last endpoint
    const char* sep = (i == pEpSet->numOfEps - 1) ? "" : ", ";
    int n = snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d%s", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port, sep);
    if (n < 0) {
      break;
    }
    len += n;
  }
  if (len < cap) {
    snprintf(buf + len, sizeof(buf) - len, "}");
  }
  tTrace("%s, inUse:%d", buf, pEpSet->inUse);
}
588 589 590 591 592 593 594 595 596 597 598
/*
 * Compare two endpoint sets.
 * They are equal when the endpoint count and the in-use index match and
 * every endpoint pair has the same FQDN (compared over at most TSDB_FQDN_LEN
 * characters) and the same port.
 */
bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
  if (a->numOfEps != b->numOfEps) {
    return false;
  }
  if (a->inUse != b->inUse) {
    return false;
  }

  int i = 0;
  while (i < a->numOfEps) {
    bool sameHost = strncmp(a->eps[i].fqdn, b->eps[i].fqdn, TSDB_FQDN_LEN) == 0;
    bool samePort = a->eps[i].port == b->eps[i].port;
    if (!(sameHost && samePort)) {
      return false;
    }
    i++;
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
599

dengyihao's avatar
dengyihao 已提交
600
static void transInitEnv() {
dengyihao's avatar
dengyihao 已提交
601 602
  refMgt = transOpenRefMgt(50000, transDestoryExHandle);
  instMgt = taosOpenRef(50, rpcCloseImpl);
dengyihao's avatar
dengyihao 已提交
603 604
  uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
}
dengyihao's avatar
dengyihao 已提交
605
static void transDestroyEnv() {
dengyihao's avatar
dengyihao 已提交
606 607
  transCloseRefMgt(refMgt);
  transCloseRefMgt(instMgt);
dengyihao's avatar
dengyihao 已提交
608
}
dengyihao's avatar
dengyihao 已提交
609

dengyihao's avatar
dengyihao 已提交
610 611 612 613
/*
 * Initialise the transport module. Safe to call repeatedly: the actual setup
 * (transInitEnv) is executed through taosThreadOnce().
 */
void transInit() {
  taosThreadOnce(&transModuleInit, transInitEnv);
}
dengyihao's avatar
dengyihao 已提交
614 615 616 617

/* Return the id of the handle reference manager opened in transInitEnv(). */
int32_t transGetRefMgt() {
  return refMgt;
}

/* Return the id of the instance reference manager opened in transInitEnv(). */
int32_t transGetInstMgt() {
  return instMgt;
}

dengyihao's avatar
dengyihao 已提交
618 619 620 621
/*
 * Shut the transport module down by releasing what transInit() set up.
 */
void transCleanup() {
  transDestroyEnv();
}
dengyihao's avatar
dengyihao 已提交
622
/*
 * Open a reference manager with `size` slots whose entries are released
 * with `func`. Returns whatever taosOpenRef() returns.
 */
int32_t transOpenRefMgt(int size, void (*func)(void*)) {
  int32_t mgt = taosOpenRef(size, func);
  return mgt;
}
dengyihao's avatar
dengyihao 已提交
626
/*
 * Close the reference manager identified by `mgt`.
 */
void transCloseRefMgt(int32_t mgt) {
  (void)taosCloseRef(mgt);
}
dengyihao's avatar
dengyihao 已提交
630
/*
 * Register the external handle `p` with reference manager `refMgt`.
 * Returns whatever taosAddRef() returns.
 */
int64_t transAddExHandle(int32_t refMgt, void* p) {
  int64_t refId = taosAddRef(refMgt, p);
  return refId;
}
dengyihao's avatar
dengyihao 已提交
634
/*
 * Remove the external handle `refId` from reference manager `refMgt`.
 * Returns whatever taosRemoveRef() returns.
 */
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) {
  int32_t code = taosRemoveRef(refMgt, refId);
  return code;
}

dengyihao's avatar
dengyihao 已提交
639
/*
 * Acquire the external handle `refId` from reference manager `refMgt`.
 * Returns whatever taosAcquireRef() returns, as a void pointer.
 */
void* transAcquireExHandle(int32_t refMgt, int64_t refId) {
  void* handle = (void*)taosAcquireRef(refMgt, refId);
  return handle;
}

dengyihao's avatar
dengyihao 已提交
644
/*
 * Release a previously acquired external handle `refId` of reference
 * manager `refMgt`. Returns whatever taosReleaseRef() returns.
 */
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) {
  int32_t code = taosReleaseRef(refMgt, refId);
  return code;
}
/*
 * Release callback for external handles: frees `handle`.
 * A NULL handle is ignored.
 */
void transDestoryExHandle(void* handle) {
  if (handle != NULL) {
    taosMemoryFree(handle);
  }
}
dengyihao's avatar
dengyihao 已提交
654
#endif