transComm.c 17.2 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "transComm.h"

dengyihao's avatar
opt rpc  
dengyihao 已提交
18 19
#define BUFFER_CAP 4096

dengyihao's avatar
dengyihao 已提交
20 21 22
// Once-guard handed to taosThreadOnce() in transInit().
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;

// Id returned by transOpenRefMgt() in transInitEnv().
static int32_t refMgt = 0;
// Id returned by taosOpenRef() in transInitEnv().
static int32_t instMgt = 0;
dengyihao's avatar
dengyihao 已提交
24

dengyihao's avatar
dengyihao 已提交
25 26 27 28
int32_t transCompressMsg(char* msg, int32_t len) {
  int32_t        ret = 0;
  int            compHdr = sizeof(STransCompMsg);
  STransMsgHead* pHead = transHeadFromCont(msg);
dengyihao's avatar
dengyihao 已提交
29

dengyihao's avatar
dengyihao 已提交
30
  char* buf = taosMemoryMalloc(len + compHdr + 8);  // 8 extra bytes
dengyihao's avatar
dengyihao 已提交
31 32
  if (buf == NULL) {
    tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
dengyihao's avatar
dengyihao 已提交
33 34
    ret = len;
    return ret;
dengyihao's avatar
dengyihao 已提交
35 36
  }

dengyihao's avatar
dengyihao 已提交
37
  int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr);
dengyihao's avatar
dengyihao 已提交
38 39 40 41
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
dengyihao's avatar
dengyihao 已提交
42
  if (clen > 0 && clen < len - compHdr) {
dengyihao's avatar
dengyihao 已提交
43 44 45
    STransCompMsg* pComp = (STransCompMsg*)msg;
    pComp->reserved = 0;
    pComp->contLen = htonl(len);
dengyihao's avatar
dengyihao 已提交
46
    memcpy(msg + compHdr, buf, clen);
dengyihao's avatar
dengyihao 已提交
47 48

    tDebug("compress rpc msg, before:%d, after:%d", len, clen);
dengyihao's avatar
dengyihao 已提交
49 50
    ret = clen + compHdr;
    pHead->comp = 1;
dengyihao's avatar
dengyihao 已提交
51
  } else {
dengyihao's avatar
dengyihao 已提交
52 53
    ret = len;
    pHead->comp = 0;
dengyihao's avatar
dengyihao 已提交
54
  }
wafwerar's avatar
wafwerar 已提交
55
  taosMemoryFree(buf);
dengyihao's avatar
dengyihao 已提交
56
  return ret;
dengyihao's avatar
dengyihao 已提交
57
}
dengyihao's avatar
dengyihao 已提交
58 59 60 61
int32_t transDecompressMsg(char** msg, int32_t len) {
  STransMsgHead* pHead = (STransMsgHead*)(*msg);
  if (pHead->comp == 0) return 0;

dengyihao's avatar
dengyihao 已提交
62 63
  char* pCont = transContFromHead(pHead);

dengyihao's avatar
dengyihao 已提交
64 65 66 67 68
  STransCompMsg* pComp = (STransCompMsg*)pCont;
  int32_t        oriLen = htonl(pComp->contLen);

  char*          buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
  STransMsgHead* pNewHead = (STransMsgHead*)buf;
dengyihao's avatar
dengyihao 已提交
69
  int32_t        decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content,
dengyihao's avatar
dengyihao 已提交
70
                                                 len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
dengyihao's avatar
dengyihao 已提交
71
  memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
dengyihao's avatar
dengyihao 已提交
72

dengyihao's avatar
dengyihao 已提交
73 74 75 76 77 78 79 80
  pNewHead->msgLen = htonl(oriLen + sizeof(STransMsgHead));

  taosMemoryFree(pHead);
  *msg = buf;
  if (decompLen != oriLen) {
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
81 82
}

dengyihao's avatar
dengyihao 已提交
83 84 85 86
void transFreeMsg(void* msg) {
  if (msg == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
87
  taosMemoryFree((char*)msg - sizeof(STransMsgHead));
dengyihao's avatar
dengyihao 已提交
88
}
dengyihao's avatar
dengyihao 已提交
89
/*
 * Format an IPv4 socket address as "ip:port" into dst.
 *
 * sockname is read as a struct sockaddr_in. dst is written with sprintf, so
 * the caller must supply a buffer large enough for the result.
 * Returns the result of uv_ip4_name().
 */
int transSockInfo2Str(struct sockaddr* sockname, char* dst) {
  struct sockaddr_in in4 = *(struct sockaddr_in*)sockname;

  char ip[20] = {0};
  int  code = uv_ip4_name(&in4, (char*)ip, sizeof(ip));
  int  port = ntohs(in4.sin_port);

  sprintf(dst, "%s:%d", ip, port);
  return code;
}
dengyihao's avatar
dengyihao 已提交
97
/*
 * Initialise a connection read buffer with BUFFER_CAP zeroed bytes.
 *
 * left is set to -1 and len/total/invalid to 0.
 * Returns 0 on success, -1 when the buffer cannot be allocated; in that case
 * buf->buf is NULL and buf->cap is 0.
 */
int transInitBuffer(SConnBuffer* buf) {
  buf->left = -1;
  buf->len = 0;
  buf->total = 0;
  buf->invalid = 0;

  buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
  if (buf->buf == NULL) {
    // Do not advertise capacity that does not exist.
    buf->cap = 0;
    tError("failed to allocate conn buffer, size:%d", BUFFER_CAP);
    return -1;
  }

  buf->cap = BUFFER_CAP;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
106 107 108
/*
 * Release the memory held by a connection buffer and clear the pointer.
 * Always returns 0.
 */
int transDestroyBuffer(SConnBuffer* p) {
  void* mem = p->buf;
  p->buf = NULL;
  taosMemoryFree(mem);
  return 0;
}
dengyihao's avatar
opt rpc  
dengyihao 已提交
111

dengyihao's avatar
dengyihao 已提交
112
/*
 * Reset a connection buffer to its initial read state and shrink it back to
 * BUFFER_CAP if it had grown.
 *
 * If shrinking fails, the existing larger buffer and its capacity are kept,
 * so the buffer remains usable. Always returns 0.
 */
int transClearBuffer(SConnBuffer* buf) {
  SConnBuffer* p = buf;

  if (p->cap > BUFFER_CAP) {
    // Realloc into a temporary: assigning straight to p->buf would lose the
    // only pointer to the old block if the call returned NULL.
    void* shrunk = taosMemoryRealloc(p->buf, BUFFER_CAP);
    if (shrunk != NULL) {
      p->buf = shrunk;
      p->cap = BUFFER_CAP;
    }
  }

  p->left = -1;
  p->len = 0;
  p->total = 0;
  p->invalid = 0;
  return 0;
}

/*
 * Copy one complete message out of the connection buffer.
 *
 * On success *buf receives a newly allocated copy of the first p->total
 * bytes (the caller owns it), the connection buffer is reset through
 * transResetBuffer(), and the message length is returned.
 *
 * Returns -1, leaving *buf untouched, when:
 *   - the message is not complete (left != 0 or total <= 0),
 *   - the message is shorter than a STransMsgHead or flagged invalid,
 *   - the copy cannot be allocated,
 *   - transResetBuffer() fails.
 */
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
  static const int HEADSIZE = sizeof(STransMsgHead);

  SConnBuffer* p = connBuf;
  if (p->left != 0 || p->total <= 0) {
    return -1;
  }

  int total = p->total;
  if (total < HEADSIZE || p->invalid) {
    return -1;
  }

  char* out = taosMemoryCalloc(1, total);
  if (out == NULL) {
    tError("failed to allocate memory to dump msg from conn buffer, size:%d", total);
    return -1;
  }
  memcpy(out, p->buf, total);

  if (transResetBuffer(connBuf) < 0) {
    // No message is handed over on this path, so release the copy.
    taosMemoryFree(out);
    return -1;
  }

  *buf = out;
  return total;
}

/*
 * Drop the first p->total bytes from the connection buffer and prepare it
 * for the next message.
 *
 * Bytes beyond total are moved to the front of the buffer and become the new
 * len; left is set to -1 and total to 0.
 * Returns 0 on success, -1 (after ASSERTS) when total exceeds len.
 */
int transResetBuffer(SConnBuffer* connBuf) {
  SConnBuffer* p = connBuf;

  if (p->total > p->len) {
    ASSERTS(0, "invalid read from sock buf");
    return -1;
  }

  int remain = p->len - p->total;
  if (remain > 0) {
    memmove(p->buf, p->buf + p->total, remain);
  }

  p->left = -1;
  p->total = 0;
  p->len = remain;
  return 0;
}
/*
 * Fill uvBuf with the region of the connection buffer that the next socket
 * read may write into.
 *
 * Format of the data buffer:
 * |<--------------------------data from socket------------------------------->|
 * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user info--->|
 *
 * - left == -1: hand out all free space after len.
 * - otherwise: hand out exactly `left` bytes, growing the buffer first when
 *   the free space is not larger than `left`.
 *
 * Returns 0 on success. If the buffer cannot be grown, returns -1 with
 * uvBuf->base = NULL and uvBuf->len = 0; the existing buffer and its
 * capacity are left unchanged.
 */
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
  SConnBuffer* p = connBuf;

  uvBuf->base = p->buf + p->len;
  if (p->left == -1) {
    uvBuf->len = p->cap - p->len;
    return 0;
  }

  if (p->left >= p->cap - p->len) {
    // Grow through a temporary so a failed realloc neither loses the old
    // block nor leaves cap describing memory that was never obtained.
    void* grown = taosMemoryRealloc(p->buf, p->left + p->len);
    if (grown == NULL) {
      uvBuf->base = NULL;
      uvBuf->len = 0;
      return -1;
    }
    p->buf = grown;
    p->cap = p->left + p->len;
    uvBuf->base = p->buf + p->len;
  }

  uvBuf->len = p->left;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
186 187
/*
 * Check whether a whole message has been read into the connection buffer.
 *
 * Once at least a STransMsgHead is buffered and the length is still unknown
 * (left == -1), the header is parsed to fill in total and invalid. left is
 * then recomputed as the number of bytes still missing, or 0 when len
 * already exceeds total.
 *
 * Returns true when nothing is left to read or the packet is flagged invalid.
 */
bool transReadComplete(SConnBuffer* connBuf) {
  SConnBuffer* p = connBuf;

  if (p->len < sizeof(STransMsgHead)) {
    return p->left == 0 || p->invalid;
  }

  if (p->left == -1) {
    STransMsgHead head;
    memcpy((char*)&head, connBuf->buf, sizeof(head));
    p->total = (int32_t)htonl(head.msgLen);
    p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum));
  }

  p->left = (p->total >= p->len) ? p->total - p->len : 0;

  return p->left == 0 || p->invalid;
}
dengyihao's avatar
dengyihao 已提交
205

dengyihao's avatar
dengyihao 已提交
206
/*
 * Apply the TCP options used for rpc connections.
 *
 * Keepalive (delay 20) is enabled unless WINDOWS or DARWIN is defined; its
 * result is not checked. Returns the result of uv_tcp_nodelay(stream, 1).
 */
int transSetConnOption(uv_tcp_t* stream) {
#if !defined(WINDOWS) && !defined(DARWIN)
  uv_tcp_keepalive(stream, 1, 20);
#endif
  return uv_tcp_nodelay(stream, 1);
}

dengyihao's avatar
dengyihao 已提交
215
/*
 * Create a pool of sz uv_async_t handles on the given loop.
 *
 * Each handle gets its own SAsyncItem (message queue + mutex) stored in
 * async->data, with item->pThrd set to arg. cb is the callback registered
 * with uv_async_init().
 *
 * Returns the pool, or NULL when an allocation or uv_async_init() fails.
 * After a partial failure inside the loop the pool is released through
 * transAsyncPoolDestroy().
 */
SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
  SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
  if (pool == NULL) {
    tError("failed to allocate async pool");
    return NULL;
  }

  pool->nAsync = sz;
  pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
  if (pool->asyncs == NULL && pool->nAsync > 0) {
    tError("failed to allocate %d async handles", pool->nAsync);
    taosMemoryFree(pool);
    return NULL;
  }

  int i = 0, err = 0;
  for (i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);

    SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
    if (item == NULL) {
      tError("failed to allocate async item");
      break;
    }
    item->pThrd = arg;
    QUEUE_INIT(&item->qmsg);
    taosThreadMutexInit(&item->mtx, NULL);

    // Attach the item before init so the destroy path can find and free it
    // even when uv_async_init() fails for this slot.
    async->data = item;
    err = uv_async_init(loop, async, cb);
    if (err != 0) {
      tError("failed to init async, reason:%s", uv_err_name(err));
      break;
    }
  }

  if (i != pool->nAsync) {
    // Slots after i still have data == NULL (calloc) and are skipped there.
    transAsyncPoolDestroy(pool);
    pool = NULL;
  }

  return pool;
}
dengyihao's avatar
dengyihao 已提交
244

dengyihao's avatar
dengyihao 已提交
245
/*
 * Free an async pool: every non-NULL SAsyncItem has its mutex destroyed and
 * its memory released, then the handle array and the pool itself are freed.
 */
void transAsyncPoolDestroy(SAsyncPool* pool) {
  for (int i = 0; i < pool->nAsync; i++) {
    SAsyncItem* item = pool->asyncs[i].data;
    if (item != NULL) {
      taosThreadMutexDestroy(&item->mtx);
      taosMemoryFree(item);
    }
  }

  taosMemoryFree(pool->asyncs);
  taosMemoryFree(pool);
}
dengyihao's avatar
dengyihao 已提交
257 258 259 260 261 262 263 264
/*
 * Returns true when the message queue of every async item in the pool is
 * empty, false as soon as one non-empty queue is found.
 */
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
  int i = 0;
  while (i < pool->nAsync) {
    SAsyncItem* item = pool->asyncs[i].data;
    if (!QUEUE_IS_EMPTY(&item->qmsg)) {
      return false;
    }
    i++;
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
265
/*
 * Queue q on one of the pool's async items and wake its handle.
 *
 * The slot is pool->index % pool->nAsync. The index is then advanced and
 * reset to 0 once it has exceeded nAsync * 2000.
 * Returns -1 when the pool is stopped, otherwise the result of
 * uv_async_send().
 */
int transAsyncSend(SAsyncPool* pool, queue* q) {
  if (atomic_load_8(&pool->stop) == 1) {
    return -1;
  }

  int slot = pool->index % pool->nAsync;

  // no need mutex here
  if (pool->index++ > pool->nAsync * 2000) {
    pool->index = 0;
  }

  uv_async_t* handle = &(pool->asyncs[slot]);
  SAsyncItem* target = handle->data;

  // The item's queue is guarded by the item's mutex.
  taosThreadMutexLock(&target->mtx);
  QUEUE_PUSH(&target->qmsg, q);
  taosThreadMutexUnlock(&target->mtx);

  return uv_async_send(handle);
}
dengyihao's avatar
dengyihao 已提交
283

dengyihao's avatar
dengyihao 已提交
284 285 286 287
/*
 * Create the args hash table of a transport context: initial capacity 2,
 * hashed as TSDB_DATA_TYPE_UINT, created with HASH_NO_LOCK.
 */
void transCtxInit(STransCtx* ctx) {
  ctx->args = taosHashInit(2,
                           taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT),
                           true,
                           HASH_NO_LOCK);
}
dengyihao's avatar
dengyihao 已提交
288
/*
 * Release everything held by a transport context.
 *
 * ctx->freeFunc is called on every stored value and on brokenVal.val, then
 * the hash table is destroyed and ctx->args is cleared. Does nothing when
 * ctx->args is already NULL.
 */
void transCtxCleanup(STransCtx* ctx) {
  if (ctx->args == NULL) {
    return;
  }

  for (STransCtxVal* it = taosHashIterate(ctx->args, NULL); it != NULL; it = taosHashIterate(ctx->args, it)) {
    ctx->freeFunc(it->val);
  }
  ctx->freeFunc(ctx->brokenVal.val);

  taosHashCleanup(ctx->args);
  ctx->args = NULL;
}

/*
 * Move the contents of src into dst.
 *
 * - Nothing happens when src has no table or no freeFunc.
 * - When dst has no table, dst takes over src's table, brokenVal and
 *   freeFunc.
 * - Otherwise every STransCtxVal of src is copied by value into dst's table
 *   under the same key, and src's table is destroyed.
 *
 * In both merging cases src->args is NULL afterwards, so a later
 * transCtxCleanup(src) is a no-op instead of touching a destroyed table.
 *
 * NOTE(review): a value already stored in dst under the same key is not
 * passed to dst->freeFunc here; whether that leaks depends on taosHashPut's
 * handling of an existing key. TODO confirm.
 */
void transCtxMerge(STransCtx* dst, STransCtx* src) {
  if (src->args == NULL || src->freeFunc == NULL) {
    return;
  }

  if (dst->args == NULL) {
    dst->args = src->args;
    dst->brokenVal = src->brokenVal;
    dst->freeFunc = src->freeFunc;
    src->args = NULL;
    return;
  }

  void*  key = NULL;
  size_t klen = 0;
  void*  iter = taosHashIterate(src->args, NULL);
  while (iter) {
    STransCtxVal* sVal = (STransCtxVal*)iter;
    key = taosHashGetKey(sVal, &klen);

    taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
    iter = taosHashIterate(src->args, iter);
  }

  taosHashCleanup(src->args);
  // Match the hand-over branch above: never leave a dangling table pointer.
  src->args = NULL;
}
/*
 * Look up key in the context's table and return a clone of its value,
 * produced by the entry's own clone callback.
 * Returns NULL when the context has no table or the key is not present.
 */
void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
  if (ctx->args == NULL) {
    return NULL;
  }

  STransCtxVal* entry = taosHashGet(ctx->args, (const void*)&key, sizeof(key));
  if (entry == NULL) {
    return NULL;
  }

  void* cloned = NULL;
  (*entry->clone)(entry->val, &cloned);
  return cloned;
}
dengyihao's avatar
dengyihao 已提交
342
/*
 * Return a clone of the context's brokenVal and store its message type in
 * *msgType.
 * Returns NULL, leaving *msgType untouched, when brokenVal has no clone
 * callback.
 */
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
  if (ctx->brokenVal.clone == NULL) {
    return NULL;
  }

  void* cloned = NULL;
  (*ctx->brokenVal.clone)(ctx->brokenVal.val, &cloned);
  *msgType = ctx->brokenVal.msgType;

  return cloned;
}
dengyihao's avatar
dengyihao 已提交
353

dengyihao's avatar
dengyihao 已提交
354
/* Initialise q as an empty request queue. */
void transReqQueueInit(queue* q) {
  QUEUE_INIT(q);
}
dengyihao's avatar
dengyihao 已提交
358
/*
 * Allocate a STransReq, link it into q and return a pointer to its embedded
 * write request (&req->wreq). wreq.data points back at the STransReq so that
 * transReqQueueRemove() can recover it.
 *
 * Returns NULL, with q unchanged, when the request cannot be allocated.
 */
void* transReqQueuePush(queue* q) {
  STransReq* req = taosMemoryCalloc(1, sizeof(STransReq));
  if (req == NULL) {
    tError("failed to allocate write request");
    return NULL;
  }

  req->wreq.data = req;
  QUEUE_PUSH(q, &req->q);
  return &req->wreq;
}
void* transReqQueueRemove(void* arg) {
  void*       ret = NULL;
dengyihao's avatar
dengyihao 已提交
366
  uv_write_t* wreq = arg;
dengyihao's avatar
dengyihao 已提交
367

dengyihao's avatar
dengyihao 已提交
368 369 370
  STransReq* req = wreq ? wreq->data : NULL;
  if (req == NULL) return NULL;
  QUEUE_REMOVE(&req->q);
dengyihao's avatar
dengyihao 已提交
371

dengyihao's avatar
dengyihao 已提交
372 373
  ret = wreq && wreq->handle ? wreq->handle->data : NULL;
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
374 375 376 377 378 379 380 381 382 383 384 385

  return ret;
}
/* Unlink and free every STransReq still linked into q. */
void transReqQueueClear(queue* q) {
  for (;;) {
    if (QUEUE_IS_EMPTY(q)) {
      break;
    }
    queue* head = QUEUE_HEAD(q);
    QUEUE_REMOVE(head);
    STransReq* pending = QUEUE_DATA(head, STransReq, q);
    taosMemoryFree(pending);
  }
}

wafwerar's avatar
wafwerar 已提交
386
/*
 * Initialise a transport queue: an array of void* with initial capacity 2,
 * plus the callback transQueueClear() uses to release queued elements.
 */
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
  queue->freeFunc = (void (*)(const void*))freeFunc;
  queue->q = taosArrayInit(2, sizeof(void*));
}
/*
 * Append arg to the queue.
 * Returns true when the queue holds at most one element after the push, or
 * when the queue has no backing array; false otherwise.
 */
bool transQueuePush(STransQueue* queue, void* arg) {
  if (queue->q == NULL) {
    return true;
  }
  taosArrayPush(queue->q, &arg);
  return !(taosArrayGetSize(queue->q) > 1);
}
/*
 * Remove and return the first element of the queue, or NULL when the queue
 * has no backing array or is empty.
 */
void* transQueuePop(STransQueue* queue) {
  if (queue->q == NULL) {
    return NULL;
  }
  if (taosArrayGetSize(queue->q) == 0) {
    return NULL;
  }

  void* first = taosArrayGetP(queue->q, 0);
  taosArrayRemove(queue->q, 0);
  return first;
}
dengyihao's avatar
dengyihao 已提交
408
/* Number of queued elements; 0 when the queue has no backing array. */
int32_t transQueueSize(STransQueue* queue) {
  if (queue->q != NULL) {
    return taosArrayGetSize(queue->q);
  }
  return 0;
}
/*
 * Return the element at index i without removing it, or NULL when the queue
 * has no backing array, is empty, or i is not below the current size.
 */
void* transQueueGet(STransQueue* queue, int i) {
  if (queue->q == NULL) {
    return NULL;
  }
  if (taosArrayGetSize(queue->q) == 0 || i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }

  return taosArrayGetP(queue->q, i);
}

/*
 * Remove and return the element at index i, or NULL when the queue has no
 * backing array, is empty, or i is not below the current size.
 */
void* transQueueRm(STransQueue* queue, int i) {
  if (queue->q == NULL) {
    return NULL;
  }
  if (taosArrayGetSize(queue->q) == 0 || i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }

  void* removed = taosArrayGetP(queue->q, i);
  taosArrayRemove(queue->q, i);
  return removed;
}
dengyihao's avatar
dengyihao 已提交
437

dengyihao's avatar
dengyihao 已提交
438
/* True when the queue has no backing array or holds no elements. */
bool transQueueEmpty(STransQueue* queue) {
  return queue->q == NULL || taosArrayGetSize(queue->q) == 0;
}
/*
 * Empty the queue. When a freeFunc was registered it is called on every
 * queued element first; the array itself is then cleared.
 */
void transQueueClear(STransQueue* queue) {
  if (queue->freeFunc != NULL) {
    int i = 0;
    while (i < taosArrayGetSize(queue->q)) {
      void* elem = taosArrayGetP(queue->q, i);
      queue->freeFunc(elem);
      i++;
    }
  }
  taosArrayClear(queue->q);
}
/* Clear the queue (see transQueueClear) and destroy its backing array. */
void transQueueDestroy(STransQueue* queue) {
  // Elements are released first, the container afterwards.
  transQueueClear(queue);

  taosArrayDestroy(queue->q);
}
dengyihao's avatar
dengyihao 已提交
457

dengyihao's avatar
dengyihao 已提交
458
/*
 * Heap comparison callback for delay tasks.
 * Returns 0 when a's execTime is later than b's, 1 otherwise.
 */
static FORCE_INLINE int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
  SDelayTask* lhs = container_of(a, SDelayTask, node);
  SDelayTask* rhs = container_of(b, SDelayTask, node);
  return (lhs->execTime > rhs->execTime) ? 0 : 1;
}

dengyihao's avatar
dengyihao 已提交
468
/*
 * Timer callback of a delay queue (timer->data is the SDelayQueue).
 *
 * Runs and frees every task whose execTime is not after the current time,
 * taking them from the top of the heap. When the next task is still in the
 * future, the timer is re-armed for the remaining time.
 */
static void transDQTimeout(uv_timer_t* timer) {
  SDelayQueue* queue = timer->data;
  tTrace("timer %p timeout", timer);

  uint64_t timeout = 0;
  int64_t  current = taosGetTimestampMs();

  for (HeapNode* top = heapMin(queue->heap); top != NULL; top = heapMin(queue->heap)) {
    SDelayTask* task = container_of(top, SDelayTask, node);
    if (task->execTime > current) {
      // Earliest task is not due yet: remember how long to wait and stop.
      timeout = task->execTime - current;
      break;
    }

    heapRemove(queue->heap, top);
    task->func(task->arg);
    taosMemoryFree(task);
  }

  if (timeout != 0) {
    uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
  }
}
dengyihao's avatar
dengyihao 已提交
491
/*
 * Create a delay queue bound to loop: a uv timer plus a heap ordered by
 * timeCompare.
 *
 * On success stores the queue in *queue and returns 0. When any allocation
 * fails, everything obtained so far is released, *queue is set to NULL and
 * -1 is returned.
 */
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
  // Obtain all memory before the timer is registered with the loop, so the
  // failure path never has to free an initialised libuv handle.
  uv_timer_t*  timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
  Heap*        heap = heapCreate(timeCompare);
  SDelayQueue* q = taosMemoryCalloc(1, sizeof(SDelayQueue));

  if (timer == NULL || heap == NULL || q == NULL) {
    tError("failed to allocate delay queue");
    taosMemoryFree(timer);
    if (heap != NULL) {
      heapDestroy(heap);
    }
    taosMemoryFree(q);
    *queue = NULL;
    return -1;
  }

  uv_timer_init(loop, timer);

  q->heap = heap;
  q->timer = timer;
  q->loop = loop;
  q->timer->data = q;

  *queue = q;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
507
/*
 * Destroy a delay queue and every task still pending in it.
 *
 * For each pending task, freeFunc (if given) is called on the task argument,
 * then the argument and the task are freed. Finally the heap and the queue
 * are released.
 *
 * NOTE(review): the timer memory is freed directly here; this presumably
 * relies on the handle no longer being active in the loop - confirm with
 * the callers.
 */
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
  taosMemoryFree(queue->timer);

  while (heapSize(queue->heap) > 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) {
      // Stop draining but still fall through to release the heap and the
      // queue; returning here would leak both.
      break;
    }
    heapRemove(queue->heap, minNode);

    SDelayTask* task = container_of(minNode, SDelayTask, node);

    STaskArg* arg = task->arg;
    if (freeFunc) freeFunc(arg);
    taosMemoryFree(arg);

    taosMemoryFree(task);
  }

  heapDestroy(queue->heap);
  taosMemoryFree(queue);
}
dengyihao's avatar
dengyihao 已提交
528 529
/*
 * Cancel a scheduled task: stop the timer, remove the task from the heap
 * (when the heap is not empty), and free the task and its argument.
 *
 * If other tasks remain, the timer is re-armed for the earliest one: the
 * delay is the time left until its execTime, or 0 when it is already due.
 */
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
  uv_timer_stop(queue->timer);

  if (heapSize(queue->heap) <= 0) {
    taosMemoryFree(task->arg);
    taosMemoryFree(task);
    return;
  }
  heapRemove(queue->heap, &task->node);

  taosMemoryFree(task->arg);
  taosMemoryFree(task);

  if (heapSize(queue->heap) != 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) return;

    uint64_t    now = taosGetTimestampMs();
    SDelayTask* minTask = container_of(minNode, SDelayTask, node);
    // Time remaining until the earliest task, same rule as transDQSched().
    // (Previously computed as now - execTime, which fired future tasks
    // immediately and postponed overdue ones.)
    uint64_t timeout = minTask->execTime > now ? minTask->execTime - now : 0;

    uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
  }
}

/*
 * Schedule func(arg) to run timeoutMs milliseconds from now.
 *
 * The task is inserted into the queue's heap and the timer is (re)started.
 * When an earlier task is already queued, the timer is armed for that task
 * instead (0 if it is already due).
 *
 * Returns the new task, which can be passed to transDQCancel(), or NULL when
 * the task cannot be allocated; in that case the queue and timer are left
 * untouched.
 */
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
  uint64_t    now = taosGetTimestampMs();
  SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
  if (task == NULL) {
    tError("failed to allocate delay task");
    return NULL;
  }
  task->func = func;
  task->arg = arg;
  task->execTime = now + timeoutMs;

  HeapNode* minNode = heapMin(queue->heap);
  if (minNode) {
    SDelayTask* minTask = container_of(minNode, SDelayTask, node);
    if (minTask->execTime < task->execTime) {
      timeoutMs = minTask->execTime <= now ? 0 : minTask->execTime - now;
    }
  }

  tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs);
  heapInsert(queue->heap, &task->node);
  uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
  return task;
}
dengyihao's avatar
dengyihao 已提交
573 574 575 576 577 578

/*
 * Trace-log an endpoint set as "epset:{0. fqdn:port, 1. fqdn:port}" followed
 * by its inUse index. A NULL set is logged as "NULL epset".
 *
 * The text is built in a fixed 512-byte buffer. Output that does not fit is
 * truncated; the write position is clamped so it never moves past the end of
 * the buffer.
 */
void transPrintEpSet(SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    tTrace("NULL epset");
    return;
  }

  char         buf[512] = {0};
  const size_t cap = sizeof(buf);
  size_t       len = 0;

  int n = snprintf(buf, cap, "epset:{");
  if (n > 0) {
    len = (size_t)n;
  }

  for (int i = 0; i < pEpSet->numOfEps && len < cap - 1; i++) {
    // Every entry but the last is followed by ", ".
    const char* sep = (i == pEpSet->numOfEps - 1) ? "" : ", ";

    n = snprintf(buf + len, cap - len, "%d. %s:%d%s", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port, sep);
    if (n < 0) {
      break;
    }
    // snprintf reports the length it wanted to write; on truncation only
    // cap - len - 1 characters were stored, so clamp instead of adding n.
    len = ((size_t)n >= cap - len) ? cap - 1 : len + (size_t)n;
  }

  if (len < cap - 1) {
    snprintf(buf + len, cap - len, "}");
  }

  tTrace("%s, inUse:%d", buf, pEpSet->inUse);
}
591 592 593 594 595 596 597 598 599 600 601
/*
 * Compare two endpoint sets. They are equal when numOfEps and inUse match
 * and every endpoint has the same fqdn (compared over at most TSDB_FQDN_LEN
 * characters) and the same port.
 */
bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
  if (a->numOfEps != b->numOfEps) {
    return false;
  }
  if (a->inUse != b->inUse) {
    return false;
  }

  int i = 0;
  while (i < a->numOfEps) {
    bool sameName = strncmp(a->eps[i].fqdn, b->eps[i].fqdn, TSDB_FQDN_LEN) == 0;
    if (!sameName || a->eps[i].port != b->eps[i].port) {
      return false;
    }
    i++;
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
602

dengyihao's avatar
dengyihao 已提交
603
static void transInitEnv() {
dengyihao's avatar
dengyihao 已提交
604 605
  refMgt = transOpenRefMgt(50000, transDestoryExHandle);
  instMgt = taosOpenRef(50, rpcCloseImpl);
dengyihao's avatar
dengyihao 已提交
606 607
  uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
}
dengyihao's avatar
dengyihao 已提交
608
static void transDestroyEnv() {
dengyihao's avatar
dengyihao 已提交
609 610
  transCloseRefMgt(refMgt);
  transCloseRefMgt(instMgt);
dengyihao's avatar
dengyihao 已提交
611
}
dengyihao's avatar
dengyihao 已提交
612

dengyihao's avatar
dengyihao 已提交
613 614 615 616
void transInit() {
  // init env
  taosThreadOnce(&transModuleInit, transInitEnv);
}
dengyihao's avatar
dengyihao 已提交
617 618 619 620

/* Id of the reference set opened for extern handles in transInitEnv(). */
int32_t transGetRefMgt() {
  return refMgt;
}
/* Id of the reference set opened for rpc instances in transInitEnv(). */
int32_t transGetInstMgt() {
  return instMgt;
}

dengyihao's avatar
dengyihao 已提交
621 622 623 624
/* Tear down the transport module environment (see transDestroyEnv). */
void transCleanup() {
  transDestroyEnv();
}
dengyihao's avatar
dengyihao 已提交
625
/*
 * Open a reference set of the given size whose objects are released with
 * func. Thin wrapper around taosOpenRef(); returns its result.
 */
int32_t transOpenRefMgt(int size, void (*func)(void*)) {
  // added into once later
  int32_t mgt = taosOpenRef(size, func);
  return mgt;
}
dengyihao's avatar
dengyihao 已提交
629
/* Close the reference set mgt. Thin wrapper around taosCloseRef(). */
void transCloseRefMgt(int32_t mgt) {
  (void)taosCloseRef(mgt);
}
dengyihao's avatar
dengyihao 已提交
633
/*
 * Register p in the reference set refMgt.
 * Thin wrapper around taosAddRef(); returns its result.
 */
int64_t transAddExHandle(int32_t refMgt, void* p) {
  int64_t refId = taosAddRef(refMgt, p);
  return refId;
}
dengyihao's avatar
dengyihao 已提交
637
/*
 * Remove the extern handle refId from the reference set refMgt.
 * Thin wrapper around taosRemoveRef(); returns its result.
 */
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) {
  int32_t code = taosRemoveRef(refMgt, refId);
  return code;
}

dengyihao's avatar
dengyihao 已提交
642
/*
 * Acquire the extern handle refId from the reference set refMgt.
 * Thin wrapper around taosAcquireRef(); returns its result as void*.
 */
void* transAcquireExHandle(int32_t refMgt, int64_t refId) {
  void* handle = (void*)taosAcquireRef(refMgt, refId);
  return handle;
}

dengyihao's avatar
dengyihao 已提交
647
/*
 * Release the extern handle refId in the reference set refMgt.
 * Thin wrapper around taosReleaseRef(); returns its result.
 */
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) {
  int32_t code = taosReleaseRef(refMgt, refId);
  return code;
}
/*
 * Release callback for extern handles (registered in transInitEnv()):
 * frees handle unless it is NULL.
 */
void transDestoryExHandle(void* handle) {
  if (handle != NULL) {
    taosMemoryFree(handle);
  }
}