transCli.c 20.6 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

#include "transComm.h"

20
// Scales `para` by 1000 * 10; the result is added to millisecond timestamps
// to compute connection expiry. The argument is parenthesized so that
// expressions such as `a + b` expand with the intended precedence.
#define CONN_PERSIST_TIME(para) ((para) * 1000 * 10)
dengyihao's avatar
dengyihao 已提交
21

dengyihao's avatar
dengyihao 已提交
22 23 24
typedef struct SCliConn {
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
25
  uv_write_t*  writeReq;
dengyihao's avatar
dengyihao 已提交
26
  void*        hostThrd;
dengyihao's avatar
dengyihao 已提交
27
  SConnBuffer  readBuf;
dengyihao's avatar
dengyihao 已提交
28 29
  void*        data;
  queue        conn;
dengyihao's avatar
dengyihao 已提交
30
  uint64_t     expireTime;
dengyihao's avatar
dengyihao 已提交
31
  int8_t       notifyCount;  // timers already notify to client
dengyihao's avatar
dengyihao 已提交
32

dengyihao's avatar
dengyihao 已提交
33 34 35 36 37 38 39
  SRpcPush* push;
  int       persist;  //
  // spi configure
  char    spi;
  char    secured;
  int32_t ref;
  // debug and log info
dengyihao's avatar
dengyihao 已提交
40
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
41
} SCliConn;
dengyihao's avatar
dengyihao 已提交
42

dengyihao's avatar
dengyihao 已提交
43
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
44 45 46 47
  STransConnCtx* ctx;
  SRpcMsg        msg;
  queue          q;
  uint64_t       st;
dengyihao's avatar
dengyihao 已提交
48 49 50
} SCliMsg;

typedef struct SCliThrdObj {
dengyihao's avatar
dengyihao 已提交
51 52 53 54
  pthread_t  thread;
  uv_loop_t* loop;
  // uv_async_t*     cliAsync;  //
  SAsyncPool*     asyncPool;
dengyihao's avatar
dengyihao 已提交
55
  uv_timer_t*     timer;
dengyihao's avatar
dengyihao 已提交
56
  void*           pool;  // conn pool
dengyihao's avatar
dengyihao 已提交
57 58
  queue           msg;
  pthread_mutex_t msgMtx;
dengyihao's avatar
dengyihao 已提交
59
  uint64_t        nextTimeout;  // next timeout
dengyihao's avatar
dengyihao 已提交
60
  void*           pTransInst;   //
dengyihao's avatar
dengyihao 已提交
61
  bool            quit;
dengyihao's avatar
dengyihao 已提交
62 63 64 65 66 67 68 69 70
} SCliThrdObj;

// Client transport object returned by taosInitClient: a set of worker threads.
typedef struct SClientObj {
  char          label[TSDB_LABEL_LEN];  // label copied from the caller at init time
  int32_t       index;
  int           numOfThreads;           // number of entries in pThreadObj
  SCliThrdObj** pThreadObj;             // one SCliThrdObj per worker thread
} SClientObj;

dengyihao's avatar
dengyihao 已提交
71 72 73 74
// Value stored in the conn pool hash table: the list of idle connections
// that share one pool key.
typedef struct SConnList {
  queue conn;  // list head; SCliConn objects are linked through SCliConn.conn
} SConnList;

dengyihao's avatar
dengyihao 已提交
75
// conn pool
dengyihao's avatar
dengyihao 已提交
76
// add expire timeout and capacity limit
77 78
static void*     creatConnPool(int size);
static void*     destroyConnPool(void* pool);
dengyihao's avatar
dengyihao 已提交
79 80
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void      addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
81

dengyihao's avatar
dengyihao 已提交
82 83
// register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
84 85 86
// check whether already read complete packet from server
static bool clientReadComplete(SConnBuffer* pBuf);
// alloc buf for read
dengyihao's avatar
dengyihao 已提交
87
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
88
// callback after read nbytes from socket
dengyihao's avatar
dengyihao 已提交
89
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
90
// callback after write data to socket
dengyihao's avatar
dengyihao 已提交
91
static void clientWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
92
// callback after conn  to server
dengyihao's avatar
dengyihao 已提交
93
static void clientConnCb(uv_connect_t* req, int status);
dengyihao's avatar
dengyihao 已提交
94
static void clientAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
95
static void clientDestroy(uv_handle_t* handle);
96
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
dengyihao's avatar
dengyihao 已提交
97

dengyihao's avatar
dengyihao 已提交
98 99 100 101
// process data read from server, auth/decompress etc later
static void clientHandleResp(SCliConn* conn);
// handle except about conn
static void clientHandleExcept(SCliConn* conn);
102 103
// handle req from app
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
dengyihao's avatar
dengyihao 已提交
104 105
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientSendQuit(SCliThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
106

dengyihao's avatar
dengyihao 已提交
107 108 109 110
static void destroyUserdata(SRpcMsg* userdata);

static void destroyCmsg(SCliMsg* cmsg);
static void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
111 112 113 114
// thread obj
static SCliThrdObj* createThrdObj();
static void         destroyThrdObj(SCliThrdObj* pThrd);
// thread
dengyihao's avatar
dengyihao 已提交
115 116
static void* clientThread(void* arg);

117
/*
 * Deliver a completely received response to its consumer and recycle the connection.
 *
 * Called from clientReadCb once clientReadComplete reports a whole packet in
 * conn->readBuf. The response is delivered through one of three routes:
 *   - the connection's push callback, when conn->push is set;
 *   - the transport instance callback (cfp), for asynchronous requests (no semaphore);
 *   - a copy into pCtx->pRsp followed by tsem_post, for synchronous requests.
 * Afterwards the read buffer is reset, reading is restarted, the connection is
 * returned to the pool (unless it is a push connection) and the request
 * message is released.
 *
 * NOTE(review): conn->data is assumed to be non-NULL here, i.e. a request is in flight.
 */
static void clientHandleResp(SCliConn* conn) {
  SCliMsg*       pMsg = conn->data;
  STransConnCtx* pCtx = pMsg->ctx;
  SRpcInfo*      pRpc = pCtx->pTransInst;

  // header fields arrive in network byte order
  STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
  pHead->code = ntohl(pHead->code);
  pHead->msgLen = ntohl(pHead->msgLen);

  // zero-initialize so that fields not assigned below never carry stack garbage
  // into the user callback or into the caller's response buffer
  SRpcMsg rpcMsg = {0};
  rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
  rpcMsg.pCont = transContFromHead((char*)pHead);
  rpcMsg.code = pHead->code;
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.ahandle = pCtx->ahandle;

  tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr),
         ntohs(conn->addr.sin_port));

  if (conn->push != NULL) {
    (*conn->push->callback)(conn->push->arg, &rpcMsg);
  } else {
    if (pCtx->pSem == NULL) {
      // no semaphore: nobody is blocked on this request, notify through the callback
      tTrace("client conn(async) %p handle resp", conn);
      (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
    } else {
      // a caller is blocked in rpcSendRecv: hand over the response and wake it up
      tTrace("client conn(sync) %p handle resp", conn);
      memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
      tsem_post(pCtx->pSem);
    }
  }
  conn->notifyCount += 1;

  // buf's mem already transferred to rpcMsg.pCont
  transClearBuffer(&conn->readBuf);

  uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);

  SCliThrdObj* pThrd = conn->hostThrd;
  // A connection with a push handler stays owned by the user; every other
  // connection goes back to the pool so that later requests can reuse it.
  if (conn->push == NULL) {
    addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
  }

  destroyCmsg(pMsg);
  conn->data = NULL;
  // start thread's timer of conn pool if not active
  if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) {
    uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
  }
}
/*
 * Handle a failure (connect, read or write error) on a connection.
 *
 * When a request is in flight, its consumer is notified with
 * TSDB_CODE_RPC_NETWORK_UNAVAIL through the same route a normal response would
 * take (push callback, transport callback, or semaphore), the request is
 * released and the connection is destroyed. When no request is in flight,
 * the connection is simply destroyed.
 */
static void clientHandleExcept(SCliConn* pConn) {
  SCliMsg* pMsg = pConn->data;
  if (pMsg == NULL) {
    // Nothing is waiting on this connection (e.g. it sits idle in the conn pool),
    // so there is nobody to notify. This also covers a connection that has a push
    // handler but no pending request, which would otherwise dereference NULL below.
    clientConnDestroy(pConn, true);
    return;
  }
  tTrace("client conn %p start to destroy", pConn);

  destroyUserdata(&pMsg->msg);

  STransConnCtx* pCtx = pMsg->ctx;

  SRpcMsg rpcMsg = {0};
  rpcMsg.ahandle = pCtx->ahandle;
  rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;

  if (pConn->push != NULL) {
    (*pConn->push->callback)(pConn->push->arg, &rpcMsg);
  } else {
    if (pCtx->pSem == NULL) {
      (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL);
    } else {
      // a caller is blocked waiting for the response: hand over the error and wake it up
      memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
      tsem_post(pCtx->pSem);
    }
  }

  destroyCmsg(pMsg);
  pConn->data = NULL;
  // update the counter before tearing the connection down, so that pConn is
  // never touched after clientConnDestroy
  pConn->notifyCount += 1;
  clientConnDestroy(pConn, true);
}
dengyihao's avatar
dengyihao 已提交
203

dengyihao's avatar
dengyihao 已提交
204 205
/*
 * Timer callback of a worker thread: walks every list of the conn pool and
 * destroys the connections at the head whose expireTime lies before the
 * thread's recorded nextTimeout, then records a new nextTimeout and re-arms
 * the timer.
 */
static void clientTimeoutCb(uv_timer_t* handle) {
  SCliThrdObj*   pThrd = handle->data;
  SRpcInfo*      pRpc = pThrd->pTransInst;
  const uint64_t deadline = pThrd->nextTimeout;
  tTrace("client conn timeout, try to remove expire conn from conn pool");

  for (SConnList* list = taosHashIterate((SHashObj*)pThrd->pool, NULL); list != NULL;
       list = taosHashIterate((SHashObj*)pThrd->pool, list)) {
    // scanning stops at the first connection of a list that has not expired yet
    while (!QUEUE_IS_EMPTY(&list->conn)) {
      queue*    head = QUEUE_HEAD(&list->conn);
      SCliConn* pConn = QUEUE_DATA(head, SCliConn, conn);
      if (pConn->expireTime >= deadline) {
        break;
      }
      QUEUE_REMOVE(head);
      clientConnDestroy(pConn, true);
    }
  }

  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
  uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
230 231 232
/*
 * Create the hash table backing a conn pool.
 * `size` is passed to taosHashInit; the table is created with HASH_NO_LOCK
 * because the pool is thread local.
 */
static void* creatConnPool(int size) {
  void* pool = taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  return pool;
}
234
static void* destroyConnPool(void* pool) {
dengyihao's avatar
dengyihao 已提交
235
  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
dengyihao's avatar
dengyihao 已提交
236 237 238 239 240
  while (connList != NULL) {
    while (!QUEUE_IS_EMPTY(&connList->conn)) {
      queue* h = QUEUE_HEAD(&connList->conn);
      QUEUE_REMOVE(h);
      SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
241
      clientConnDestroy(c, true);
dengyihao's avatar
dengyihao 已提交
242
    }
dengyihao's avatar
dengyihao 已提交
243
    connList = taosHashIterate((SHashObj*)pool, connList);
dengyihao's avatar
dengyihao 已提交
244
  }
dengyihao's avatar
dengyihao 已提交
245
  taosHashClear(pool);
dengyihao's avatar
dengyihao 已提交
246 247
}

dengyihao's avatar
dengyihao 已提交
248
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
dengyihao's avatar
dengyihao 已提交
249 250 251 252
  char key[128] = {0};
  tstrncpy(key, ip, strlen(ip));
  tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));

dengyihao's avatar
dengyihao 已提交
253 254
  SHashObj*  pPool = pool;
  SConnList* plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
255 256
  if (plist == NULL) {
    SConnList list;
dengyihao's avatar
dengyihao 已提交
257 258
    taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
    plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
259
    QUEUE_INIT(&plist->conn);
dengyihao's avatar
dengyihao 已提交
260 261 262 263 264 265 266
  }

  if (QUEUE_IS_EMPTY(&plist->conn)) {
    return NULL;
  }
  queue* h = QUEUE_HEAD(&plist->conn);
  QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
267 268 269 270

  SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
  QUEUE_INIT(&conn->conn);
  return conn;
dengyihao's avatar
dengyihao 已提交
271
}
dengyihao's avatar
dengyihao 已提交
272
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
273
  char key[128] = {0};
dengyihao's avatar
dengyihao 已提交
274

dengyihao's avatar
dengyihao 已提交
275 276
  tstrncpy(key, ip, strlen(ip));
  tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
dengyihao's avatar
dengyihao 已提交
277
  tTrace("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
dengyihao's avatar
dengyihao 已提交
278

dengyihao's avatar
dengyihao 已提交
279
  SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
dengyihao's avatar
dengyihao 已提交
280

dengyihao's avatar
dengyihao 已提交
281
  conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
dengyihao's avatar
dengyihao 已提交
282
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
283
  conn->notifyCount = 0;
dengyihao's avatar
dengyihao 已提交
284 285 286 287
  // list already create before
  assert(plist != NULL);
  QUEUE_PUSH(&plist->conn, &conn->conn);
}
dengyihao's avatar
dengyihao 已提交
288 289 290 291 292 293 294 295 296
/*
 * Tell whether the buffer holds a complete packet.
 *
 * Returns false while fewer bytes than a message header are buffered, or while
 * fewer bytes than the header's msgLen are buffered (data->left is then set to
 * the number of missing bytes). Returns true, with data->left set to 0, once at
 * least msgLen bytes are buffered. Every path returns a value; the case of more
 * buffered bytes than msgLen is treated as complete.
 */
static bool clientReadComplete(SConnBuffer* data) {
  STransMsgHead head;
  int32_t       headLen = sizeof(head);
  if (data->len < headLen) {
    return false;
  }

  // copy the header out of the buffer and read its length field (network byte order)
  memcpy((char*)&head, data->buf, headLen);
  int32_t msgLen = (int32_t)ntohl((uint32_t)head.msgLen);
  if (msgLen > data->len) {
    data->left = msgLen - data->len;
    return false;
  }

  data->left = 0;
  return true;
}
dengyihao's avatar
dengyihao 已提交
305
/*
 * libuv alloc callback: lets transAllocBuffer fill `buf` from the read buffer
 * of the connection attached to `handle`. `suggested_size` is not used.
 */
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SCliConn* pConn = handle->data;
  transAllocBuffer(&pConn->readBuf, buf);
}
/*
 * libuv read callback.
 *   nread > 0  : account for the new bytes; once a full packet is buffered, stop
 *                reading and hand the response to clientHandleResp.
 *   nread == 0 : nothing to do.
 *   nread < 0  : read error or EOF, handled by clientHandleExcept.
 */
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
  SCliConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;

  if (nread == 0) {
    // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
    // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
    // read(2).
    return;
  }

  if (nread < 0) {
    // UV_EOF is reported as a negative nread as well
    tError("client conn %p read error: %s", conn, uv_err_name(nread));
    clientHandleExcept(conn);
    return;
  }

  pBuf->len += nread;
  if (!clientReadComplete(pBuf)) {
    tTrace("client conn %p read partial packet, continue to read", conn);
    return;
  }

  uv_read_stop((uv_stream_t*)conn->stream);
  tTrace("client conn %p read complete", conn);
  clientHandleResp(conn);
}
dengyihao's avatar
dengyihao 已提交
339

340
/*
 * Drop one reference to the connection. When the count reaches zero the
 * connection is unlinked from its queue and, if `clear` is true, its stream
 * handle is closed with clientDestroy as the close callback.
 */
static void clientConnDestroy(SCliConn* conn, bool clear) {
  conn->ref -= 1;
  if (conn->ref != 0) {
    return;
  }

  tTrace("client conn %p remove from conn pool", conn);
  QUEUE_REMOVE(&conn->conn);
  if (clear) {
    uv_close((uv_handle_t*)conn->stream, clientDestroy);
  }
}
dengyihao's avatar
dengyihao 已提交
351 352
/*
 * Close callback of a connection's stream handle: releases the stream handle,
 * the write request and the connection object itself.
 */
static void clientDestroy(uv_handle_t* handle) {
  SCliConn* pConn = handle->data;
  // transDestroyBuffer(&pConn->readBuf);

  free(pConn->stream);
  free(pConn->writeReq);
  tTrace("client conn %p destroy successfully", pConn);
  free(pConn);
}

/*
 * libuv write callback. On failure the connection goes through
 * clientHandleExcept. On success the request payload is released and reading
 * of the response is started; nothing more is done when no request is
 * attached to the connection.
 */
static void clientWriteCb(uv_write_t* req, int status) {
  SCliConn* pConn = req->data;

  if (status != 0) {
    tError("client conn %p failed to write: %s", pConn, uv_err_name(status));
    clientHandleExcept(pConn);
    return;
  }

  tTrace("client conn %p data already was written out", pConn);
  SCliMsg* pMsg = pConn->data;
  if (pMsg == NULL) {
    return;
  }

  destroyUserdata(&pMsg->msg);
  uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb);
}
dengyihao's avatar
dengyihao 已提交
381 382

/*
 * Send the request attached to the connection (pConn->data).
 * Fills in the message header located in front of the payload (message type,
 * and total length in network byte order) and issues a single uv_write over
 * header plus payload; completion is reported to clientWriteCb.
 */
static void clientWrite(SCliConn* pConn) {
  SCliMsg* pCliMsg = pConn->data;
  SRpcMsg* pReq = (SRpcMsg*)(&pCliMsg->msg);

  STransMsgHead* pHead = transHeadFromCont(pReq->pCont);
  int            totalLen = transMsgLenFromCont(pReq->contLen);

  pHead->msgType = pReq->msgType;
  pHead->msgLen = (int32_t)htonl((uint32_t)totalLen);

  uv_buf_t wb = uv_buf_init((char*)pHead, totalLen);
  tDebug("client conn %p %s is send to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr),
         ntohs(pConn->addr.sin_port));
  uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
}
dengyihao's avatar
dengyihao 已提交
397 398
/*
 * libuv connect callback. A failed connect goes through clientHandleExcept.
 * On success the peer address is recorded for logging and the pending request
 * is written.
 */
static void clientConnCb(uv_connect_t* req, int status) {
  SCliConn* pConn = req->data;

  if (status == 0) {
    int addrlen = sizeof(pConn->addr);
    uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen);

    tTrace("client conn %p create", pConn);

    assert(pConn->stream == req->handle);
    clientWrite(pConn);
    return;
  }

  // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
  tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status));
  clientHandleExcept(pConn);
}

dengyihao's avatar
dengyihao 已提交
415
/*
 * Handle a quit message on the worker thread: release the message, stop the
 * pool timer, mark the thread as quitting and stop its event loop.
 */
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
  tDebug("client work thread %p start to quit", pThrd);
  destroyCmsg(pMsg);

  uv_timer_stop(pThrd->timer);
  pThrd->quit = true;
  uv_stop(pThrd->loop);
}
dengyihao's avatar
dengyihao 已提交
424
/*
 * Dispatch one application request on the worker thread.
 *
 * If the pool holds an idle connection to the target, the request is attached
 * to it and written immediately (or failed through clientHandleExcept when the
 * thread is quitting). Otherwise a new connection object is created and a
 * connect is started; the request is written from clientConnCb.
 *
 * If the target address cannot be parsed or the connect cannot be started,
 * libuv never invokes clientConnCb, so the failure is reported right here
 * through clientHandleExcept instead of leaving the request pending forever.
 */
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
  uint64_t el = taosGetTimestampUs() - pMsg->st;
  tTrace("client msg tran time cost: %" PRIu64 "", el);

  STransConnCtx* pCtx = pMsg->ctx;
  SCliConn*      conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
  if (conn != NULL) {
    tTrace("client get conn %p from pool", conn);
    conn->data = pMsg;
    conn->writeReq->data = conn;
    transDestroyBuffer(&conn->readBuf);

    if (pThrd->quit) {
      clientHandleExcept(conn);
      return;
    }
    // take what is needed from pMsg before the write is issued
    conn->push = pMsg->msg.push;
    clientWrite(conn);
    return;
  }

  conn = calloc(1, sizeof(SCliConn));
  conn->ref++;
  // read/write stream handle
  conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
  conn->stream->data = conn;

  // write req handle
  conn->writeReq = malloc(sizeof(uv_write_t));
  conn->writeReq->data = conn;

  QUEUE_INIT(&conn->conn);

  conn->connReq.data = conn;
  conn->data = pMsg;
  conn->hostThrd = pThrd;

  conn->push = pMsg->msg.push;

  struct sockaddr_in addr;
  int                rc = uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
  if (rc == 0) {
    // failures reported asynchronously are handled in clientConnCb
    rc = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
  }
  if (rc != 0) {
    tError("client conn %p failed to connect server: %s", conn, uv_strerror(rc));
    clientHandleExcept(conn);
  }
}
/*
 * Async callback of a worker thread: moves all queued messages out of the
 * async item under its mutex, then handles them one by one. A message without
 * a context is a quit message; everything else is a request.
 */
static void clientAsyncCb(uv_async_t* handle) {
  SAsyncItem*  item = handle->data;
  SCliThrdObj* pThrd = item->pThrd;
  queue        pending;

  // batch process to avoid to lock/unlock frequently
  pthread_mutex_lock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &pending);
  pthread_mutex_unlock(&item->mtx);

  int handled = 0;
  for (; !QUEUE_IS_EMPTY(&pending); handled++) {
    queue* node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);

    SCliMsg* pMsg = QUEUE_DATA(node, SCliMsg, q);
    if (pMsg->ctx != NULL) {
      clientHandleReq(pMsg, pThrd);
    } else {
      clientHandleQuit(pMsg, pThrd);
    }
  }

  if (handled > 1) {
    tTrace("client process batch size: %d", handled);
  }
}

static void* clientThread(void* arg) {
  SCliThrdObj* pThrd = (SCliThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
509
  setThreadName("trans-client-work");
dengyihao's avatar
dengyihao 已提交
510 511 512 513 514
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
}

void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
  SClientObj* cli = calloc(1, sizeof(SClientObj));
dengyihao's avatar
dengyihao 已提交
515

dengyihao's avatar
dengyihao 已提交
516
  SRpcInfo* pRpc = shandle;
dengyihao's avatar
dengyihao 已提交
517 518 519 520 521
  memcpy(cli->label, label, strlen(label));
  cli->numOfThreads = numOfThreads;
  cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));

  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
522
    SCliThrdObj* pThrd = createThrdObj();
dengyihao's avatar
dengyihao 已提交
523
    pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
dengyihao's avatar
dengyihao 已提交
524
    pThrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
525

dengyihao's avatar
dengyihao 已提交
526
    int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
dengyihao's avatar
dengyihao 已提交
527
    if (err == 0) {
dengyihao's avatar
dengyihao 已提交
528
      tDebug("success to create tranport-client thread %d", i);
dengyihao's avatar
dengyihao 已提交
529
    }
dengyihao's avatar
dengyihao 已提交
530
    cli->pThreadObj[i] = pThrd;
dengyihao's avatar
dengyihao 已提交
531 532 533
  }
  return cli;
}
dengyihao's avatar
dengyihao 已提交
534 535 536 537 538 539 540 541 542 543 544 545 546 547

/*
 * Release the payload of a message, if it still has one, and clear the
 * pointer so that a second call is a no-op.
 */
static void destroyUserdata(SRpcMsg* userdata) {
  if (userdata->pCont != NULL) {
    transFreeMsg(userdata->pCont);
    userdata->pCont = NULL;
  }
}
/*
 * Release a client message together with its context and payload.
 * A NULL message is ignored.
 */
static void destroyCmsg(SCliMsg* pMsg) {
  if (pMsg != NULL) {
    transDestroyConnCtx(pMsg->ctx);
    destroyUserdata(&pMsg->msg);
    free(pMsg);
  }
}
dengyihao's avatar
dengyihao 已提交
550

dengyihao's avatar
dengyihao 已提交
551 552 553 554 555 556 557 558
static SCliThrdObj* createThrdObj() {
  SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
  QUEUE_INIT(&pThrd->msg);
  pthread_mutex_init(&pThrd->msgMtx, NULL);

  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  uv_loop_init(pThrd->loop);

dengyihao's avatar
dengyihao 已提交
559
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, clientAsyncCb);
dengyihao's avatar
dengyihao 已提交
560

dengyihao's avatar
dengyihao 已提交
561 562 563
  pThrd->timer = malloc(sizeof(uv_timer_t));
  uv_timer_init(pThrd->loop, pThrd->timer);
  pThrd->timer->data = pThrd;
dengyihao's avatar
dengyihao 已提交
564

dengyihao's avatar
dengyihao 已提交
565
  pThrd->pool = creatConnPool(4);
dengyihao's avatar
dengyihao 已提交
566 567

  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
568 569
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
570
/*
 * Stop a worker thread, wait for it to exit and release the resources of its
 * thread object. A NULL thread object is ignored.
 */
static void destroyThrdObj(SCliThrdObj* pThrd) {
  if (pThrd != NULL) {
    uv_stop(pThrd->loop);
    // the thread must have exited before anything it uses is released
    pthread_join(pThrd->thread, NULL);
    pthread_mutex_destroy(&pThrd->msgMtx);
    transDestroyAsyncPool(pThrd->asyncPool);
    free(pThrd->timer);
    free(pThrd->loop);
    free(pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
583

dengyihao's avatar
dengyihao 已提交
584
/*
 * Release a request context and the ip string it owns.
 * A NULL context is ignored.
 */
static void transDestroyConnCtx(STransConnCtx* ctx) {
  if (ctx == NULL) {
    return;
  }
  free(ctx->ip);
  free(ctx);
}
dengyihao's avatar
dengyihao 已提交
590
//
dengyihao's avatar
dengyihao 已提交
591 592 593 594 595
/*
 * Ask a worker thread to stop gracefully by queueing a message without a
 * context, which the worker treats as a quit message.
 */
static void clientSendQuit(SCliThrdObj* thrd) {
  SCliMsg* quitMsg = calloc(1, sizeof(SCliMsg));
  quitMsg->ctx = NULL;  // no context marks the quit message

  transSendAsync(thrd->asyncPool, &quitMsg->q);
}
dengyihao's avatar
dengyihao 已提交
603 604 605
void taosCloseClient(void* arg) {
  // impl later
  SClientObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
606
  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
607
    clientSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
608
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
609 610 611
  }
  free(cli->pThreadObj);
  free(cli);
dengyihao's avatar
dengyihao 已提交
612
}
dengyihao's avatar
dengyihao 已提交
613 614
/*
 * Send a request asynchronously to the endpoint currently in use in `pEpSet`.
 *
 * A request context and a client message (holding a copy of *pMsg) are
 * allocated and queued to one of the worker threads, picked round-robin.
 * `pRid` is not used.
 */
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
  char*    ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
  uint32_t port = pEpSet->eps[pEpSet->inUse].port;

  SRpcInfo* pRpc = (SRpcInfo*)shandle;

  int32_t flen = 0;
  if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
    // imp later
  }

  // request context, released together with the client message
  STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
  pCtx->pTransInst = (SRpcInfo*)shandle;
  pCtx->ahandle = pMsg->ahandle;
  pCtx->msgType = pMsg->msgType;
  pCtx->ip = strdup(ip);
  pCtx->port = port;

  assert(pRpc->connType == TAOS_CONN_CLIENT);
  // atomic or not
  int64_t index = pRpc->index;
  if (pRpc->index++ >= pRpc->numOfThreads) {
    pRpc->index = 0;
  }

  SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pMsg;
  cliMsg->st = taosGetTimestampUs();

  SClientObj*  cli = (SClientObj*)pRpc->tcphandle;
  SCliThrdObj* thrd = cli->pThreadObj[index % pRpc->numOfThreads];

  transSendAsync(thrd->asyncPool, &(cliMsg->q));
}
dengyihao's avatar
dengyihao 已提交
656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696
/*
 * Send a request to the endpoint currently in use in `pEpSet` and block until
 * the response (or an error) has been copied into *pRsp.
 *
 * The request is queued to a worker thread, picked round-robin, together with
 * a semaphore; the worker posts the semaphore after filling *pRsp.
 *
 * Ownership: once the message is queued, the request context and the client
 * message belong to the worker thread, which releases them after posting the
 * semaphore. The semaphore pointer is therefore kept in a local variable
 * before queueing, and the context is not touched afterwards.
 */
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
  char*    ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
  uint32_t port = pEpSet->eps[pEpSet->inUse].port;

  SRpcInfo* pRpc = (SRpcInfo*)shandle;

  // the semaphore is owned by this function and outlives the request context
  tsem_t* pSem = calloc(1, sizeof(tsem_t));
  tsem_init(pSem, 0, 0);

  STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
  pCtx->pTransInst = (SRpcInfo*)shandle;
  pCtx->ahandle = pReq->ahandle;
  pCtx->msgType = pReq->msgType;
  pCtx->ip = strdup(ip);
  pCtx->port = port;
  pCtx->pSem = pSem;
  pCtx->pRsp = pRsp;

  int64_t index = pRpc->index;
  if (pRpc->index++ >= pRpc->numOfThreads) {
    pRpc->index = 0;
  }
  SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();

  SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];

  transSendAsync(thrd->asyncPool, &(cliMsg->q));

  // pCtx and cliMsg may already have been released by the worker thread here
  tsem_wait(pSem);
  tsem_destroy(pSem);
  free(pSem);

  return;
}
dengyihao's avatar
dengyihao 已提交
697
#endif