transCli.c 20.7 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

#include "transComm.h"

#include <stdio.h>

20
// Scale an idle-time setting by 1000 * 10; the result is added to millisecond
// timestamps and passed to uv_timer_start elsewhere in this file.
// The argument is parenthesized so that compound expressions expand correctly.
#define CONN_PERSIST_TIME(para) ((para) * 1000 * 10)
dengyihao's avatar
dengyihao 已提交
21

dengyihao's avatar
dengyihao 已提交
22 23 24
typedef struct SCliConn {
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
25
  uv_write_t*  writeReq;
dengyihao's avatar
dengyihao 已提交
26
  void*        hostThrd;
dengyihao's avatar
dengyihao 已提交
27
  SConnBuffer  readBuf;
dengyihao's avatar
dengyihao 已提交
28 29
  void*        data;
  queue        conn;
dengyihao's avatar
dengyihao 已提交
30
  uint64_t     expireTime;
dengyihao's avatar
dengyihao 已提交
31
  int8_t       notifyCount;  // timers already notify to client
H
Haojun Liao 已提交
32 33 34 35 36 37 38 39 40

  SRpcPush* push;
  int       persist;  //
  // spi configure
  char    spi;
  char    secured;
  int32_t ref;
  // debug and log info
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
41
} SCliConn;
dengyihao's avatar
dengyihao 已提交
42

dengyihao's avatar
dengyihao 已提交
43
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
44 45 46 47
  STransConnCtx* ctx;
  SRpcMsg        msg;
  queue          q;
  uint64_t       st;
dengyihao's avatar
dengyihao 已提交
48 49 50
} SCliMsg;

typedef struct SCliThrdObj {
H
Haojun Liao 已提交
51 52 53 54 55
  pthread_t  thread;
  uv_loop_t* loop;
  // uv_async_t*     cliAsync;  //
  SAsyncPool*     asyncPool;
  uv_timer_t*     timer;
dengyihao's avatar
dengyihao 已提交
56
  void*           pool;  // conn pool
dengyihao's avatar
dengyihao 已提交
57 58
  queue           msg;
  pthread_mutex_t msgMtx;
dengyihao's avatar
dengyihao 已提交
59
  uint64_t        nextTimeout;  // next timeout
dengyihao's avatar
dengyihao 已提交
60
  void*           pTransInst;   //
H
Haojun Liao 已提交
61
  bool            quit;
dengyihao's avatar
dengyihao 已提交
62 63 64 65 66 67 68 69 70
} SCliThrdObj;

// Client transport handle returned by taosInitClient.
typedef struct SClientObj {
  char          label[TSDB_LABEL_LEN];  // copy of the label passed at init
  int32_t       index;                  // not referenced by the code visible in this file
  int           numOfThreads;           // number of entries in pThreadObj
  SCliThrdObj** pThreadObj;             // one worker object per thread
} SClientObj;

dengyihao's avatar
dengyihao 已提交
71 72 73 74
// Value stored in the pool hash: the list of connections kept for one key.
typedef struct SConnList {
  queue conn;  // head of the list of SCliConn.conn links
} SConnList;

dengyihao's avatar
dengyihao 已提交
75
// conn pool
dengyihao's avatar
dengyihao 已提交
76
// add expire timeout and capacity limit
77 78
static void*     creatConnPool(int size);
static void*     destroyConnPool(void* pool);
dengyihao's avatar
dengyihao 已提交
79 80
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void      addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
81

dengyihao's avatar
dengyihao 已提交
82 83
// register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
84 85 86
// check whether already read complete packet from server
static bool clientReadComplete(SConnBuffer* pBuf);
// alloc buf for read
dengyihao's avatar
dengyihao 已提交
87
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
88
// callback after read nbytes from socket
dengyihao's avatar
dengyihao 已提交
89
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
90
// callback after write data to socket
dengyihao's avatar
dengyihao 已提交
91
static void clientWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
92
// callback after conn  to server
dengyihao's avatar
dengyihao 已提交
93
static void clientConnCb(uv_connect_t* req, int status);
dengyihao's avatar
dengyihao 已提交
94
static void clientAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
95
static void clientDestroy(uv_handle_t* handle);
96
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
dengyihao's avatar
dengyihao 已提交
97

dengyihao's avatar
dengyihao 已提交
98 99 100 101
// process data read from server, auth/decompress etc later
static void clientHandleResp(SCliConn* conn);
// handle except about conn
static void clientHandleExcept(SCliConn* conn);
102 103
// handle req from app
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
H
Haojun Liao 已提交
104 105
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientSendQuit(SCliThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
106

dengyihao's avatar
dengyihao 已提交
107 108 109 110
static void destroyUserdata(SRpcMsg* userdata);

static void destroyCmsg(SCliMsg* cmsg);
static void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
111 112 113 114
// thread obj
static SCliThrdObj* createThrdObj();
static void         destroyThrdObj(SCliThrdObj* pThrd);
// thread
dengyihao's avatar
dengyihao 已提交
115 116
static void* clientThread(void* arg);

117
// Deliver the complete response currently held in conn->readBuf.
//
// The header is converted to host byte order in place, an SRpcMsg describing the
// payload is handed to the push callback, to the instance callback (context has no
// semaphore) or copied to the waiting caller (context has a semaphore). Afterwards
// the read buffer is cleared, reading is re-armed and the request message is freed.
//
// NOTE(review): conn->data is dereferenced unconditionally; data arriving while no
// request is attached to the conn would crash here - confirm this cannot happen.
static void clientHandleResp(SCliConn* conn) {
  SCliMsg*       pMsg = conn->data;
  STransConnCtx* pCtx = pMsg->ctx;
  SRpcInfo*      pRpc = pCtx->pTransInst;

  // header fields arrive in network byte order
  STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
  pHead->code = ntohl(pHead->code);
  pHead->msgLen = ntohl(pHead->msgLen);

  // zero-initialized so that fields not assigned below are never handed out as garbage
  SRpcMsg rpcMsg = {0};
  rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
  rpcMsg.pCont = transContFromHead((char*)pHead);
  rpcMsg.code = pHead->code;
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.ahandle = pCtx->ahandle;

  tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr),
         ntohs(conn->addr.sin_port));

  if (conn->push != NULL && conn->notifyCount != 0) {
    (*conn->push->callback)(conn->push->arg, &rpcMsg);
  } else {
    if (pCtx->pSem == NULL) {
      tTrace("client conn(sync) %p handle resp", conn);
      (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
    } else {
      tTrace("client conn(sync) %p handle resp", conn);
      memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
      tsem_post(pCtx->pSem);
    }
  }
  conn->notifyCount += 1;

  // buf's mem alread translated to rpcMsg.pCont
  transClearBuffer(&conn->readBuf);

  uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);

  SCliThrdObj* pThrd = conn->hostThrd;
  // user owns conn->persist = 1
  // NOTE(review): only conns WITH a push descriptor are returned to the pool here;
  // confirm the condition is not inverted.
  if (conn->push != NULL) {
    addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
  }

  // pCtx is owned by pMsg and must not be used past this point
  destroyCmsg(pMsg);
  conn->data = NULL;
  // start thread's timer of conn pool if not active
  if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) {
    uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
  }
}
// Handle a failure on pConn (connect, read or write error, or worker shutdown).
//
// A conn with neither a request nor a push descriptor is simply destroyed.
// Otherwise an SRpcMsg carrying TSDB_CODE_RPC_NETWORK_UNAVAIL is delivered to the
// push callback and/or the request's owner, the request is freed and the conn is
// destroyed.
static void clientHandleExcept(SCliConn* pConn) {
  if (pConn->data == NULL && pConn->push == NULL) {
    // handle conn except in conn pool
    clientConnDestroy(pConn, true);
    return;
  }
  tTrace("client conn %p start to destroy", pConn);

  // data may be NULL here when only a push descriptor is attached
  SCliMsg*       pMsg = pConn->data;
  STransConnCtx* pCtx = (pMsg != NULL) ? pMsg->ctx : NULL;

  SRpcMsg rpcMsg = {0};
  if (pCtx != NULL) {
    rpcMsg.ahandle = pCtx->ahandle;
  }
  rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;

  if (pConn->push != NULL && pConn->notifyCount != 0) {
    (*pConn->push->callback)(pConn->push->arg, &rpcMsg);
  } else {
    if (pCtx != NULL) {
      if (pCtx->pSem == NULL) {
        (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL);
      } else {
        memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
        tsem_post(pCtx->pSem);
      }
    }
    if (pConn->push != NULL) {
      (*pConn->push->callback)(pConn->push->arg, &rpcMsg);
    }
  }

  destroyCmsg(pMsg);  // accepts NULL
  pConn->data = NULL;

  // update the counter before the conn is handed to the destroy path
  pConn->notifyCount += 1;
  clientConnDestroy(pConn, true);
}
dengyihao's avatar
dengyihao 已提交
204

dengyihao's avatar
dengyihao 已提交
205 206
// Timer callback: walk every list in the thread's conn pool and destroy the conns at
// the head of each list whose expireTime is below the thread's stored nextTimeout,
// then store a new nextTimeout and re-arm the timer.
static void clientTimeoutCb(uv_timer_t* handle) {
  SCliThrdObj* thrd = handle->data;
  SRpcInfo*    rpc = thrd->pTransInst;
  int64_t      deadline = thrd->nextTimeout;
  tTrace("client conn timeout, try to remove expire conn from conn pool");

  for (SConnList* list = taosHashIterate((SHashObj*)thrd->pool, NULL); list != NULL;
       list = taosHashIterate((SHashObj*)thrd->pool, list)) {
    while (!QUEUE_IS_EMPTY(&list->conn)) {
      queue*    head = QUEUE_HEAD(&list->conn);
      SCliConn* idle = QUEUE_DATA(head, SCliConn, conn);
      if (idle->expireTime >= deadline) {
        // the scan of a list stops at the first conn that has not expired
        break;
      }
      QUEUE_REMOVE(head);
      clientConnDestroy(idle, true);
    }
  }

  thrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(rpc->idleTime);
  uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(rpc->idleTime) / 2, 0);
}
231 232 233
// Create the hash that backs a conn pool, sized by `size`.
// The hash is created with HASH_NO_LOCK: thread local, no lock.
static void* creatConnPool(int size) {
  void* pool = taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  return pool;
}
235
static void* destroyConnPool(void* pool) {
dengyihao's avatar
dengyihao 已提交
236
  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
dengyihao's avatar
dengyihao 已提交
237 238 239 240 241
  while (connList != NULL) {
    while (!QUEUE_IS_EMPTY(&connList->conn)) {
      queue* h = QUEUE_HEAD(&connList->conn);
      QUEUE_REMOVE(h);
      SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
242
      clientConnDestroy(c, true);
dengyihao's avatar
dengyihao 已提交
243
    }
dengyihao's avatar
dengyihao 已提交
244
    connList = taosHashIterate((SHashObj*)pool, connList);
dengyihao's avatar
dengyihao 已提交
245
  }
dengyihao's avatar
dengyihao 已提交
246
  taosHashClear(pool);
dengyihao's avatar
dengyihao 已提交
247 248
}

dengyihao's avatar
dengyihao 已提交
249
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
dengyihao's avatar
dengyihao 已提交
250 251 252 253
  char key[128] = {0};
  tstrncpy(key, ip, strlen(ip));
  tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));

dengyihao's avatar
dengyihao 已提交
254 255
  SHashObj*  pPool = pool;
  SConnList* plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
256 257
  if (plist == NULL) {
    SConnList list;
dengyihao's avatar
dengyihao 已提交
258 259
    taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
    plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
260
    QUEUE_INIT(&plist->conn);
dengyihao's avatar
dengyihao 已提交
261 262 263 264 265 266 267
  }

  if (QUEUE_IS_EMPTY(&plist->conn)) {
    return NULL;
  }
  queue* h = QUEUE_HEAD(&plist->conn);
  QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
268 269 270 271

  SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
  QUEUE_INIT(&conn->conn);
  return conn;
dengyihao's avatar
dengyihao 已提交
272
}
dengyihao's avatar
dengyihao 已提交
273
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
274
  char key[128] = {0};
dengyihao's avatar
dengyihao 已提交
275

dengyihao's avatar
dengyihao 已提交
276 277
  tstrncpy(key, ip, strlen(ip));
  tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
H
Haojun Liao 已提交
278
  tTrace("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
dengyihao's avatar
dengyihao 已提交
279

dengyihao's avatar
dengyihao 已提交
280
  SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
dengyihao's avatar
dengyihao 已提交
281

dengyihao's avatar
dengyihao 已提交
282
  conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
dengyihao's avatar
dengyihao 已提交
283
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
284
  conn->notifyCount = 0;
dengyihao's avatar
dengyihao 已提交
285 286 287 288
  // list already create before
  assert(plist != NULL);
  QUEUE_PUSH(&plist->conn, &conn->conn);
}
dengyihao's avatar
dengyihao 已提交
289 290 291 292 293 294 295 296 297
// Report whether `data` holds exactly one complete packet.
//
// Returns false while fewer bytes than a header, or fewer than the length announced
// in the header, have been received (data->left is set to the missing byte count in
// the latter case). Returns true, with data->left = 0, when the received length
// equals the announced length.
static bool clientReadComplete(SConnBuffer* data) {
  STransMsgHead head;
  int32_t       headLen = sizeof(head);
  if (data->len < headLen) {
    return false;
  }

  memcpy((char*)&head, data->buf, headLen);
  int32_t msgLen = (int32_t)ntohl((uint32_t)head.msgLen);
  if (msgLen > data->len) {
    data->left = msgLen - data->len;
    return false;
  }
  if (msgLen == data->len) {
    data->left = 0;
    return true;
  }
  // More bytes than the header announces. The original fell off the end of the
  // function here (undefined return value); report "not complete" explicitly.
  // NOTE(review): confirm how an over-long read should be treated.
  return false;
}
dengyihao's avatar
dengyihao 已提交
306
// libuv alloc callback: let transAllocBuffer fill `buf` from the conn's read buffer.
// suggested_size is not used.
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SCliConn* owner = handle->data;
  transAllocBuffer(&owner->readBuf, buf);
}
// libuv read callback.
// nread > 0: account for the bytes and, once the packet is complete, stop reading
//            and process the response.
// nread == 0: nothing to do.
// nread < 0: treat as a connection failure.
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
  SCliConn*    conn = handle->data;
  SConnBuffer* rbuf = &conn->readBuf;

  if (nread == 0) {
    // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
    // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
    // read(2).
    return;
  }

  if (nread < 0) {
    // includes UV_EOF
    tError("client conn %p read error: %s", conn, uv_err_name(nread));
    clientHandleExcept(conn);
    return;
  }

  rbuf->len += nread;
  if (!clientReadComplete(rbuf)) {
    tTrace("client conn %p read partial packet, continue to read", conn);
    return;
  }

  uv_read_stop((uv_stream_t*)conn->stream);
  tTrace("client conn %p read complete", conn);
  clientHandleResp(conn);
}
dengyihao's avatar
dengyihao 已提交
340

341
// Drop one reference on conn. When the count reaches zero the conn is unlinked from
// whatever list it is on and, if `clear` is set, its stream is closed with
// clientDestroy as the close callback.
static void clientConnDestroy(SCliConn* conn, bool clear) {
  conn->ref -= 1;
  if (conn->ref != 0) {
    return;
  }

  tTrace("client conn %p remove from conn pool", conn);
  QUEUE_REMOVE(&conn->conn);
  if (!clear) {
    return;
  }
  uv_close((uv_handle_t*)conn->stream, clientDestroy);
}
dengyihao's avatar
dengyihao 已提交
352 353
// Close callback: release the stream, the write request and the conn itself.
// The conn's read buffer is not released here.
static void clientDestroy(uv_handle_t* handle) {
  SCliConn* closed = handle->data;

  free(closed->stream);
  free(closed->writeReq);
  tTrace("client conn %p destroy successfully", closed);
  free(closed);
}

// libuv write callback.
// On failure the conn goes through the exception path. On success the request
// payload is released and reading of the response is started; nothing happens when
// no message is attached to the conn.
static void clientWriteCb(uv_write_t* req, int status) {
  SCliConn* pConn = req->data;

  if (status != 0) {
    tError("client conn %p failed to write: %s", pConn, uv_err_name(status));
    clientHandleExcept(pConn);
    return;
  }

  tTrace("client conn %p data already was written out", pConn);
  SCliMsg* sent = pConn->data;
  if (sent == NULL) {
    // handle
    return;
  }
  destroyUserdata(&sent->msg);

  uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb);
}
dengyihao's avatar
dengyihao 已提交
382 383

// Fill in the transport header in front of the request payload attached to pConn and
// submit header + payload as one write; clientWriteCb runs when the write finishes.
// NOTE(review): the result of uv_write is not checked.
static void clientWrite(SCliConn* pConn) {
  SCliMsg* cliMsg = pConn->data;
  SRpcMsg* req = (SRpcMsg*)(&cliMsg->msg);

  int            totalLen = transMsgLenFromCont(req->contLen);
  STransMsgHead* head = transHeadFromCont(req->pCont);

  head->msgType = req->msgType;
  head->msgLen = (int32_t)htonl((uint32_t)totalLen);  // length travels in network byte order

  uv_buf_t out = uv_buf_init((char*)head, totalLen);
  tDebug("client conn %p %s is send to %s:%d", pConn, TMSG_INFO(head->msgType), inet_ntoa(pConn->addr.sin_addr),
         ntohs(pConn->addr.sin_port));
  uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &out, 1, clientWriteCb);
}
dengyihao's avatar
dengyihao 已提交
398 399
// libuv connect callback.
// On success the peer address is recorded for logging and the pending request is
// written; on failure the conn goes through the exception path.
static void clientConnCb(uv_connect_t* req, int status) {
  SCliConn* pConn = req->data;

  if (status == 0) {
    int addrlen = sizeof(pConn->addr);
    uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen);

    tTrace("client conn %p create", pConn);

    assert(pConn->stream == req->handle);
    clientWrite(pConn);
    return;
  }

  // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
  tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status));
  clientHandleExcept(pConn);
}

H
Haojun Liao 已提交
416 417 418 419 420 421 422 423 424
// Handle the quit message on the worker thread: free the message, mark the thread as
// quitting, stop its timer and stop its loop.
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
  tDebug("client work thread %p start to quit", pThrd);
  destroyCmsg(pMsg);

  pThrd->quit = true;
  uv_timer_stop(pThrd->timer);
  uv_stop(pThrd->loop);
}
dengyihao's avatar
dengyihao 已提交
425
// Dispatch one request on the worker thread.
//
// A conn taken from the pool is reused and the request is written at once (or failed
// through the exception path when the thread is quitting). Otherwise a new conn is
// created and a connect is started; the request is written from clientConnCb.
// A connect that cannot even be started is reported through the exception path so
// that the request's owner is notified instead of waiting forever.
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
  uint64_t el = taosGetTimestampUs() - pMsg->st;
  tTrace("client msg tran time cost: %" PRIu64 "", el);

  STransConnCtx* pCtx = pMsg->ctx;
  SCliConn*      conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
  if (conn != NULL) {
    tTrace("client get conn %p from pool", conn);
    conn->data = pMsg;
    conn->writeReq->data = conn;
    // attach this request's push descriptor before any path that may notify
    conn->push = pMsg->msg.push;
    transDestroyBuffer(&conn->readBuf);

    if (pThrd->quit) {
      clientHandleExcept(conn);
      return;
    }
    clientWrite(conn);
    return;
  }

  conn = calloc(1, sizeof(SCliConn));
  conn->ref++;
  // read/write stream handle
  conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
  conn->stream->data = conn;

  // write req handle
  conn->writeReq = malloc(sizeof(uv_write_t));
  conn->writeReq->data = conn;

  QUEUE_INIT(&conn->conn);

  conn->connReq.data = conn;
  conn->data = pMsg;
  conn->hostThrd = pThrd;

  conn->push = pMsg->msg.push;

  struct sockaddr_in addr;
  // NOTE(review): the result of uv_ip4_addr is not checked - confirm what should
  // happen when ctx->ip is not a dotted-quad address.
  uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);

  // asynchronous failures are handled in clientConnCb; a non-zero return means the
  // request was not started, so the callback is not relied on to report it
  int err = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
  if (err != 0) {
    tError("client conn %p failed to connect server: %s", conn, uv_strerror(err));
    clientHandleExcept(conn);
  }
}
// Async callback run on the worker thread: take everything queued on the async item
// in one step and process each message in turn.
static void clientAsyncCb(uv_async_t* handle) {
  SAsyncItem*  item = handle->data;
  SCliThrdObj* pThrd = item->pThrd;
  queue        batch;

  // batch process to avoid to lock/unlock frequently
  pthread_mutex_lock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &batch);
  pthread_mutex_unlock(&item->mtx);

  int handled = 0;
  for (; !QUEUE_IS_EMPTY(&batch); handled++) {
    queue* node = QUEUE_HEAD(&batch);
    QUEUE_REMOVE(node);

    SCliMsg* cur = QUEUE_DATA(node, SCliMsg, q);
    if (cur->ctx != NULL) {
      clientHandleReq(cur, pThrd);
    } else {
      // a message without a context is the quit request
      clientHandleQuit(cur, pThrd);
    }
  }

  if (handled >= 2) {
    tTrace("client process batch size: %d", handled);
  }
}

static void* clientThread(void* arg) {
  SCliThrdObj* pThrd = (SCliThrdObj*)arg;
H
Haojun Liao 已提交
510
  setThreadName("trans-client-work");
dengyihao's avatar
dengyihao 已提交
511 512 513 514 515
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
}

void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
  SClientObj* cli = calloc(1, sizeof(SClientObj));
dengyihao's avatar
dengyihao 已提交
516

dengyihao's avatar
dengyihao 已提交
517
  SRpcInfo* pRpc = shandle;
dengyihao's avatar
dengyihao 已提交
518 519 520 521 522
  memcpy(cli->label, label, strlen(label));
  cli->numOfThreads = numOfThreads;
  cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));

  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
523
    SCliThrdObj* pThrd = createThrdObj();
dengyihao's avatar
dengyihao 已提交
524
    pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
dengyihao's avatar
dengyihao 已提交
525
    pThrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
526

dengyihao's avatar
dengyihao 已提交
527
    int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
dengyihao's avatar
dengyihao 已提交
528
    if (err == 0) {
H
Haojun Liao 已提交
529
      tDebug("success to create tranport-client thread %d", i);
dengyihao's avatar
dengyihao 已提交
530
    }
dengyihao's avatar
dengyihao 已提交
531
    cli->pThreadObj[i] = pThrd;
dengyihao's avatar
dengyihao 已提交
532 533 534
  }
  return cli;
}
dengyihao's avatar
dengyihao 已提交
535 536 537 538 539 540 541 542 543 544 545 546 547 548

// Free the payload of a request message, if it still has one, and clear the pointer.
static void destroyUserdata(SRpcMsg* userdata) {
  if (userdata->pCont != NULL) {
    transFreeMsg(userdata->pCont);
    userdata->pCont = NULL;
  }
}
// Free a client message together with its context and payload. NULL is accepted.
static void destroyCmsg(SCliMsg* pMsg) {
  if (pMsg != NULL) {
    transDestroyConnCtx(pMsg->ctx);
    destroyUserdata(&pMsg->msg);
    free(pMsg);
  }
}
H
Haojun Liao 已提交
551

dengyihao's avatar
dengyihao 已提交
552 553 554 555 556 557 558 559
static SCliThrdObj* createThrdObj() {
  SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
  QUEUE_INIT(&pThrd->msg);
  pthread_mutex_init(&pThrd->msgMtx, NULL);

  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  uv_loop_init(pThrd->loop);

H
Haojun Liao 已提交
560
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, clientAsyncCb);
dengyihao's avatar
dengyihao 已提交
561

H
Haojun Liao 已提交
562 563 564
  pThrd->timer = malloc(sizeof(uv_timer_t));
  uv_timer_init(pThrd->loop, pThrd->timer);
  pThrd->timer->data = pThrd;
dengyihao's avatar
dengyihao 已提交
565

H
Haojun Liao 已提交
566 567 568
  pThrd->pool = creatConnPool(4);

  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
569 570
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
571
// Stop a worker's loop, wait for its thread to finish and free the worker's
// resources. NULL is accepted.
// NOTE(review): uv_stop is called from the caller's thread here, and the conn pool is
// not released - confirm both are intended.
static void destroyThrdObj(SCliThrdObj* pThrd) {
  if (pThrd != NULL) {
    uv_stop(pThrd->loop);
    pthread_join(pThrd->thread, NULL);
    pthread_mutex_destroy(&pThrd->msgMtx);
    transDestroyAsyncPool(pThrd->asyncPool);
    // free(pThrd->cliAsync);
    free(pThrd->timer);
    free(pThrd->loop);
    free(pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
584

dengyihao's avatar
dengyihao 已提交
585
// Free a request context and the ip string it owns. NULL is accepted.
static void transDestroyConnCtx(STransConnCtx* ctx) {
  if (ctx == NULL) {
    return;
  }
  free(ctx->ip);
  free(ctx);
}
dengyihao's avatar
dengyihao 已提交
591
//
H
Haojun Liao 已提交
592 593 594 595 596 597 598 599 600 601 602 603
// Queue a quit message (a message whose ctx is NULL) to the worker so that it can
// stop gracefully.
static void clientSendQuit(SCliThrdObj* thrd) {
  SCliMsg* quitMsg = calloc(1, sizeof(SCliMsg));
  quitMsg->ctx = NULL;  // NULL ctx is what clientAsyncCb recognizes as "quit"

  transSendAsync(thrd->asyncPool, &quitMsg->q);
}
dengyihao's avatar
dengyihao 已提交
604 605 606
void taosCloseClient(void* arg) {
  // impl later
  SClientObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
607
  for (int i = 0; i < cli->numOfThreads; i++) {
H
Haojun Liao 已提交
608
    clientSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
609
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
610 611 612
  }
  free(cli->pThreadObj);
  free(cli);
dengyihao's avatar
dengyihao 已提交
613
}
dengyihao's avatar
dengyihao 已提交
614 615
// Queue an asynchronous request to the endpoint currently in use in pEpSet.
// A context and a client message are allocated for the request and handed to one of
// the instance's worker threads; this function does not wait for the response.
// pRid is not used.
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
  char*     ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
  uint32_t  port = pEpSet->eps[pEpSet->inUse].port;
  SRpcInfo* pRpc = (SRpcInfo*)shandle;

  int32_t flen = 0;
  if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
    // imp later
  }

  STransConnCtx* reqCtx = calloc(1, sizeof(STransConnCtx));
  reqCtx->pTransInst = (SRpcInfo*)shandle;
  reqCtx->ahandle = pMsg->ahandle;
  reqCtx->msgType = pMsg->msgType;
  reqCtx->ip = strdup(ip);
  reqCtx->port = port;

  assert(pRpc->connType == TAOS_CONN_CLIENT);
  // atomic or not
  int64_t index = pRpc->index;
  if (pRpc->index++ >= pRpc->numOfThreads) {
    pRpc->index = 0;
  }

  SCliMsg* queued = malloc(sizeof(SCliMsg));
  queued->ctx = reqCtx;
  queued->msg = *pMsg;
  queued->st = taosGetTimestampUs();

  SClientObj*  cli = (SClientObj*)pRpc->tcphandle;
  SCliThrdObj* worker = cli->pThreadObj[index % pRpc->numOfThreads];

  transSendAsync(worker->asyncPool, &(queued->q));
}
// Send a request to the endpoint currently in use in pEpSet and block until the
// worker thread has stored the outcome in pRsp.
//
// The request context carries a semaphore; the worker copies the response (or an
// error message) into pRsp and posts the semaphore. The worker frees the context
// afterwards, so this function keeps its own pointer to the semaphore and never
// touches the context once the message has been queued.
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
  char*    ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
  uint32_t port = pEpSet->eps[pEpSet->inUse].port;

  SRpcInfo* pRpc = (SRpcInfo*)shandle;

  STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
  pCtx->pTransInst = (SRpcInfo*)shandle;
  pCtx->ahandle = pReq->ahandle;
  pCtx->msgType = pReq->msgType;
  pCtx->ip = strdup(ip);
  pCtx->port = port;
  pCtx->pSem = calloc(1, sizeof(tsem_t));
  pCtx->pRsp = pRsp;
  tsem_init(pCtx->pSem, 0, 0);

  // taken before queuing: pCtx belongs to the worker from transSendAsync onwards
  tsem_t* pSem = pCtx->pSem;

  int64_t index = pRpc->index;
  if (pRpc->index++ >= pRpc->numOfThreads) {
    pRpc->index = 0;
  }
  SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();

  SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];

  transSendAsync(thrd->asyncPool, &(cliMsg->q));

  tsem_wait(pSem);
  tsem_destroy(pSem);
  free(pSem);

  return;
}
#endif