transComm.c 16.7 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifdef USE_UV

#include "transComm.h"

dengyihao's avatar
opt rpc  
dengyihao 已提交
19 20
// Size in bytes that transInitBuffer() allocates for a connection buffer and
// that transClearBuffer() shrinks an enlarged buffer back to.
#define BUFFER_CAP 4096

dengyihao's avatar
dengyihao 已提交
21 22 23
// Once-control handed to taosThreadOnce() in transInit() together with transInitEnv().
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;

// Value returned by transOpenRefMgt(50000, transDestoryExHandle) in transInitEnv().
static int32_t refMgt;
dengyihao's avatar
dengyihao 已提交
24
// Value returned by taosOpenRef(50, rpcCloseImpl) in transInitEnv().
static int32_t instMgt;
dengyihao's avatar
dengyihao 已提交
25

dengyihao's avatar
dengyihao 已提交
26 27 28 29
int32_t transCompressMsg(char* msg, int32_t len) {
  int32_t        ret = 0;
  int            compHdr = sizeof(STransCompMsg);
  STransMsgHead* pHead = transHeadFromCont(msg);
dengyihao's avatar
dengyihao 已提交
30

dengyihao's avatar
dengyihao 已提交
31
  char* buf = taosMemoryMalloc(len + compHdr + 8);  // 8 extra bytes
dengyihao's avatar
dengyihao 已提交
32 33
  if (buf == NULL) {
    tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
dengyihao's avatar
dengyihao 已提交
34 35
    ret = len;
    return ret;
dengyihao's avatar
dengyihao 已提交
36 37
  }

dengyihao's avatar
dengyihao 已提交
38
  int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr);
dengyihao's avatar
dengyihao 已提交
39 40 41 42
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
dengyihao's avatar
dengyihao 已提交
43
  if (clen > 0 && clen < len - compHdr) {
dengyihao's avatar
dengyihao 已提交
44 45 46
    STransCompMsg* pComp = (STransCompMsg*)msg;
    pComp->reserved = 0;
    pComp->contLen = htonl(len);
dengyihao's avatar
dengyihao 已提交
47
    memcpy(msg + compHdr, buf, clen);
dengyihao's avatar
dengyihao 已提交
48 49

    tDebug("compress rpc msg, before:%d, after:%d", len, clen);
dengyihao's avatar
dengyihao 已提交
50 51
    ret = clen + compHdr;
    pHead->comp = 1;
dengyihao's avatar
dengyihao 已提交
52
  } else {
dengyihao's avatar
dengyihao 已提交
53 54
    ret = len;
    pHead->comp = 0;
dengyihao's avatar
dengyihao 已提交
55
  }
wafwerar's avatar
wafwerar 已提交
56
  taosMemoryFree(buf);
dengyihao's avatar
dengyihao 已提交
57
  return ret;
dengyihao's avatar
dengyihao 已提交
58
}
dengyihao's avatar
dengyihao 已提交
59 60 61 62
int32_t transDecompressMsg(char** msg, int32_t len) {
  STransMsgHead* pHead = (STransMsgHead*)(*msg);
  if (pHead->comp == 0) return 0;

dengyihao's avatar
dengyihao 已提交
63 64
  char* pCont = transContFromHead(pHead);

dengyihao's avatar
dengyihao 已提交
65 66 67 68 69
  STransCompMsg* pComp = (STransCompMsg*)pCont;
  int32_t        oriLen = htonl(pComp->contLen);

  char*          buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
  STransMsgHead* pNewHead = (STransMsgHead*)buf;
dengyihao's avatar
dengyihao 已提交
70 71
  int32_t        decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), pNewHead->content,
                                                 len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
dengyihao's avatar
dengyihao 已提交
72
  memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
dengyihao's avatar
dengyihao 已提交
73

dengyihao's avatar
dengyihao 已提交
74 75 76 77 78 79 80 81
  pNewHead->msgLen = htonl(oriLen + sizeof(STransMsgHead));

  taosMemoryFree(pHead);
  *msg = buf;
  if (decompLen != oriLen) {
    return -1;
  }
  return 0;
dengyihao's avatar
dengyihao 已提交
82 83
}

dengyihao's avatar
dengyihao 已提交
84 85 86 87
void transFreeMsg(void* msg) {
  if (msg == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
88
  taosMemoryFree((char*)msg - sizeof(STransMsgHead));
dengyihao's avatar
dengyihao 已提交
89
}
dengyihao's avatar
dengyihao 已提交
90
int transSockInfo2Str(struct sockaddr* sockname, char* dst) {
dengyihao's avatar
dengyihao 已提交
91
  struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
dengyihao's avatar
dengyihao 已提交
92

dengyihao's avatar
dengyihao 已提交
93 94 95 96 97
  char buf[20] = {0};
  int  r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
  sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
  return r;
}
dengyihao's avatar
dengyihao 已提交
98
/*
 * Initialise a connection buffer with BUFFER_CAP bytes of zeroed storage.
 *
 * On return left is -1 and len, total and invalid are 0.
 *
 * Returns 0 on success. Returns -1 if the storage cannot be allocated; in that
 * case buf->buf is NULL and buf->cap is 0.
 */
int transInitBuffer(SConnBuffer* buf) {
  buf->left = -1;
  buf->len = 0;
  buf->total = 0;
  buf->invalid = 0;

  buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
  if (buf->buf == NULL) {
    // Do not advertise capacity that does not exist.
    buf->cap = 0;
    return -1;
  }
  buf->cap = BUFFER_CAP;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
107 108 109
/*
 * Release the storage of a connection buffer and clear its pointer.
 * The SConnBuffer structure itself is not freed. Always returns 0.
 */
int transDestroyBuffer(SConnBuffer* p) {
  taosMemoryFree(p->buf);
  p->buf = NULL;

  return 0;
}
dengyihao's avatar
opt rpc  
dengyihao 已提交
112

dengyihao's avatar
dengyihao 已提交
113
/*
 * Reset a connection buffer to its empty state and, if it had grown beyond
 * BUFFER_CAP, try to shrink its storage back to BUFFER_CAP.
 *
 * If the shrinking reallocation fails, the existing (larger) storage and its
 * capacity are kept so the buffer stays usable. Always returns 0.
 */
int transClearBuffer(SConnBuffer* buf) {
  SConnBuffer* p = buf;
  if (p->cap > BUFFER_CAP) {
    // Assign only after success: overwriting p->buf directly would lose the
    // original block if the reallocation returned NULL.
    void* shrunk = taosMemoryRealloc(p->buf, BUFFER_CAP);
    if (shrunk != NULL) {
      p->buf = shrunk;
      p->cap = BUFFER_CAP;
    }
  }
  p->left = -1;
  p->len = 0;
  p->total = 0;
  p->invalid = 0;
  return 0;
}

/*
 * Copy one complete message out of the connection buffer.
 *
 * connBuf: buffer to read from; it must hold a complete message
 *          (left == 0 and total > 0).
 * buf:     receives a newly allocated copy of the first `total` bytes.
 *
 * Returns the message length on success, after which transResetBuffer() has
 * been called on connBuf. Returns -1 when no complete message is available,
 * when the message is shorter than a STransMsgHead, when the buffer is flagged
 * invalid, or when the copy cannot be allocated. On -1, *buf is not written and
 * the buffer is not reset.
 */
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
  static const int HEADSIZE = sizeof(STransMsgHead);

  SConnBuffer* p = connBuf;
  if (p->left != 0 || p->total <= 0) {
    return -1;
  }

  int total = p->total;
  if (total < HEADSIZE || p->invalid) {
    return -1;
  }

  char* copy = taosMemoryCalloc(1, total);
  if (copy == NULL) {
    tError("failed to allocate memory for rpc msg, msgLen:%d", total);
    return -1;
  }
  memcpy(copy, p->buf, total);
  *buf = copy;

  transResetBuffer(connBuf);
  return total;
}

/*
 * Drop the first `total` bytes from the buffer and prepare it for the next
 * message: any bytes that follow are moved to the front, left becomes -1,
 * total becomes 0 and len becomes the number of bytes kept.
 *
 * total > len is treated as a programming error (assert); in that case the
 * buffer is not modified. Always returns 0.
 */
int transResetBuffer(SConnBuffer* connBuf) {
  SConnBuffer* p = connBuf;

  if (p->total > p->len) {
    assert(0);
    return 0;
  }

  int remain = p->len - p->total;
  if (remain > 0) {
    // Bytes of the next message were already received; shift them forward.
    memmove(p->buf, p->buf + p->total, remain);
  }
  p->left = -1;
  p->total = 0;
  p->len = remain;
  return 0;
}
/*
 * Describe to libuv where the next bytes read from the socket should go.
 *
 * Format of the data buffer:
 * |<--------------------------data from socket------------------------------->|
 * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user info--->|
 *
 * connBuf: connection buffer being filled.
 * uvBuf:   receives the write position (buf + len) and the number of bytes
 *          that may be written there.
 *
 * When left is -1 the whole free tail of the buffer is offered. Otherwise
 * exactly `left` bytes are offered, growing the storage to len + left bytes
 * when the free tail is not larger than `left`.
 *
 * Returns 0 on success. If the storage cannot be grown, returns -1 with
 * uvBuf->base = NULL and uvBuf->len = 0; the existing storage and capacity are
 * left unchanged.
 */
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
  SConnBuffer* p = connBuf;

  uvBuf->base = p->buf + p->len;
  if (p->left == -1) {
    uvBuf->len = p->cap - p->len;
    return 0;
  }

  if (p->left < p->cap - p->len) {
    uvBuf->len = p->left;
    return 0;
  }

  // Assign only after success so a failed reallocation does not lose the
  // bytes already received or leave cap describing memory that is not there.
  void* grown = taosMemoryRealloc(p->buf, p->left + p->len);
  if (grown == NULL) {
    uvBuf->base = NULL;
    uvBuf->len = 0;
    return -1;
  }
  p->buf = grown;
  p->cap = p->left + p->len;

  uvBuf->base = p->buf + p->len;
  uvBuf->len = p->left;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
184 185
/*
 * Check whether a complete message has been read into the buffer.
 *
 * Once at least sizeof(STransMsgHead) bytes are buffered and the message
 * length is still unknown (left == -1), the head is decoded: total is set from
 * head.msgLen and invalid from TRANS_NOVALID_PACKET() applied to the magic
 * number. left is then updated to the number of bytes still missing (0 when
 * len already exceeds total).
 *
 * Returns true when left is 0 or the buffer is flagged invalid.
 */
bool transReadComplete(SConnBuffer* connBuf) {
  SConnBuffer* p = connBuf;

  if (p->len >= sizeof(STransMsgHead)) {
    if (p->left == -1) {
      // Copy the head out rather than casting the raw buffer.
      STransMsgHead head;
      memcpy((char*)&head, connBuf->buf, sizeof(head));

      p->total = (int32_t)htonl(head.msgLen);
      p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum));
    }

    if (p->total < p->len) {
      p->left = 0;
    } else {
      p->left = p->total - p->len;
    }
  }

  if (p->left == 0 || p->invalid) {
    return true;
  }
  return false;
}
dengyihao's avatar
dengyihao 已提交
203

dengyihao's avatar
dengyihao 已提交
204
/*
 * Apply the transport's TCP options to a stream: uv_tcp_nodelay(stream, 1).
 * Returns the libuv result code of that call.
 */
int transSetConnOption(uv_tcp_t* stream) {
  int rc = uv_tcp_nodelay(stream, 1);
  // Keepalive is intentionally not configured here:
  // int ret = uv_tcp_keepalive(stream, 5, 60);
  return rc;
}

dengyihao's avatar
dengyihao 已提交
209
/*
 * Create a pool of `sz` uv_async_t handles on `loop`, each carrying its own
 * SAsyncItem (message queue + mutex) in handle->data.
 *
 * loop: loop the async handles are initialised on.
 * sz:   number of async handles.
 * arg:  stored in every item's pThrd field.
 * cb:   callback passed to uv_async_init() for every handle.
 *
 * Returns the new pool, or NULL if any allocation fails. All memory is
 * allocated before any handle is registered with the loop, so a failure never
 * leaves a half-initialised handle behind.
 */
SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
  SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
  if (pool == NULL) {
    return NULL;
  }

  pool->nAsync = sz;
  pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
  if (pool->asyncs == NULL && pool->nAsync > 0) {
    taosMemoryFree(pool);
    return NULL;
  }

  // Phase 1: allocate and set up every item (memory only, nothing to unregister on failure).
  for (int i = 0; i < pool->nAsync; i++) {
    SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
    if (item == NULL) {
      for (int j = 0; j < i; j++) {
        SAsyncItem* prev = pool->asyncs[j].data;
        taosThreadMutexDestroy(&prev->mtx);
        taosMemoryFree(prev);
      }
      taosMemoryFree(pool->asyncs);
      taosMemoryFree(pool);
      return NULL;
    }
    item->pThrd = arg;
    QUEUE_INIT(&item->qmsg);
    taosThreadMutexInit(&item->mtx, NULL);

    pool->asyncs[i].data = item;
  }

  // Phase 2: register the handles with the loop.
  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
    SAsyncItem* item = async->data;

    uv_async_init(loop, async, cb);
    async->data = item;  // re-attach after init, as the original did
  }
  return pool;
}
dengyihao's avatar
dengyihao 已提交
226

dengyihao's avatar
dengyihao 已提交
227
/*
 * Destroy an async pool: for every handle destroy the item's mutex and free
 * the item, then free the handle array and the pool itself.
 */
void transAsyncPoolDestroy(SAsyncPool* pool) {
  int idx = 0;
  while (idx < pool->nAsync) {
    SAsyncItem* item = pool->asyncs[idx].data;

    taosThreadMutexDestroy(&item->mtx);
    taosMemoryFree(item);
    idx++;
  }

  taosMemoryFree(pool->asyncs);
  taosMemoryFree(pool);
}
dengyihao's avatar
dengyihao 已提交
238 239 240 241 242 243 244 245
/*
 * Returns true when the message queue of every item in the pool is empty,
 * false as soon as one non-empty queue is found. The item mutexes are not
 * taken.
 */
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
  int i = 0;
  while (i < pool->nAsync) {
    SAsyncItem* item = pool->asyncs[i].data;
    if (!QUEUE_IS_EMPTY(&item->qmsg)) {
      return false;
    }
    i++;
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
246
/*
 * Queue `q` on one of the pool's async items and wake its handle.
 *
 * The item is chosen as index % nAsync; the index is then advanced and wrapped
 * to 0 once it has passed nAsync * 2000. The push onto the item's queue is done
 * under the item's mutex.
 *
 * Returns -1 if the pool's stop flag is 1, otherwise the result of
 * uv_async_send().
 */
int transAsyncSend(SAsyncPool* pool, queue* q) {
  if (atomic_load_8(&pool->stop) == 1) {
    return -1;
  }

  int slot = pool->index % pool->nAsync;

  // no need mutex here
  if (pool->index++ > pool->nAsync * 2000) {
    pool->index = 0;
  }

  uv_async_t* target = &(pool->asyncs[slot]);
  SAsyncItem* item = target->data;

  taosThreadMutexLock(&item->mtx);
  QUEUE_PUSH(&item->qmsg, q);
  taosThreadMutexUnlock(&item->mtx);

  int rc = uv_async_send(target);
  return rc;
}
dengyihao's avatar
dengyihao 已提交
264

dengyihao's avatar
dengyihao 已提交
265 266 267 268
/*
 * Initialise a context by creating its args hash table (initial size 2,
 * default hash function for TSDB_DATA_TYPE_UINT, HASH_NO_LOCK).
 */
void transCtxInit(STransCtx* ctx) {
  ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
}
dengyihao's avatar
dengyihao 已提交
269
/*
 * Release everything held by a context: ctx->freeFunc is called on every value
 * stored in args and on brokenVal.val, then the hash table is destroyed and
 * ctx->args is set to NULL. Does nothing when ctx->args is already NULL.
 */
void transCtxCleanup(STransCtx* ctx) {
  if (ctx->args != NULL) {
    STransCtxVal* it = taosHashIterate(ctx->args, NULL);
    for (; it != NULL; it = taosHashIterate(ctx->args, it)) {
      ctx->freeFunc(it->val);
    }
    ctx->freeFunc(ctx->brokenVal.val);

    taosHashCleanup(ctx->args);
    ctx->args = NULL;
  }
}

/*
 * Move the entries of `src` into `dst`.
 *
 * If dst has no table yet, it takes over src's table, brokenVal and freeFunc.
 * Otherwise every entry of src is copied into dst's table (replacing entries
 * with the same key) and src's table is destroyed without freeing the values,
 * which are now referenced from dst.
 *
 * In both cases src->args is NULL on return, so a later transCtxCleanup(src)
 * is a no-op instead of touching a destroyed table.
 */
void transCtxMerge(STransCtx* dst, STransCtx* src) {
  if (dst->args == NULL) {
    dst->args = src->args;
    dst->brokenVal = src->brokenVal;
    dst->freeFunc = src->freeFunc;
    src->args = NULL;
    return;
  }

  void*  key = NULL;
  size_t klen = 0;
  void*  iter = taosHashIterate(src->args, NULL);
  while (iter) {
    STransCtxVal* sVal = (STransCtxVal*)iter;
    key = taosHashGetKey(sVal, &klen);

    // NOTE(review): a value already stored in dst under the same key is
    // replaced without being passed to dst->freeFunc; confirm that this is
    // intended.
    taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
    iter = taosHashIterate(src->args, iter);
  }
  taosHashCleanup(src->args);
  src->args = NULL;  // do not leave a dangling pointer to the destroyed table
}
/*
 * Look up `key` in the context and return a clone of its value, produced by
 * the entry's own clone callback. Returns NULL when the context has no table
 * or the key is not present.
 */
void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
  void* copy = NULL;

  if (ctx->args != NULL) {
    STransCtxVal* found = taosHashGet(ctx->args, (const void*)&key, sizeof(key));
    if (found != NULL) {
      (*found->clone)(found->val, &copy);
    }
  }
  return copy;
}
dengyihao's avatar
dengyihao 已提交
320
/*
 * Return a clone of the context's brokenVal and report its message type.
 *
 * When brokenVal has no clone callback, NULL is returned and *msgType is not
 * written. Otherwise the clone callback fills the result and *msgType receives
 * brokenVal.msgType.
 */
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
  void* copy = NULL;

  if (ctx->brokenVal.clone != NULL) {
    (*ctx->brokenVal.clone)(ctx->brokenVal.val, &copy);
    *msgType = ctx->brokenVal.msgType;
  }
  return copy;
}
dengyihao's avatar
dengyihao 已提交
331

dengyihao's avatar
dengyihao 已提交
332
/* Initialise an empty request queue. */
void transReqQueueInit(queue* q) {
  QUEUE_INIT(q);
}
dengyihao's avatar
dengyihao 已提交
336
/*
 * Allocate a new STransReq, link it into `q` and return a pointer to its
 * embedded write request (whose data field points back at the STransReq).
 *
 * Returns NULL, leaving the queue unchanged, if the request cannot be
 * allocated.
 */
void* transReqQueuePush(queue* q) {
  STransReq* req = taosMemoryCalloc(1, sizeof(STransReq));
  if (req == NULL) {
    return NULL;
  }
  req->wreq.data = req;
  QUEUE_PUSH(q, &req->q);
  return &req->wreq;
}
void* transReqQueueRemove(void* arg) {
  void*       ret = NULL;
dengyihao's avatar
dengyihao 已提交
344
  uv_write_t* wreq = arg;
dengyihao's avatar
dengyihao 已提交
345

dengyihao's avatar
dengyihao 已提交
346 347 348
  STransReq* req = wreq ? wreq->data : NULL;
  if (req == NULL) return NULL;
  QUEUE_REMOVE(&req->q);
dengyihao's avatar
dengyihao 已提交
349

dengyihao's avatar
dengyihao 已提交
350 351
  ret = wreq && wreq->handle ? wreq->handle->data : NULL;
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
352 353 354 355 356 357 358 359 360 361 362 363

  return ret;
}
/* Unlink and free every STransReq still linked into the queue. */
void transReqQueueClear(queue* q) {
  for (;;) {
    if (QUEUE_IS_EMPTY(q)) {
      break;
    }
    queue* first = QUEUE_HEAD(q);
    QUEUE_REMOVE(first);

    STransReq* pending = QUEUE_DATA(first, STransReq, q);
    taosMemoryFree(pending);
  }
}

wafwerar's avatar
wafwerar 已提交
364
/*
 * Initialise a transport queue: create the backing array of pointers (initial
 * capacity 2) and remember the callback transQueueClear() uses to release
 * queued elements (may be NULL).
 */
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
  queue->freeFunc = freeFunc;
  queue->q = taosArrayInit(2, sizeof(void*));
}
/*
 * Append `arg` to the queue.
 *
 * Returns true when the queue holds at most one element after the push (or
 * when the queue has no backing array, in which case nothing is stored), and
 * false when other elements were already queued.
 */
bool transQueuePush(STransQueue* queue, void* arg) {
  if (queue->q != NULL) {
    taosArrayPush(queue->q, &arg);
    if (taosArrayGetSize(queue->q) > 1) {
      return false;
    }
  }
  return true;
}
/*
 * Remove and return the first element of the queue, or NULL when the queue has
 * no backing array or is empty.
 */
void* transQueuePop(STransQueue* queue) {
  if (queue->q == NULL) {
    return NULL;
  }
  if (taosArrayGetSize(queue->q) == 0) {
    return NULL;
  }

  void* first = taosArrayGetP(queue->q, 0);
  taosArrayRemove(queue->q, 0);
  return first;
}
dengyihao's avatar
dengyihao 已提交
386
/* Number of queued elements; 0 when the queue has no backing array. */
int32_t transQueueSize(STransQueue* queue) {
  int32_t count = 0;
  if (queue->q != NULL) {
    count = taosArrayGetSize(queue->q);
  }
  return count;
}
/*
 * Return the element at position `i` without removing it.
 *
 * Returns NULL when the queue has no backing array, is empty, or `i` is
 * outside [0, size).
 */
void* transQueueGet(STransQueue* queue, int i) {
  if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
    return NULL;
  }
  // Reject negative indexes explicitly instead of relying on how the
  // comparison with the array size happens to convert them.
  if (i < 0 || i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }

  void* ptr = taosArrayGetP(queue->q, i);
  return ptr;
}

/*
 * Remove and return the element at position `i`.
 *
 * Returns NULL, leaving the queue unchanged, when the queue has no backing
 * array, is empty, or `i` is outside [0, size).
 */
void* transQueueRm(STransQueue* queue, int i) {
  if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
    return NULL;
  }
  // Reject negative indexes explicitly instead of relying on how the
  // comparison with the array size happens to convert them.
  if (i < 0 || i >= taosArrayGetSize(queue->q)) {
    return NULL;
  }

  void* ptr = taosArrayGetP(queue->q, i);
  taosArrayRemove(queue->q, i);
  return ptr;
}
dengyihao's avatar
dengyihao 已提交
415

dengyihao's avatar
dengyihao 已提交
416
/* True when the queue has no backing array or holds no elements. */
bool transQueueEmpty(STransQueue* queue) {
  if (queue->q != NULL) {
    return taosArrayGetSize(queue->q) == 0;
  }
  return true;
}
/*
 * Empty the queue. When a free callback was registered it is invoked on every
 * queued element first; the backing array is then cleared but kept.
 */
void transQueueClear(STransQueue* queue) {
  if (queue->freeFunc != NULL) {
    int idx = 0;
    while (idx < taosArrayGetSize(queue->q)) {
      void* elem = taosArrayGetP(queue->q, idx);
      queue->freeFunc(elem);
      idx++;
    }
  }
  taosArrayClear(queue->q);
}
/* Clear the queue (see transQueueClear) and destroy its backing array. */
void transQueueDestroy(STransQueue* queue) {
  transQueueClear(queue);
  taosArrayDestroy(queue->q);
}
dengyihao's avatar
dengyihao 已提交
435

dengyihao's avatar
dengyihao 已提交
436
/*
 * Heap ordering callback for delay tasks.
 * Returns 0 when a's execTime is greater than b's, otherwise 1.
 */
static FORCE_INLINE int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
  SDelayTask* lhs = container_of(a, SDelayTask, node);
  SDelayTask* rhs = container_of(b, SDelayTask, node);

  return (lhs->execTime > rhs->execTime) ? 0 : 1;
}

dengyihao's avatar
dengyihao 已提交
446
/*
 * Timer callback of a delay queue (timer->data is the SDelayQueue).
 *
 * Runs and frees every task at the top of the heap whose execTime is not later
 * than the current time, then re-arms the timer for the first task that is
 * still in the future. When the heap runs empty the timer is not re-armed.
 */
static void transDQTimeout(uv_timer_t* timer) {
  SDelayQueue* queue = timer->data;
  tTrace("timer %p timeout", timer);

  uint64_t  wait = 0;
  int64_t   current = taosGetTimestampMs();
  HeapNode* top = NULL;

  while ((top = heapMin(queue->heap)) != NULL) {
    SDelayTask* task = container_of(top, SDelayTask, node);
    if (task->execTime > current) {
      // First task that is not due yet decides the next wake-up.
      wait = task->execTime - current;
      break;
    }

    heapRemove(queue->heap, top);
    task->func(task->arg);
    taosMemoryFree(task);
  }

  if (wait != 0) {
    uv_timer_start(queue->timer, transDQTimeout, wait, 0);
  }
}
dengyihao's avatar
dengyihao 已提交
469
/*
 * Create a delay queue bound to `loop`.
 *
 * loop:  loop the queue's timer is initialised on.
 * queue: receives the new queue on success.
 *
 * Returns 0 on success. Returns -1, without writing *queue, if the timer, the
 * heap or the queue cannot be allocated. All allocations happen before the
 * timer is registered with the loop so a failure can simply free them.
 */
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
  uv_timer_t*  timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
  Heap*        heap = heapCreate(timeCompare);
  SDelayQueue* q = taosMemoryCalloc(1, sizeof(SDelayQueue));

  if (timer == NULL || heap == NULL || q == NULL) {
    if (heap != NULL) {
      heapDestroy(heap);
    }
    taosMemoryFree(timer);
    taosMemoryFree(q);
    return -1;
  }

  uv_timer_init(loop, timer);

  q->heap = heap;
  q->timer = timer;
  q->loop = loop;
  q->timer->data = q;  // lets transDQTimeout find the queue from the timer

  *queue = q;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
485
/*
 * Destroy a delay queue and every task still pending in it.
 *
 * queue:    queue to destroy; its timer memory, heap and the queue itself are
 *           freed.
 * freeFunc: optional callback invoked on each pending task's arg before that
 *           arg is freed.
 *
 * NOTE(review): the timer memory is freed directly; confirm that callers have
 * already closed the timer handle on the loop before calling this.
 */
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
  taosMemoryFree(queue->timer);

  while (heapSize(queue->heap) > 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) {
      // Stop draining but still release the heap and the queue below;
      // returning here would leak both.
      break;
    }
    heapRemove(queue->heap, minNode);

    SDelayTask* task = container_of(minNode, SDelayTask, node);

    STaskArg* arg = task->arg;
    if (freeFunc) freeFunc(arg);
    taosMemoryFree(arg);

    taosMemoryFree(task);
  }
  heapDestroy(queue->heap);
  taosMemoryFree(queue);
}
dengyihao's avatar
dengyihao 已提交
506 507
/*
 * Cancel a scheduled task.
 *
 * The timer is stopped, the task is removed from the heap (when the heap is
 * not empty) and the task and its arg are freed. If other tasks remain, the
 * timer is re-armed for the earliest one: after the time remaining until its
 * execTime, or immediately (0 ms) when it is already due.
 */
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
  uv_timer_stop(queue->timer);

  if (heapSize(queue->heap) <= 0) {
    taosMemoryFree(task->arg);
    taosMemoryFree(task);
    return;
  }
  heapRemove(queue->heap, &task->node);

  taosMemoryFree(task->arg);
  taosMemoryFree(task);

  if (heapSize(queue->heap) != 0) {
    HeapNode* minNode = heapMin(queue->heap);
    if (minNode == NULL) return;

    uint64_t    now = taosGetTimestampMs();
    SDelayTask* minTask = container_of(minNode, SDelayTask, node);
    // Remaining time until the earliest task, matching the computation in
    // transDQSched(). The previous expression (now - execTime) was inverted:
    // it yielded 0 for future tasks and postponed overdue ones.
    uint64_t timeout = minTask->execTime > now ? minTask->execTime - now : 0;

    uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
  }
}

/*
 * Schedule func(arg) to run timeoutMs milliseconds from now.
 *
 * The task is inserted into the queue's heap and the timer is (re)started. If
 * an earlier task is already queued, the timer is armed for that task instead
 * (0 ms when it is already due).
 *
 * Returns the new task, or NULL if it cannot be allocated; in that case the
 * heap and the timer are left untouched.
 */
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
  uint64_t    now = taosGetTimestampMs();
  SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
  if (task == NULL) {
    return NULL;
  }
  task->func = func;
  task->arg = arg;
  task->execTime = now + timeoutMs;

  HeapNode* minNode = heapMin(queue->heap);
  if (minNode) {
    SDelayTask* minTask = container_of(minNode, SDelayTask, node);
    if (minTask->execTime < task->execTime) {
      // An earlier task exists; keep the timer aligned with it.
      timeoutMs = minTask->execTime <= now ? 0 : minTask->execTime - now;
    }
  }

  tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs);
  heapInsert(queue->heap, &task->node);
  uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
  return task;
}
dengyihao's avatar
dengyihao 已提交
551 552 553 554 555 556

/*
 * Trace-log an endpoint set as "epset:{0. host:port, 1. host:port}, inUse:N".
 *
 * The text is built in a 512-byte stack buffer. Output that does not fit is
 * truncated: formatting stops as soon as the buffer is full, so the write
 * position can never move past the end of the buffer.
 */
void transPrintEpSet(SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    tTrace("NULL epset");
    return;
  }

  char         buf[512] = {0};
  const size_t cap = sizeof(buf);
  size_t       used = 0;

  int n = snprintf(buf, cap, "epset:{");
  if (n > 0) {
    used = (size_t)n;
  }

  for (int i = 0; i < pEpSet->numOfEps && used < cap - 1; i++) {
    // The last endpoint is not followed by a separator.
    const char* sep = (i == pEpSet->numOfEps - 1) ? "" : ", ";

    n = snprintf(buf + used, cap - used, "%d. %s:%d%s", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port, sep);
    if (n < 0) {
      break;
    }
    // snprintf reports the length it wanted to write; clamp it to what fits.
    if ((size_t)n >= cap - used) {
      used = cap - 1;
    } else {
      used += (size_t)n;
    }
  }

  if (used < cap - 1) {
    snprintf(buf + used, cap - used, "}");
  }
  tTrace("%s, inUse:%d", buf, pEpSet->inUse);
}
569 570 571 572 573 574 575 576 577 578 579
/*
 * Compare two endpoint sets. They are equal when numOfEps and inUse match and
 * every endpoint has the same port and the same fqdn (compared over at most
 * TSDB_FQDN_LEN characters).
 */
bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
  if (a->numOfEps != b->numOfEps) {
    return false;
  }
  if (a->inUse != b->inUse) {
    return false;
  }

  int idx = 0;
  while (idx < a->numOfEps) {
    if (a->eps[idx].port != b->eps[idx].port) {
      return false;
    }
    if (strncmp(a->eps[idx].fqdn, b->eps[idx].fqdn, TSDB_FQDN_LEN) != 0) {
      return false;
    }
    idx++;
  }
  return true;
}
dengyihao's avatar
dengyihao 已提交
580

dengyihao's avatar
dengyihao 已提交
581
static void transInitEnv() {
dengyihao's avatar
dengyihao 已提交
582 583
  refMgt = transOpenRefMgt(50000, transDestoryExHandle);
  instMgt = taosOpenRef(50, rpcCloseImpl);
dengyihao's avatar
dengyihao 已提交
584 585
  uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
}
dengyihao's avatar
dengyihao 已提交
586
static void transDestroyEnv() {
dengyihao's avatar
dengyihao 已提交
587 588
  transCloseRefMgt(refMgt);
  transCloseRefMgt(instMgt);
dengyihao's avatar
dengyihao 已提交
589
}
dengyihao's avatar
dengyihao 已提交
590

dengyihao's avatar
dengyihao 已提交
591 592 593 594
/* Initialise the transport module; transInitEnv is run through taosThreadOnce. */
void transInit() {
  taosThreadOnce(&transModuleInit, transInitEnv);
}
dengyihao's avatar
dengyihao 已提交
595 596 597 598

/* Returns the value stored in refMgt by transInitEnv(). */
int32_t transGetRefMgt() {
  return refMgt;
}

/* Returns the value stored in instMgt by transInitEnv(). */
int32_t transGetInstMgt() {
  return instMgt;
}

dengyihao's avatar
dengyihao 已提交
599 600 601 602
/* Tear down the module environment by calling transDestroyEnv(). */
void transCleanup() {
  transDestroyEnv();
}
dengyihao's avatar
dengyihao 已提交
603
/*
 * Open a reference manager; forwards size and func to taosOpenRef() and
 * returns its result.
 */
int32_t transOpenRefMgt(int size, void (*func)(void*)) {
  // added into once later
  int32_t mgt = taosOpenRef(size, func);
  return mgt;
}
dengyihao's avatar
dengyihao 已提交
607
/* Close a reference manager via taosCloseRef(); its result is ignored. */
void transCloseRefMgt(int32_t mgt) {
  taosCloseRef(mgt);
}
dengyihao's avatar
dengyihao 已提交
611
/*
 * Register `p` with the given reference manager; forwards to taosAddRef() and
 * returns its result.
 */
int64_t transAddExHandle(int32_t refMgt, void* p) {
  int64_t refId = taosAddRef(refMgt, p);
  return refId;
}
dengyihao's avatar
dengyihao 已提交
615
/*
 * Remove the handle `refId` from the given reference manager; forwards to
 * taosRemoveRef() and returns its result.
 */
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) {
  int32_t code = taosRemoveRef(refMgt, refId);
  return code;
}

dengyihao's avatar
dengyihao 已提交
620
/*
 * Acquire the handle `refId` from the given reference manager; forwards to
 * taosAcquireRef() and returns its result as void*.
 */
void* transAcquireExHandle(int32_t refMgt, int64_t refId) {
  void* handle = (void*)taosAcquireRef(refMgt, refId);
  return handle;
}

dengyihao's avatar
dengyihao 已提交
625
/*
 * Release the handle `refId` of the given reference manager; forwards to
 * taosReleaseRef() and returns its result.
 */
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) {
  int32_t code = taosReleaseRef(refMgt, refId);
  return code;
}
/*
 * Free an external handle. Passed to transOpenRefMgt() in transInitEnv().
 * NULL is ignored.
 */
void transDestoryExHandle(void* handle) {
  if (handle != NULL) {
    taosMemoryFree(handle);
  }
}
dengyihao's avatar
dengyihao 已提交
635
#endif