transCli.c 21.3 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

#include "transComm.h"

20
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
dengyihao's avatar
dengyihao 已提交
21

dengyihao's avatar
dengyihao 已提交
22 23 24
typedef struct SCliConn {
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
25
  uv_write_t*  writeReq;
dengyihao's avatar
dengyihao 已提交
26
  void*        hostThrd;
dengyihao's avatar
dengyihao 已提交
27
  SConnBuffer  readBuf;
dengyihao's avatar
dengyihao 已提交
28 29
  void*        data;
  queue        conn;
dengyihao's avatar
dengyihao 已提交
30
  uint64_t     expireTime;
dengyihao's avatar
dengyihao 已提交
31
  int8_t       notifyCount;  // timers already notify to client
dengyihao's avatar
dengyihao 已提交
32

dengyihao's avatar
dengyihao 已提交
33 34 35 36 37 38 39
  SRpcPush* push;
  int       persist;  //
  // spi configure
  char    spi;
  char    secured;
  int32_t ref;
  // debug and log info
dengyihao's avatar
dengyihao 已提交
40
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
41
} SCliConn;
dengyihao's avatar
dengyihao 已提交
42

dengyihao's avatar
dengyihao 已提交
43
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
44 45 46 47
  STransConnCtx* ctx;
  SRpcMsg        msg;
  queue          q;
  uint64_t       st;
dengyihao's avatar
dengyihao 已提交
48 49 50
} SCliMsg;

typedef struct SCliThrdObj {
dengyihao's avatar
dengyihao 已提交
51 52 53 54
  pthread_t  thread;
  uv_loop_t* loop;
  // uv_async_t*     cliAsync;  //
  SAsyncPool*     asyncPool;
dengyihao's avatar
dengyihao 已提交
55
  uv_timer_t*     timer;
dengyihao's avatar
dengyihao 已提交
56
  void*           pool;  // conn pool
dengyihao's avatar
dengyihao 已提交
57 58
  queue           msg;
  pthread_mutex_t msgMtx;
dengyihao's avatar
dengyihao 已提交
59
  uint64_t        nextTimeout;  // next timeout
dengyihao's avatar
dengyihao 已提交
60
  void*           pTransInst;   //
dengyihao's avatar
dengyihao 已提交
61
  bool            quit;
dengyihao's avatar
dengyihao 已提交
62 63 64 65 66 67 68 69 70
} SCliThrdObj;

typedef struct SClientObj {
  char          label[TSDB_LABEL_LEN];
  int32_t       index;
  int           numOfThreads;
  SCliThrdObj** pThreadObj;
} SClientObj;

dengyihao's avatar
dengyihao 已提交
71 72 73 74
typedef struct SConnList {
  queue conn;
} SConnList;

dengyihao's avatar
dengyihao 已提交
75
// conn pool
dengyihao's avatar
dengyihao 已提交
76
// add expire timeout and capacity limit
77 78
static void*     creatConnPool(int size);
static void*     destroyConnPool(void* pool);
dengyihao's avatar
dengyihao 已提交
79 80
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void      addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
81

dengyihao's avatar
dengyihao 已提交
82 83
// register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
84 85 86
// check whether already read complete packet from server
static bool clientReadComplete(SConnBuffer* pBuf);
// alloc buf for read
dengyihao's avatar
dengyihao 已提交
87
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
88
// callback after read nbytes from socket
dengyihao's avatar
dengyihao 已提交
89
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
90
// callback after write data to socket
dengyihao's avatar
dengyihao 已提交
91
static void clientWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
92
// callback after conn  to server
dengyihao's avatar
dengyihao 已提交
93
static void clientConnCb(uv_connect_t* req, int status);
dengyihao's avatar
dengyihao 已提交
94
static void clientAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
95
static void clientDestroy(uv_handle_t* handle);
96
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
dengyihao's avatar
dengyihao 已提交
97

dengyihao's avatar
dengyihao 已提交
98 99 100 101
// process data read from server, auth/decompress etc later
static void clientHandleResp(SCliConn* conn);
// handle except about conn
static void clientHandleExcept(SCliConn* conn);
102 103
// handle req from app
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
dengyihao's avatar
dengyihao 已提交
104 105
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientSendQuit(SCliThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
106

dengyihao's avatar
dengyihao 已提交
107 108 109 110
static void destroyUserdata(SRpcMsg* userdata);

static void destroyCmsg(SCliMsg* cmsg);
static void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
111 112 113 114
// thread obj
static SCliThrdObj* createThrdObj();
static void         destroyThrdObj(SCliThrdObj* pThrd);
// thread
dengyihao's avatar
dengyihao 已提交
115 116
static void* clientThread(void* arg);

117
static void clientHandleResp(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
118 119
  SCliMsg*       pMsg = conn->data;
  STransConnCtx* pCtx = pMsg->ctx;
dengyihao's avatar
dengyihao 已提交
120
  SRpcInfo*      pRpc = pCtx->pTransInst;
dengyihao's avatar
dengyihao 已提交
121

dengyihao's avatar
dengyihao 已提交
122 123 124 125
  STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
  pHead->code = htonl(pHead->code);
  pHead->msgLen = htonl(pHead->msgLen);

dengyihao's avatar
dengyihao 已提交
126
  SRpcMsg rpcMsg;
dengyihao's avatar
dengyihao 已提交
127
  rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
128
  rpcMsg.pCont = transContFromHead((char*)pHead);
dengyihao's avatar
dengyihao 已提交
129 130
  rpcMsg.code = pHead->code;
  rpcMsg.msgType = pHead->msgType;
dengyihao's avatar
dengyihao 已提交
131
  rpcMsg.ahandle = pCtx->ahandle;
dengyihao's avatar
dengyihao 已提交
132

dengyihao's avatar
dengyihao 已提交
133 134
  tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr),
         ntohs(conn->addr.sin_port));
dengyihao's avatar
dengyihao 已提交
135

dengyihao's avatar
dengyihao 已提交
136
  if (conn->push != NULL && conn->notifyCount != 0) {
dengyihao's avatar
dengyihao 已提交
137
    (*conn->push->callback)(conn->push->arg, &rpcMsg);
dengyihao's avatar
dengyihao 已提交
138
  } else {
dengyihao's avatar
dengyihao 已提交
139 140 141 142 143 144 145 146
    if (pCtx->pSem == NULL) {
      tTrace("client conn(sync) %p handle resp", conn);
      (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
    } else {
      tTrace("client conn(sync) %p handle resp", conn);
      memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
      tsem_post(pCtx->pSem);
    }
dengyihao's avatar
dengyihao 已提交
147
  }
dengyihao's avatar
dengyihao 已提交
148
  conn->notifyCount += 1;
dengyihao's avatar
dengyihao 已提交
149

dengyihao's avatar
dengyihao 已提交
150
  // buf's mem alread translated to rpcMsg.pCont
dengyihao's avatar
dengyihao 已提交
151 152
  transClearBuffer(&conn->readBuf);

dengyihao's avatar
dengyihao 已提交
153
  uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
dengyihao's avatar
dengyihao 已提交
154 155

  SCliThrdObj* pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
156
  // user owns conn->persist = 1
dengyihao's avatar
dengyihao 已提交
157
  if (conn->push == NULL) {
dengyihao's avatar
dengyihao 已提交
158 159
    addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
  }
dengyihao's avatar
dengyihao 已提交
160

dengyihao's avatar
dengyihao 已提交
161 162
  destroyCmsg(pMsg);
  conn->data = NULL;
dengyihao's avatar
dengyihao 已提交
163
  // start thread's timer of conn pool if not active
dengyihao's avatar
dengyihao 已提交
164 165
  if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) {
    uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
dengyihao's avatar
dengyihao 已提交
166
  }
dengyihao's avatar
dengyihao 已提交
167 168
}
static void clientHandleExcept(SCliConn* pConn) {
dengyihao's avatar
dengyihao 已提交
169
  if (pConn->data == NULL && pConn->push == NULL) {
dengyihao's avatar
dengyihao 已提交
170
    // handle conn except in conn pool
dengyihao's avatar
dengyihao 已提交
171 172 173
    clientConnDestroy(pConn, true);
    return;
  }
dengyihao's avatar
dengyihao 已提交
174
  tTrace("client conn %p start to destroy", pConn);
dengyihao's avatar
dengyihao 已提交
175
  SCliMsg* pMsg = pConn->data;
dengyihao's avatar
dengyihao 已提交
176

dengyihao's avatar
dengyihao 已提交
177 178
  STransConnCtx* pCtx = pMsg->ctx;

dengyihao's avatar
dengyihao 已提交
179
  SRpcMsg rpcMsg = {0};
dengyihao's avatar
dengyihao 已提交
180
  rpcMsg.ahandle = pCtx->ahandle;
dengyihao's avatar
dengyihao 已提交
181
  rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
dengyihao's avatar
dengyihao 已提交
182

dengyihao's avatar
dengyihao 已提交
183
  if (pConn->push != NULL && pConn->notifyCount != 0) {
dengyihao's avatar
dengyihao 已提交
184
    (*pConn->push->callback)(pConn->push->arg, &rpcMsg);
dengyihao's avatar
dengyihao 已提交
185
  } else {
dengyihao's avatar
dengyihao 已提交
186 187 188 189 190 191 192
    if (pCtx->pSem == NULL) {
      (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL);
    } else {
      memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
      // SRpcMsg rpcMsg
      tsem_post(pCtx->pSem);
    }
dengyihao's avatar
dengyihao 已提交
193 194 195
    if (pConn->push != NULL) {
      (*pConn->push->callback)(pConn->push->arg, &rpcMsg);
    }
dengyihao's avatar
dengyihao 已提交
196
  }
dengyihao's avatar
dengyihao 已提交
197 198 199 200

  destroyCmsg(pMsg);
  pConn->data = NULL;
  // transDestroyConnCtx(pCtx);
dengyihao's avatar
dengyihao 已提交
201
  clientConnDestroy(pConn, true);
dengyihao's avatar
dengyihao 已提交
202
  pConn->notifyCount += 1;
dengyihao's avatar
dengyihao 已提交
203
}
dengyihao's avatar
dengyihao 已提交
204

dengyihao's avatar
dengyihao 已提交
205 206
static void clientTimeoutCb(uv_timer_t* handle) {
  SCliThrdObj* pThrd = handle->data;
dengyihao's avatar
dengyihao 已提交
207
  SRpcInfo*    pRpc = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
208
  int64_t      currentTime = pThrd->nextTimeout;
dengyihao's avatar
dengyihao 已提交
209
  tTrace("client conn timeout, try to remove expire conn from conn pool");
dengyihao's avatar
dengyihao 已提交
210

dengyihao's avatar
dengyihao 已提交
211
  SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
dengyihao's avatar
dengyihao 已提交
212 213 214 215 216 217
  while (p != NULL) {
    while (!QUEUE_IS_EMPTY(&p->conn)) {
      queue*    h = QUEUE_HEAD(&p->conn);
      SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
      if (c->expireTime < currentTime) {
        QUEUE_REMOVE(h);
218 219 220
        // uv_stream_t stm = *(c->stream);
        // uv_close((uv_handle_t*)&stm, clientDestroy);
        clientConnDestroy(c, true);
dengyihao's avatar
dengyihao 已提交
221 222 223 224
      } else {
        break;
      }
    }
dengyihao's avatar
dengyihao 已提交
225
    p = taosHashIterate((SHashObj*)pThrd->pool, p);
dengyihao's avatar
dengyihao 已提交
226 227
  }

dengyihao's avatar
dengyihao 已提交
228 229
  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
  uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
dengyihao's avatar
dengyihao 已提交
230
}
231 232 233
static void* creatConnPool(int size) {
  // thread local, no lock
  return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
234
}
235
static void* destroyConnPool(void* pool) {
dengyihao's avatar
dengyihao 已提交
236
  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
dengyihao's avatar
dengyihao 已提交
237 238 239 240 241
  while (connList != NULL) {
    while (!QUEUE_IS_EMPTY(&connList->conn)) {
      queue* h = QUEUE_HEAD(&connList->conn);
      QUEUE_REMOVE(h);
      SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
242
      clientConnDestroy(c, true);
dengyihao's avatar
dengyihao 已提交
243
    }
dengyihao's avatar
dengyihao 已提交
244
    connList = taosHashIterate((SHashObj*)pool, connList);
dengyihao's avatar
dengyihao 已提交
245
  }
dengyihao's avatar
dengyihao 已提交
246
  taosHashClear(pool);
dengyihao's avatar
dengyihao 已提交
247 248
}

dengyihao's avatar
dengyihao 已提交
249
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
dengyihao's avatar
dengyihao 已提交
250 251 252 253
  char key[128] = {0};
  tstrncpy(key, ip, strlen(ip));
  tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));

dengyihao's avatar
dengyihao 已提交
254 255
  SHashObj*  pPool = pool;
  SConnList* plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
256 257
  if (plist == NULL) {
    SConnList list;
dengyihao's avatar
dengyihao 已提交
258 259
    taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
    plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
260
    QUEUE_INIT(&plist->conn);
dengyihao's avatar
dengyihao 已提交
261 262 263 264 265 266 267
  }

  if (QUEUE_IS_EMPTY(&plist->conn)) {
    return NULL;
  }
  queue* h = QUEUE_HEAD(&plist->conn);
  QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
268 269 270 271

  SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
  QUEUE_INIT(&conn->conn);
  return conn;
dengyihao's avatar
dengyihao 已提交
272
}
dengyihao's avatar
dengyihao 已提交
273
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
274
  char key[128] = {0};
dengyihao's avatar
dengyihao 已提交
275

dengyihao's avatar
dengyihao 已提交
276 277
  tstrncpy(key, ip, strlen(ip));
  tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
dengyihao's avatar
dengyihao 已提交
278
  tTrace("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
dengyihao's avatar
dengyihao 已提交
279

dengyihao's avatar
dengyihao 已提交
280
  SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
dengyihao's avatar
dengyihao 已提交
281

dengyihao's avatar
dengyihao 已提交
282
  conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
dengyihao's avatar
dengyihao 已提交
283
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
284
  conn->notifyCount = 0;
dengyihao's avatar
dengyihao 已提交
285 286 287 288
  // list already create before
  assert(plist != NULL);
  QUEUE_PUSH(&plist->conn, &conn->conn);
}
dengyihao's avatar
dengyihao 已提交
289 290 291 292 293 294 295 296 297
static bool clientReadComplete(SConnBuffer* data) {
  STransMsgHead head;
  int32_t       headLen = sizeof(head);
  if (data->len >= headLen) {
    memcpy((char*)&head, data->buf, headLen);
    int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
    if (msgLen > data->len) {
      data->left = msgLen - data->len;
      return false;
dengyihao's avatar
dengyihao 已提交
298 299
    } else if (msgLen == data->len) {
      data->left = 0;
dengyihao's avatar
dengyihao 已提交
300 301 302 303 304 305
      return true;
    }
  } else {
    return false;
  }
}
dengyihao's avatar
dengyihao 已提交
306
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
307 308
  SCliConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
309
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
310 311
}
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
312
  // impl later
dengyihao's avatar
dengyihao 已提交
313 314
  SCliConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
315
  if (nread > 0) {
dengyihao's avatar
dengyihao 已提交
316 317
    pBuf->len += nread;
    if (clientReadComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
318
      uv_read_stop((uv_stream_t*)conn->stream);
dengyihao's avatar
dengyihao 已提交
319
      tTrace("client conn %p read complete", conn);
320
      clientHandleResp(conn);
dengyihao's avatar
dengyihao 已提交
321
    } else {
dengyihao's avatar
dengyihao 已提交
322
      tTrace("client conn %p read partial packet, continue to read", conn);
dengyihao's avatar
dengyihao 已提交
323
    }
dengyihao's avatar
dengyihao 已提交
324 325
    return;
  }
326 327
  assert(nread <= 0);
  if (nread == 0) {
dengyihao's avatar
dengyihao 已提交
328 329 330
    // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
    // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
    // read(2).
331 332
    return;
  }
dengyihao's avatar
dengyihao 已提交
333
  if (nread < 0 || nread == UV_EOF) {
dengyihao's avatar
dengyihao 已提交
334
    tError("client conn %p read error: %s", conn, uv_err_name(nread));
dengyihao's avatar
dengyihao 已提交
335
    clientHandleExcept(conn);
dengyihao's avatar
dengyihao 已提交
336
  }
337 338
  // tDebug("Read error %s\n", uv_err_name(nread));
  // uv_close((uv_handle_t*)handle, clientDestroy);
dengyihao's avatar
dengyihao 已提交
339
}
dengyihao's avatar
dengyihao 已提交
340

341
static void clientConnDestroy(SCliConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
342
  //
dengyihao's avatar
dengyihao 已提交
343 344
  conn->ref--;
  if (conn->ref == 0) {
dengyihao's avatar
dengyihao 已提交
345
    tTrace("client conn %p remove from conn pool", conn);
dengyihao's avatar
dengyihao 已提交
346 347 348 349
    QUEUE_REMOVE(&conn->conn);
    if (clear) {
      uv_close((uv_handle_t*)conn->stream, clientDestroy);
    }
350
  }
dengyihao's avatar
dengyihao 已提交
351
}
dengyihao's avatar
dengyihao 已提交
352 353
static void clientDestroy(uv_handle_t* handle) {
  SCliConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
354
  // transDestroyBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
355 356 357

  free(conn->stream);
  free(conn->writeReq);
dengyihao's avatar
dengyihao 已提交
358
  tTrace("client conn %p destroy successfully", conn);
dengyihao's avatar
dengyihao 已提交
359 360 361
  free(conn);

  // clientConnDestroy(conn, false);
dengyihao's avatar
dengyihao 已提交
362 363 364 365 366
}

static void clientWriteCb(uv_write_t* req, int status) {
  SCliConn* pConn = req->data;
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
367
    tTrace("client conn %p data already was written out", pConn);
dengyihao's avatar
dengyihao 已提交
368
    SCliMsg* pMsg = pConn->data;
dengyihao's avatar
dengyihao 已提交
369
    if (pMsg == NULL) {
dengyihao's avatar
dengyihao 已提交
370 371
      // handle
      return;
dengyihao's avatar
dengyihao 已提交
372
    }
dengyihao's avatar
dengyihao 已提交
373
    destroyUserdata(&pMsg->msg);
dengyihao's avatar
dengyihao 已提交
374
  } else {
dengyihao's avatar
dengyihao 已提交
375
    tError("client conn %p failed to write: %s", pConn, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
376
    clientHandleExcept(pConn);
dengyihao's avatar
dengyihao 已提交
377 378
    return;
  }
dengyihao's avatar
dengyihao 已提交
379
  SCliThrdObj* pThrd = pConn->hostThrd;
dengyihao's avatar
dengyihao 已提交
380
  uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb);
dengyihao's avatar
dengyihao 已提交
381
}
dengyihao's avatar
dengyihao 已提交
382 383

static void clientWrite(SCliConn* pConn) {
dengyihao's avatar
dengyihao 已提交
384
  SCliMsg*       pCliMsg = pConn->data;
dengyihao's avatar
dengyihao 已提交
385 386 387 388
  STransConnCtx* pCtx = pCliMsg->ctx;

  SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg);

dengyihao's avatar
dengyihao 已提交
389
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
dengyihao's avatar
dengyihao 已提交
390 391 392 393 394 395 396 397
  int            msgLen = transMsgLenFromCont(pMsg->contLen);

  if (!pConn->secured) {
    char* buf = calloc(1, msgLen + sizeof(STransUserMsg));
    memcpy(buf, (char*)pHead, msgLen);

    STransUserMsg* uMsg = (STransUserMsg*)(buf + msgLen);
    memcpy(uMsg->user, pCtx->pTransInst->user, tListLen(uMsg->user));
dengyihao's avatar
dengyihao 已提交
398

dengyihao's avatar
dengyihao 已提交
399 400 401 402 403 404 405 406 407 408 409 410
    // to avoid mem leak
    destroyUserdata(pMsg);

    pMsg->pCont = (char*)buf + sizeof(STransMsgHead);
    pMsg->contLen = msgLen + sizeof(STransUserMsg) - sizeof(STransMsgHead);

    pConn->secured = 1;  // del later

    pHead = (STransMsgHead*)buf;
    pHead->secured = 0;
    msgLen += sizeof(STransUserMsg);
  }
dengyihao's avatar
dengyihao 已提交
411

dengyihao's avatar
dengyihao 已提交
412 413 414 415
  pHead->msgType = pMsg->msgType;
  pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);

  uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
dengyihao's avatar
dengyihao 已提交
416 417
  tDebug("client conn %p %s is send to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr),
         ntohs(pConn->addr.sin_port));
dengyihao's avatar
dengyihao 已提交
418 419
  uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
}
dengyihao's avatar
dengyihao 已提交
420 421
static void clientConnCb(uv_connect_t* req, int status) {
  // impl later
dengyihao's avatar
dengyihao 已提交
422 423
  SCliConn* pConn = req->data;
  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
424
    // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
dengyihao's avatar
dengyihao 已提交
425
    tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status));
dengyihao's avatar
dengyihao 已提交
426
    clientHandleExcept(pConn);
427
    return;
dengyihao's avatar
dengyihao 已提交
428
  }
dengyihao's avatar
dengyihao 已提交
429 430 431 432
  int addrlen = sizeof(pConn->addr);
  uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen);

  tTrace("client conn %p create", pConn);
dengyihao's avatar
dengyihao 已提交
433

dengyihao's avatar
dengyihao 已提交
434
  assert(pConn->stream == req->handle);
dengyihao's avatar
dengyihao 已提交
435
  clientWrite(pConn);
dengyihao's avatar
dengyihao 已提交
436 437
}

dengyihao's avatar
dengyihao 已提交
438
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
dengyihao's avatar
dengyihao 已提交
439
  tDebug("client work thread %p start to quit", pThrd);
dengyihao's avatar
dengyihao 已提交
440
  destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
441
  // transDestroyAsyncPool(pThr) uv_close((uv_handle_t*)pThrd->cliAsync, NULL);
dengyihao's avatar
dengyihao 已提交
442 443 444 445 446
  uv_timer_stop(pThrd->timer);
  pThrd->quit = true;
  // uv__async_stop(pThrd->cliAsync);
  uv_stop(pThrd->loop);
}
dengyihao's avatar
dengyihao 已提交
447
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
dengyihao's avatar
dengyihao 已提交
448 449
  uint64_t et = taosGetTimestampUs();
  uint64_t el = et - pMsg->st;
dengyihao's avatar
dengyihao 已提交
450
  tTrace("client msg tran time cost: %" PRIu64 "", el);
dengyihao's avatar
dengyihao 已提交
451
  et = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
452

dengyihao's avatar
dengyihao 已提交
453 454 455 456
  // if (pMsg->msg.handle != NULL) {
  //  // handle
  //}

dengyihao's avatar
dengyihao 已提交
457
  STransConnCtx* pCtx = pMsg->ctx;
dengyihao's avatar
dengyihao 已提交
458
  SCliConn*      conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
dengyihao's avatar
dengyihao 已提交
459
  if (conn != NULL) {
dengyihao's avatar
dengyihao 已提交
460
    // impl later
dengyihao's avatar
dengyihao 已提交
461
    tTrace("client get conn %p from pool", conn);
dengyihao's avatar
dengyihao 已提交
462 463
    conn->data = pMsg;
    conn->writeReq->data = conn;
dengyihao's avatar
dengyihao 已提交
464
    transDestroyBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
465 466 467 468 469

    if (pThrd->quit) {
      clientHandleExcept(conn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
470
    clientWrite(conn);
dengyihao's avatar
dengyihao 已提交
471

dengyihao's avatar
dengyihao 已提交
472 473
    conn->push = pMsg->msg.push;

dengyihao's avatar
dengyihao 已提交
474
  } else {
dengyihao's avatar
dengyihao 已提交
475
    SCliConn* conn = calloc(1, sizeof(SCliConn));
dengyihao's avatar
dengyihao 已提交
476
    conn->ref++;
dengyihao's avatar
dengyihao 已提交
477
    // read/write stream handle
dengyihao's avatar
dengyihao 已提交
478 479
    conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
    uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
dengyihao's avatar
dengyihao 已提交
480 481 482
    conn->stream->data = conn;

    // write req handle
dengyihao's avatar
dengyihao 已提交
483
    conn->writeReq = malloc(sizeof(uv_write_t));
dengyihao's avatar
dengyihao 已提交
484
    conn->writeReq->data = conn;
dengyihao's avatar
dengyihao 已提交
485

dengyihao's avatar
dengyihao 已提交
486
    QUEUE_INIT(&conn->conn);
dengyihao's avatar
dengyihao 已提交
487 488 489

    conn->connReq.data = conn;
    conn->data = pMsg;
dengyihao's avatar
dengyihao 已提交
490
    conn->hostThrd = pThrd;
dengyihao's avatar
dengyihao 已提交
491

dengyihao's avatar
dengyihao 已提交
492 493
    conn->push = pMsg->msg.push;

dengyihao's avatar
dengyihao 已提交
494
    struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
495
    uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
dengyihao's avatar
dengyihao 已提交
496
    // handle error in callback if fail to connect
dengyihao's avatar
dengyihao 已提交
497
    uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
dengyihao's avatar
dengyihao 已提交
498 499 500
  }
}
static void clientAsyncCb(uv_async_t* handle) {
dengyihao's avatar
dengyihao 已提交
501 502
  SAsyncItem*  item = handle->data;
  SCliThrdObj* pThrd = item->pThrd;
dengyihao's avatar
dengyihao 已提交
503 504
  SCliMsg*     pMsg = NULL;
  queue        wq;
dengyihao's avatar
dengyihao 已提交
505

dengyihao's avatar
dengyihao 已提交
506
  // batch process to avoid to lock/unlock frequently
dengyihao's avatar
dengyihao 已提交
507 508 509
  pthread_mutex_lock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  pthread_mutex_unlock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
510

dengyihao's avatar
dengyihao 已提交
511 512 513 514
  int count = 0;
  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* h = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
515 516

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
517 518 519 520 521 522
    if (pMsg->ctx == NULL) {
      clientHandleQuit(pMsg, pThrd);
    } else {
      clientHandleReq(pMsg, pThrd);
    }
    // clientHandleReq(pMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
523
    count++;
dengyihao's avatar
dengyihao 已提交
524 525
  }
  if (count >= 2) {
dengyihao's avatar
dengyihao 已提交
526
    tTrace("client process batch size: %d", count);
dengyihao's avatar
dengyihao 已提交
527
  }
dengyihao's avatar
dengyihao 已提交
528 529 530 531
}

static void* clientThread(void* arg) {
  SCliThrdObj* pThrd = (SCliThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
532
  setThreadName("trans-client-work");
dengyihao's avatar
dengyihao 已提交
533 534 535 536 537
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
}

void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
  SClientObj* cli = calloc(1, sizeof(SClientObj));
dengyihao's avatar
dengyihao 已提交
538

dengyihao's avatar
dengyihao 已提交
539
  SRpcInfo* pRpc = shandle;
dengyihao's avatar
dengyihao 已提交
540 541 542 543 544
  memcpy(cli->label, label, strlen(label));
  cli->numOfThreads = numOfThreads;
  cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));

  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
545
    SCliThrdObj* pThrd = createThrdObj();
dengyihao's avatar
dengyihao 已提交
546
    pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
dengyihao's avatar
dengyihao 已提交
547
    pThrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
548

dengyihao's avatar
dengyihao 已提交
549
    int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
dengyihao's avatar
dengyihao 已提交
550
    if (err == 0) {
dengyihao's avatar
dengyihao 已提交
551
      tDebug("success to create tranport-client thread %d", i);
dengyihao's avatar
dengyihao 已提交
552
    }
dengyihao's avatar
dengyihao 已提交
553
    cli->pThreadObj[i] = pThrd;
dengyihao's avatar
dengyihao 已提交
554 555 556
  }
  return cli;
}
dengyihao's avatar
dengyihao 已提交
557 558 559 560 561 562 563 564 565 566 567 568 569 570

static void destroyUserdata(SRpcMsg* userdata) {
  if (userdata->pCont == NULL) {
    return;
  }
  transFreeMsg(userdata->pCont);
  userdata->pCont = NULL;
}
static void destroyCmsg(SCliMsg* pMsg) {
  if (pMsg == NULL) {
    return;
  }
  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
dengyihao's avatar
dengyihao 已提交
571 572
  free(pMsg);
}
dengyihao's avatar
dengyihao 已提交
573

dengyihao's avatar
dengyihao 已提交
574 575 576 577 578 579 580 581
static SCliThrdObj* createThrdObj() {
  SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
  QUEUE_INIT(&pThrd->msg);
  pthread_mutex_init(&pThrd->msgMtx, NULL);

  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  uv_loop_init(pThrd->loop);

dengyihao's avatar
dengyihao 已提交
582
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, clientAsyncCb);
dengyihao's avatar
dengyihao 已提交
583

dengyihao's avatar
dengyihao 已提交
584 585 586
  pThrd->timer = malloc(sizeof(uv_timer_t));
  uv_timer_init(pThrd->loop, pThrd->timer);
  pThrd->timer->data = pThrd;
dengyihao's avatar
dengyihao 已提交
587

dengyihao's avatar
dengyihao 已提交
588
  pThrd->pool = creatConnPool(4);
dengyihao's avatar
dengyihao 已提交
589 590

  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
591 592
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
593
static void destroyThrdObj(SCliThrdObj* pThrd) {
dengyihao's avatar
dengyihao 已提交
594 595 596
  if (pThrd == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
597
  uv_stop(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
598 599
  pthread_join(pThrd->thread, NULL);
  pthread_mutex_destroy(&pThrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
600 601
  transDestroyAsyncPool(pThrd->asyncPool);
  // free(pThrd->cliAsync);
dengyihao's avatar
dengyihao 已提交
602
  free(pThrd->timer);
dengyihao's avatar
dengyihao 已提交
603 604 605
  free(pThrd->loop);
  free(pThrd);
}
dengyihao's avatar
dengyihao 已提交
606

dengyihao's avatar
dengyihao 已提交
607
static void transDestroyConnCtx(STransConnCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
608 609 610 611 612
  if (ctx != NULL) {
    free(ctx->ip);
  }
  free(ctx);
}
dengyihao's avatar
dengyihao 已提交
613
//
dengyihao's avatar
dengyihao 已提交
614 615 616 617 618
static void clientSendQuit(SCliThrdObj* thrd) {
  // cli can stop gracefully
  SCliMsg* msg = calloc(1, sizeof(SCliMsg));
  msg->ctx = NULL;  //

dengyihao's avatar
dengyihao 已提交
619 620 621
  // pthread_mutex_lock(&thrd->msgMtx);
  // QUEUE_PUSH(&thrd->msg, &msg->q);
  // pthread_mutex_unlock(&thrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
622

dengyihao's avatar
dengyihao 已提交
623
  transSendAsync(thrd->asyncPool, &msg->q);
dengyihao's avatar
dengyihao 已提交
624
  // uv_async_send(thrd->cliAsync);
dengyihao's avatar
dengyihao 已提交
625
}
dengyihao's avatar
dengyihao 已提交
626 627 628
void taosCloseClient(void* arg) {
  // impl later
  SClientObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
629
  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
630
    clientSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
631
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
632 633 634
  }
  free(cli->pThreadObj);
  free(cli);
dengyihao's avatar
dengyihao 已提交
635
}
dengyihao's avatar
dengyihao 已提交
636 637
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
  // impl later
dengyihao's avatar
dengyihao 已提交
638 639
  char*    ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
  uint32_t port = pEpSet->eps[pEpSet->inUse].port;
dengyihao's avatar
dengyihao 已提交
640

dengyihao's avatar
dengyihao 已提交
641 642
  SRpcInfo* pRpc = (SRpcInfo*)shandle;

dengyihao's avatar
dengyihao 已提交
643 644 645 646
  int32_t flen = 0;
  if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
    // imp later
  }
dengyihao's avatar
dengyihao 已提交
647

dengyihao's avatar
dengyihao 已提交
648 649
  STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));

dengyihao's avatar
dengyihao 已提交
650
  pCtx->pTransInst = (SRpcInfo*)shandle;
dengyihao's avatar
dengyihao 已提交
651 652 653 654
  pCtx->ahandle = pMsg->ahandle;
  pCtx->msgType = pMsg->msgType;
  pCtx->ip = strdup(ip);
  pCtx->port = port;
dengyihao's avatar
dengyihao 已提交
655 656 657 658 659 660 661

  assert(pRpc->connType == TAOS_CONN_CLIENT);
  // atomic or not
  int64_t index = pRpc->index;
  if (pRpc->index++ >= pRpc->numOfThreads) {
    pRpc->index = 0;
  }
dengyihao's avatar
dengyihao 已提交
662 663 664 665
  SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pMsg;
  cliMsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
666 667 668

  SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];

dengyihao's avatar
dengyihao 已提交
669 670 671
  // pthread_mutex_lock(&thrd->msgMtx);
  // QUEUE_PUSH(&thrd->msg, &cliMsg->q);
  // pthread_mutex_unlock(&thrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
672

dengyihao's avatar
dengyihao 已提交
673 674
  // int start = taosGetTimestampUs();
  transSendAsync(thrd->asyncPool, &(cliMsg->q));
dengyihao's avatar
dengyihao 已提交
675
  // uv_async_send(thrd->cliAsync);
dengyihao's avatar
dengyihao 已提交
676
  // int end = taosGetTimestampUs() - start;
dengyihao's avatar
dengyihao 已提交
677
  // tError("client sent to rpc, time cost: %d", (int)end);
dengyihao's avatar
dengyihao 已提交
678
}
dengyihao's avatar
dengyihao 已提交
679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
  char*    ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
  uint32_t port = pEpSet->eps[pEpSet->inUse].port;

  SRpcInfo* pRpc = (SRpcInfo*)shandle;

  STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
  pCtx->pTransInst = (SRpcInfo*)shandle;
  pCtx->ahandle = pReq->ahandle;
  pCtx->msgType = pReq->msgType;
  pCtx->ip = strdup(ip);
  pCtx->port = port;
  pCtx->pSem = calloc(1, sizeof(tsem_t));
  pCtx->pRsp = pRsp;
  tsem_init(pCtx->pSem, 0, 0);

  int64_t index = pRpc->index;
  if (pRpc->index++ >= pRpc->numOfThreads) {
    pRpc->index = 0;
  }
  SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();

  SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];

  // pthread_mutex_lock(&thrd->msgMtx);
  // QUEUE_PUSH(&thrd->msg, &cliMsg->q);
  // pthread_mutex_unlock(&thrd->msgMtx);

  // int start = taosGetTimestampUs();
  transSendAsync(thrd->asyncPool, &(cliMsg->q));

  tsem_t* pSem = pCtx->pSem;
  tsem_wait(pSem);
  tsem_destroy(pSem);
  free(pSem);

  return;
}
dengyihao's avatar
dengyihao 已提交
720
#endif