transCli.c 20.0 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

#include "transComm.h"

20
// Scales an idle-time setting by 10000; the result is added to millisecond
// timestamps in this file. The argument is parenthesized so that expression
// arguments such as CONN_PERSIST_TIME(a + b) expand correctly.
#define CONN_PERSIST_TIME(para) ((para) * 1000 * 10)
dengyihao's avatar
dengyihao 已提交
21

dengyihao's avatar
dengyihao 已提交
22 23 24
typedef struct SCliConn {
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
25
  uv_write_t*  writeReq;
dengyihao's avatar
dengyihao 已提交
26
  void*        hostThrd;
dengyihao's avatar
dengyihao 已提交
27
  SConnBuffer  readBuf;
dengyihao's avatar
dengyihao 已提交
28 29
  void*        data;
  queue        conn;
dengyihao's avatar
dengyihao 已提交
30 31
  char         spi;
  char         secured;
dengyihao's avatar
dengyihao 已提交
32
  uint64_t     expireTime;
dengyihao's avatar
dengyihao 已提交
33
  int8_t       notifyCount;  // timers already notify to client
dengyihao's avatar
dengyihao 已提交
34
  int32_t      ref;
dengyihao's avatar
dengyihao 已提交
35
} SCliConn;
dengyihao's avatar
dengyihao 已提交
36

dengyihao's avatar
dengyihao 已提交
37
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
38 39 40 41
  STransConnCtx* ctx;
  SRpcMsg        msg;
  queue          q;
  uint64_t       st;
dengyihao's avatar
dengyihao 已提交
42 43 44
} SCliMsg;

typedef struct SCliThrdObj {
dengyihao's avatar
dengyihao 已提交
45 46 47 48
  pthread_t  thread;
  uv_loop_t* loop;
  // uv_async_t*     cliAsync;  //
  SAsyncPool*     asyncPool;
dengyihao's avatar
dengyihao 已提交
49
  uv_timer_t*     timer;
dengyihao's avatar
dengyihao 已提交
50
  void*           pool;  // conn pool
dengyihao's avatar
dengyihao 已提交
51 52
  queue           msg;
  pthread_mutex_t msgMtx;
dengyihao's avatar
dengyihao 已提交
53
  uint64_t        nextTimeout;  // next timeout
dengyihao's avatar
dengyihao 已提交
54
  void*           pTransInst;   //
dengyihao's avatar
dengyihao 已提交
55
  bool            quit;
dengyihao's avatar
dengyihao 已提交
56 57 58 59 60 61 62 63 64
} SCliThrdObj;

// Client transport instance: a label and the set of worker threads.
typedef struct SClientObj {
  char          label[TSDB_LABEL_LEN];  // copied from the label passed to taosInitClient
  int32_t       index;
  int           numOfThreads;           // number of entries in pThreadObj
  SCliThrdObj** pThreadObj;             // one thread object per worker
} SClientObj;

dengyihao's avatar
dengyihao 已提交
65 66 67 68
// Value stored in the conn pool hash: the list of idle connections for one endpoint.
typedef struct SConnList {
  queue conn;  // head of the idle SCliConn list
} SConnList;

dengyihao's avatar
dengyihao 已提交
69
// conn pool
dengyihao's avatar
dengyihao 已提交
70
// add expire timeout and capacity limit
71 72
static void*     creatConnPool(int size);
static void*     destroyConnPool(void* pool);
dengyihao's avatar
dengyihao 已提交
73 74
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void      addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
75

dengyihao's avatar
dengyihao 已提交
76 77
// register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
78 79 80
// check whether already read complete packet from server
static bool clientReadComplete(SConnBuffer* pBuf);
// alloc buf for read
dengyihao's avatar
dengyihao 已提交
81
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
82
// callback after read nbytes from socket
dengyihao's avatar
dengyihao 已提交
83
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
84
// callback after write data to socket
dengyihao's avatar
dengyihao 已提交
85
static void clientWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
86
// callback after conn  to server
dengyihao's avatar
dengyihao 已提交
87
static void clientConnCb(uv_connect_t* req, int status);
dengyihao's avatar
dengyihao 已提交
88
static void clientAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
89
static void clientDestroy(uv_handle_t* handle);
90
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
dengyihao's avatar
dengyihao 已提交
91

dengyihao's avatar
dengyihao 已提交
92 93 94 95
// process data read from server, auth/decompress etc later
static void clientHandleResp(SCliConn* conn);
// handle except about conn
static void clientHandleExcept(SCliConn* conn);
96 97
// handle req from app
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
dengyihao's avatar
dengyihao 已提交
98 99
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientSendQuit(SCliThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
100

dengyihao's avatar
dengyihao 已提交
101 102 103 104
static void destroyUserdata(SRpcMsg* userdata);

static void destroyCmsg(SCliMsg* cmsg);
static void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
105 106 107 108
// thread obj
static SCliThrdObj* createThrdObj();
static void         destroyThrdObj(SCliThrdObj* pThrd);
// thread
dengyihao's avatar
dengyihao 已提交
109 110
static void* clientThread(void* arg);

111
// Delivers a completely read response to the requester and recycles the connection.
//
// conn->data must hold the in-flight SCliMsg and conn->readBuf a full packet.
// The response is passed to the instance callback (async request, pSem == NULL)
// or copied into pCtx->pRsp followed by a semaphore post (sync request).
// Afterwards the read buffer is cleared, reading is restarted, the connection is
// put back into the pool, the request message is destroyed and the pool timer is
// started if it is not already active.
static void clientHandleResp(SCliConn* conn) {
  SCliMsg*       pMsg = conn->data;
  STransConnCtx* pCtx = pMsg->ctx;
  SRpcInfo*      pRpc = pCtx->pTransInst;

  // header fields arrive in network byte order; convert them in place
  STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
  pHead->code = ntohl(pHead->code);
  pHead->msgLen = ntohl(pHead->msgLen);

  // zero-initialized so that fields not assigned below are never handed out as garbage
  SRpcMsg rpcMsg = {0};
  rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
  rpcMsg.pCont = transContFromHead((char*)pHead);
  rpcMsg.code = pHead->code;
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.ahandle = pCtx->ahandle;

  tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), pMsg->ctx->ip, pMsg->ctx->port);
  if (pCtx->pSem == NULL) {
    // no semaphore: asynchronous request, report through the instance callback
    tTrace("client conn(async) %p handle resp", conn);
    (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
  } else {
    // synchronous request: hand the response to the waiting caller
    tTrace("client conn(sync) %p handle resp", conn);
    memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
    tsem_post(pCtx->pSem);
  }
  conn->notifyCount += 1;

  // buf's mem already handed over to rpcMsg.pCont
  transClearBuffer(&conn->readBuf);

  uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);

  SCliThrdObj* pThrd = conn->hostThrd;
  addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);

  // pCtx is owned by pMsg and must not be used after this point
  destroyCmsg(pMsg);
  conn->data = NULL;

  // start thread's timer of conn pool if not active
  if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) {
    uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
  }
}
// Handles a failure on a connection (connect, read or write error, or thread quit).
//
// If no request is attached (pConn->data == NULL) the connection is simply
// released. Otherwise the requester is told TSDB_CODE_RPC_NETWORK_UNAVAIL, either
// through the instance callback (pSem == NULL) or by filling pCtx->pRsp and
// posting the semaphore; then the request message is destroyed and the
// connection released.
static void clientHandleExcept(SCliConn* pConn) {
  if (pConn->data == NULL) {
    // handle conn except in conn pool
    clientConnDestroy(pConn, true);
    return;
  }
  tDebug("client conn %p start to destroy", pConn);
  SCliMsg*       pMsg = pConn->data;
  STransConnCtx* pCtx = pMsg->ctx;

  // the request payload is no longer needed
  destroyUserdata(&pMsg->msg);

  SRpcMsg rpcMsg = {0};
  rpcMsg.ahandle = pCtx->ahandle;
  rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  if (pCtx->pSem == NULL) {
    (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL);
  } else {
    memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
    tsem_post(pCtx->pSem);
  }

  destroyCmsg(pMsg);
  pConn->data = NULL;

  // update the counter before releasing the connection, not after
  pConn->notifyCount += 1;
  clientConnDestroy(pConn, true);
}
dengyihao's avatar
dengyihao 已提交
184

dengyihao's avatar
dengyihao 已提交
185 186
// Timer callback: walks every endpoint list in the thread's pool and releases
// the connections whose expireTime lies before the thread's nextTimeout value,
// then computes the next deadline and re-arms the timer.
static void clientTimeoutCb(uv_timer_t* handle) {
  SCliThrdObj* pThrd = handle->data;
  SRpcInfo*    pRpc = pThrd->pTransInst;
  int64_t      deadline = pThrd->nextTimeout;
  tDebug("client conn timeout, try to remove expire conn from conn pool");

  SHashObj* pPool = (SHashObj*)pThrd->pool;
  for (SConnList* list = taosHashIterate(pPool, NULL); list != NULL; list = taosHashIterate(pPool, list)) {
    // scanning of a list stops at the first entry that has not expired yet
    while (!QUEUE_IS_EMPTY(&list->conn)) {
      queue*    node = QUEUE_HEAD(&list->conn);
      SCliConn* pConn = QUEUE_DATA(node, SCliConn, conn);
      if (pConn->expireTime >= deadline) {
        break;
      }
      QUEUE_REMOVE(node);
      clientConnDestroy(pConn, true);
    }
  }

  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
  uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
211 212 213
static void* creatConnPool(int size) {
  // thread local, no lock
  return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
214
}
215
static void* destroyConnPool(void* pool) {
dengyihao's avatar
dengyihao 已提交
216
  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
dengyihao's avatar
dengyihao 已提交
217 218 219 220 221
  while (connList != NULL) {
    while (!QUEUE_IS_EMPTY(&connList->conn)) {
      queue* h = QUEUE_HEAD(&connList->conn);
      QUEUE_REMOVE(h);
      SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
222
      clientConnDestroy(c, true);
dengyihao's avatar
dengyihao 已提交
223
    }
dengyihao's avatar
dengyihao 已提交
224
    connList = taosHashIterate((SHashObj*)pool, connList);
dengyihao's avatar
dengyihao 已提交
225
  }
dengyihao's avatar
dengyihao 已提交
226
  taosHashClear(pool);
dengyihao's avatar
dengyihao 已提交
227 228
}

dengyihao's avatar
dengyihao 已提交
229
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
dengyihao's avatar
dengyihao 已提交
230 231 232 233
  char key[128] = {0};
  tstrncpy(key, ip, strlen(ip));
  tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));

dengyihao's avatar
dengyihao 已提交
234 235
  SHashObj*  pPool = pool;
  SConnList* plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
236 237
  if (plist == NULL) {
    SConnList list;
dengyihao's avatar
dengyihao 已提交
238 239
    taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
    plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
240
    QUEUE_INIT(&plist->conn);
dengyihao's avatar
dengyihao 已提交
241 242 243 244 245 246 247
  }

  if (QUEUE_IS_EMPTY(&plist->conn)) {
    return NULL;
  }
  queue* h = QUEUE_HEAD(&plist->conn);
  QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
248 249 250 251

  SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
  QUEUE_INIT(&conn->conn);
  return conn;
dengyihao's avatar
dengyihao 已提交
252
}
dengyihao's avatar
dengyihao 已提交
253
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
254
  char key[128] = {0};
dengyihao's avatar
dengyihao 已提交
255

dengyihao's avatar
dengyihao 已提交
256 257
  tstrncpy(key, ip, strlen(ip));
  tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
dengyihao's avatar
dengyihao 已提交
258
  tDebug("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
dengyihao's avatar
dengyihao 已提交
259

dengyihao's avatar
dengyihao 已提交
260
  SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
dengyihao's avatar
dengyihao 已提交
261

dengyihao's avatar
dengyihao 已提交
262
  conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
dengyihao's avatar
dengyihao 已提交
263
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
264
  conn->notifyCount = 0;
dengyihao's avatar
dengyihao 已提交
265 266 267 268
  // list already create before
  assert(plist != NULL);
  QUEUE_PUSH(&plist->conn, &conn->conn);
}
dengyihao's avatar
dengyihao 已提交
269 270 271 272 273 274 275 276 277
// Reports whether the read buffer holds exactly one complete packet.
//
// Returns false while fewer than sizeof(STransMsgHead) bytes are buffered or
// while the length announced in the header exceeds the buffered length (in
// which case data->left is set to the number of missing bytes). Returns true,
// with data->left = 0, when the buffered length equals the announced length.
static bool clientReadComplete(SConnBuffer* data) {
  STransMsgHead head;
  int32_t       headLen = sizeof(head);
  if (data->len < headLen) {
    return false;
  }

  // copy the header out instead of casting, then convert the length to host order
  memcpy((char*)&head, data->buf, headLen);
  int32_t msgLen = (int32_t)ntohl((uint32_t)head.msgLen);
  if (msgLen > data->len) {
    data->left = msgLen - data->len;
    return false;
  }
  if (msgLen == data->len) {
    data->left = 0;
    return true;
  }

  // More bytes buffered than the header announces. The original fell off the
  // end of the function here (undefined result); report "not complete" explicitly.
  return false;
}
dengyihao's avatar
dengyihao 已提交
286
// libuv alloc callback: fills `buf` from the connection's read buffer via
// transAllocBuffer. suggested_size is not used.
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SCliConn* pConn = handle->data;
  transAllocBuffer(&pConn->readBuf, buf);
}
// libuv read callback.
//   nread > 0  : account for the bytes; once the packet is complete, stop reading
//                and process the response.
//   nread == 0 : nothing to do (equivalent to EAGAIN/EWOULDBLOCK, see
//                http://docs.libuv.org/en/v1.x/stream.html#c.uv_read_cb).
//   nread < 0  : read error or EOF; run the connection's exception handling.
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
  SCliConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;

  if (nread == 0) {
    return;
  }

  if (nread < 0) {
    tError("client conn %p read error: %s", conn, uv_err_name(nread));
    clientHandleExcept(conn);
    return;
  }

  pBuf->len += nread;
  if (!clientReadComplete(pBuf)) {
    tDebug("client conn %p read partial packet, continue to read", conn);
    return;
  }

  uv_read_stop((uv_stream_t*)conn->stream);
  tDebug("client conn %p read complete", conn);
  clientHandleResp(conn);
}
dengyihao's avatar
dengyihao 已提交
320

321
// Drops one reference to `conn`. When the count reaches zero the connection is
// unlinked from its pool list and, if `clear` is set, its tcp handle is closed
// with clientDestroy as the close callback.
static void clientConnDestroy(SCliConn* conn, bool clear) {
  conn->ref--;
  if (conn->ref != 0) {
    return;
  }

  tDebug("client conn %p remove from conn pool", conn);
  QUEUE_REMOVE(&conn->conn);
  tDebug("client conn %p remove from conn pool successfully", conn);

  if (clear) {
    uv_close((uv_handle_t*)conn->stream, clientDestroy);
  }
}
dengyihao's avatar
dengyihao 已提交
333 334
// libuv close callback: frees the tcp handle, the write request and the
// connection object itself.
static void clientDestroy(uv_handle_t* handle) {
  SCliConn* pConn = handle->data;

  // NOTE(review): readBuf is not released here; confirm it is freed elsewhere.
  free(pConn->stream);
  free(pConn->writeReq);
  tDebug("client conn %p destroy successfully", pConn);
  free(pConn);
}

// libuv write callback. On failure the connection's exception handling runs.
// On success the request payload is freed and reading of the response starts;
// if no request is attached to the connection nothing further is done.
static void clientWriteCb(uv_write_t* req, int status) {
  SCliConn* pConn = req->data;

  if (status != 0) {
    tError("client conn %p failed to write: %s", pConn, uv_err_name(status));
    clientHandleExcept(pConn);
    return;
  }

  tDebug("client conn %p data already was written out", pConn);
  SCliMsg* pMsg = pConn->data;
  if (pMsg == NULL) {
    return;
  }

  destroyUserdata(&pMsg->msg);
  uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb);
}
dengyihao's avatar
dengyihao 已提交
363 364

// Sends the request attached to pConn: fills in the message type and the total
// length (network byte order) in the header in front of the content, then
// writes header plus content with a single uv_write.
static void clientWrite(SCliConn* pConn) {
  SCliMsg* pCliMsg = pConn->data;
  SRpcMsg* pReq = (SRpcMsg*)(&pCliMsg->msg);

  STransMsgHead* pHead = transHeadFromCont(pReq->pCont);
  int            totalLen = transMsgLenFromCont(pReq->contLen);

  pHead->msgType = pReq->msgType;
  pHead->msgLen = (int32_t)htonl((uint32_t)totalLen);

  uv_buf_t wb = uv_buf_init((char*)pHead, totalLen);
  tDebug("conn %p %s is send to %s:%d", pConn, TMSG_INFO(pHead->msgType), pCliMsg->ctx->ip, pCliMsg->ctx->port);
  uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
}
dengyihao's avatar
dengyihao 已提交
378 379
// libuv connect callback: sends the pending request once the connection is up,
// or runs the exception handling when connecting failed.
static void clientConnCb(uv_connect_t* req, int status) {
  SCliConn* pConn = req->data;

  if (status == 0) {
    tDebug("client conn %p create", pConn);
    assert(pConn->stream == req->handle);
    clientWrite(pConn);
    return;
  }

  tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status));
  clientHandleExcept(pConn);
}

dengyihao's avatar
dengyihao 已提交
393 394 395
// Handles a quit message: destroys the message, stops the pool timer, marks the
// thread as quitting and stops its event loop.
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
  tDebug("thread %p start to quit", pThrd);
  destroyCmsg(pMsg);

  uv_timer_stop(pThrd->timer);
  pThrd->quit = true;
  uv_stop(pThrd->loop);
}
dengyihao's avatar
dengyihao 已提交
402
// Dispatches one application request on the worker thread.
//
// A pooled connection to the target endpoint is reused when available;
// otherwise a new connection is created and connected, and the request is sent
// from clientConnCb. Failures are reported to the requester through
// clientHandleExcept. This includes address conversion or connect calls that
// fail immediately: in that case libuv never invokes the connect callback, so
// the error must be reported here.
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
  uint64_t el = taosGetTimestampUs() - pMsg->st;
  tDebug("client msg tran time cost: %" PRIu64 "", el);

  STransConnCtx* pCtx = pMsg->ctx;
  SCliConn*      conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
  if (conn != NULL) {
    tDebug("client get conn %p from pool", conn);
    conn->data = pMsg;
    conn->writeReq->data = conn;
    transDestroyBuffer(&conn->readBuf);

    if (pThrd->quit) {
      clientHandleExcept(conn);
      return;
    }
    clientWrite(conn);
    return;
  }

  // no idle connection: create a new one
  // NOTE(review): allocation failures are not handled here.
  conn = calloc(1, sizeof(SCliConn));
  conn->ref++;

  // read/write stream handle
  conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
  conn->stream->data = conn;

  // write req handle
  conn->writeReq = malloc(sizeof(uv_write_t));
  conn->writeReq->data = conn;

  QUEUE_INIT(&conn->conn);

  conn->connReq.data = conn;
  conn->data = pMsg;
  conn->hostThrd = pThrd;

  struct sockaddr_in addr;
  int                ret = uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
  if (ret == 0) {
    // asynchronous connect errors are handled in clientConnCb
    ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
  }
  if (ret != 0) {
    // immediate failure: clientConnCb will not be called, so notify the requester now
    tError("client conn %p failed to connect server: %s", conn, uv_strerror(ret));
    clientHandleExcept(conn);
  }
}
// Async callback: takes over every queued message under the item's mutex, then
// processes them without the lock. A message without a context is a quit
// message; everything else is a request.
static void clientAsyncCb(uv_async_t* handle) {
  SAsyncItem*  item = handle->data;
  SCliThrdObj* pThrd = item->pThrd;
  queue        pending;

  // batch process to avoid to lock/unlock frequently
  pthread_mutex_lock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &pending);
  pthread_mutex_unlock(&item->mtx);

  int handled = 0;
  for (; !QUEUE_IS_EMPTY(&pending); handled++) {
    queue* node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);

    SCliMsg* pMsg = QUEUE_DATA(node, SCliMsg, q);
    if (pMsg->ctx != NULL) {
      clientHandleReq(pMsg, pThrd);
    } else {
      clientHandleQuit(pMsg, pThrd);
    }
  }

  if (handled >= 2) {
    tDebug("already process batch size: %d", handled);
  }
}

static void* clientThread(void* arg) {
  SCliThrdObj* pThrd = (SCliThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
479
  setThreadName("trans-client-work");
dengyihao's avatar
dengyihao 已提交
480 481 482 483 484
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
}

void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
  SClientObj* cli = calloc(1, sizeof(SClientObj));
dengyihao's avatar
dengyihao 已提交
485

dengyihao's avatar
dengyihao 已提交
486
  SRpcInfo* pRpc = shandle;
dengyihao's avatar
dengyihao 已提交
487 488 489 490 491
  memcpy(cli->label, label, strlen(label));
  cli->numOfThreads = numOfThreads;
  cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));

  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
492
    SCliThrdObj* pThrd = createThrdObj();
dengyihao's avatar
dengyihao 已提交
493
    pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
dengyihao's avatar
dengyihao 已提交
494
    pThrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
495

dengyihao's avatar
dengyihao 已提交
496
    int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
dengyihao's avatar
dengyihao 已提交
497
    if (err == 0) {
dengyihao's avatar
dengyihao 已提交
498
      tDebug("success to create tranport-client thread %d", i);
dengyihao's avatar
dengyihao 已提交
499
    }
dengyihao's avatar
dengyihao 已提交
500
    cli->pThreadObj[i] = pThrd;
dengyihao's avatar
dengyihao 已提交
501 502 503
  }
  return cli;
}
dengyihao's avatar
dengyihao 已提交
504 505 506 507 508 509 510 511 512 513 514 515 516 517

// Frees the content buffer of a message, if any, and clears the pointer so a
// repeated call does nothing.
static void destroyUserdata(SRpcMsg* userdata) {
  if (userdata->pCont != NULL) {
    transFreeMsg(userdata->pCont);
    userdata->pCont = NULL;
  }
}
// Destroys a client message together with its context and payload.
// A NULL message is ignored.
static void destroyCmsg(SCliMsg* pMsg) {
  if (pMsg != NULL) {
    transDestroyConnCtx(pMsg->ctx);
    destroyUserdata(&pMsg->msg);
    free(pMsg);
  }
}
dengyihao's avatar
dengyihao 已提交
520

dengyihao's avatar
dengyihao 已提交
521 522 523 524 525 526 527 528
static SCliThrdObj* createThrdObj() {
  SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
  QUEUE_INIT(&pThrd->msg);
  pthread_mutex_init(&pThrd->msgMtx, NULL);

  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  uv_loop_init(pThrd->loop);

dengyihao's avatar
dengyihao 已提交
529
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, clientAsyncCb);
dengyihao's avatar
dengyihao 已提交
530

dengyihao's avatar
dengyihao 已提交
531 532 533
  pThrd->timer = malloc(sizeof(uv_timer_t));
  uv_timer_init(pThrd->loop, pThrd->timer);
  pThrd->timer->data = pThrd;
dengyihao's avatar
dengyihao 已提交
534

535
  pThrd->pool = creatConnPool(1);
dengyihao's avatar
dengyihao 已提交
536 537

  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
538 539
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
540
// Stops the thread's loop, waits for the thread to finish and frees the thread
// object with its mutex, async pool, timer and loop. NULL is ignored.
// NOTE(review): the connection pool (pThrd->pool) is not released here.
static void destroyThrdObj(SCliThrdObj* pThrd) {
  if (pThrd != NULL) {
    uv_stop(pThrd->loop);
    pthread_join(pThrd->thread, NULL);

    pthread_mutex_destroy(&pThrd->msgMtx);
    transDestroyAsyncPool(pThrd->asyncPool);

    free(pThrd->timer);
    free(pThrd->loop);
    free(pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
553

dengyihao's avatar
dengyihao 已提交
554
// Frees a request context and the ip string it owns. NULL is accepted.
static void transDestroyConnCtx(STransConnCtx* ctx) {
  if (ctx == NULL) {
    free(ctx);
    return;
  }
  free(ctx->ip);
  free(ctx);
}
dengyihao's avatar
dengyihao 已提交
560
//
dengyihao's avatar
dengyihao 已提交
561 562 563 564 565
// Asks a worker thread to stop gracefully by queueing a message without a
// context; clientAsyncCb treats such a message as a quit request.
static void clientSendQuit(SCliThrdObj* thrd) {
  SCliMsg* quitMsg = calloc(1, sizeof(SCliMsg));
  quitMsg->ctx = NULL;

  transSendAsync(thrd->asyncPool, &quitMsg->q);
}
dengyihao's avatar
dengyihao 已提交
573 574 575
void taosCloseClient(void* arg) {
  // impl later
  SClientObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
576
  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
577
    clientSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
578
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
579 580 581
  }
  free(cli->pThreadObj);
  free(cli);
dengyihao's avatar
dengyihao 已提交
582
}
dengyihao's avatar
dengyihao 已提交
583 584
// Sends a request asynchronously: builds a context for the endpoint currently
// in use, wraps the message and queues it to one worker thread. The response is
// delivered later through the instance callback. pRid is not used.
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
  char*    ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
  uint32_t port = pEpSet->eps[pEpSet->inUse].port;

  SRpcInfo* pRpc = (SRpcInfo*)shandle;

  int32_t flen = 0;
  if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
    // imp later
  }

  // context describing the target endpoint and the requester's handle
  STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
  pCtx->pTransInst = (SRpcInfo*)shandle;
  pCtx->ahandle = pMsg->ahandle;
  pCtx->msgType = pMsg->msgType;
  pCtx->ip = strdup(ip);
  pCtx->port = port;

  assert(pRpc->connType == TAOS_CONN_CLIENT);

  // pick the next worker; the counter is not updated atomically
  int64_t index = pRpc->index;
  if (pRpc->index++ >= pRpc->numOfThreads) {
    pRpc->index = 0;
  }

  SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pMsg;
  cliMsg->st = taosGetTimestampUs();

  SClientObj*  cli = (SClientObj*)pRpc->tcphandle;
  SCliThrdObj* thrd = cli->pThreadObj[index % pRpc->numOfThreads];

  transSendAsync(thrd->asyncPool, &(cliMsg->q));
}
dengyihao's avatar
dengyihao 已提交
626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666
// Sends a request and blocks until the response (or an error) has been written
// to pRsp.
//
// The request is queued to one worker thread together with a semaphore; the
// worker fills pRsp and posts the semaphore. The semaphore pointer is kept in a
// local variable because the worker frees the context (pCtx) when it finishes
// the request, which can happen before this function reads pCtx again.
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
  char*    ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
  uint32_t port = pEpSet->eps[pEpSet->inUse].port;

  SRpcInfo* pRpc = (SRpcInfo*)shandle;

  // owned by this function: created here, destroyed and freed below
  tsem_t* pSem = calloc(1, sizeof(tsem_t));
  tsem_init(pSem, 0, 0);

  STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
  pCtx->pTransInst = (SRpcInfo*)shandle;
  pCtx->ahandle = pReq->ahandle;
  pCtx->msgType = pReq->msgType;
  pCtx->ip = strdup(ip);
  pCtx->port = port;
  pCtx->pSem = pSem;
  pCtx->pRsp = pRsp;

  int64_t index = pRpc->index;
  if (pRpc->index++ >= pRpc->numOfThreads) {
    pRpc->index = 0;
  }
  SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();

  SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];

  // after this call cliMsg and pCtx belong to the worker thread; do not touch them
  transSendAsync(thrd->asyncPool, &(cliMsg->q));

  tsem_wait(pSem);
  tsem_destroy(pSem);
  free(pSem);

  return;
}
dengyihao's avatar
dengyihao 已提交
667
#endif