transCli.c 21.4 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

#include "transComm.h"

20
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
dengyihao's avatar
dengyihao 已提交
21

dengyihao's avatar
dengyihao 已提交
22 23 24
typedef struct SCliConn {
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
25
  uv_write_t*  writeReq;
dengyihao's avatar
dengyihao 已提交
26
  void*        hostThrd;
dengyihao's avatar
dengyihao 已提交
27
  SConnBuffer  readBuf;
dengyihao's avatar
dengyihao 已提交
28 29
  void*        data;
  queue        conn;
dengyihao's avatar
dengyihao 已提交
30
  uint64_t     expireTime;
dengyihao's avatar
dengyihao 已提交
31
  int8_t       notifyCount;  // timers already notify to client
dengyihao's avatar
dengyihao 已提交
32

dengyihao's avatar
dengyihao 已提交
33 34 35 36 37 38 39
  SRpcPush* push;
  int       persist;  //
  // spi configure
  char    spi;
  char    secured;
  int32_t ref;
  // debug and log info
dengyihao's avatar
dengyihao 已提交
40
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
41
} SCliConn;
dengyihao's avatar
dengyihao 已提交
42

dengyihao's avatar
dengyihao 已提交
43
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
44 45 46 47
  STransConnCtx* ctx;
  SRpcMsg        msg;
  queue          q;
  uint64_t       st;
dengyihao's avatar
dengyihao 已提交
48 49 50
} SCliMsg;

typedef struct SCliThrdObj {
dengyihao's avatar
dengyihao 已提交
51 52 53 54
  pthread_t  thread;
  uv_loop_t* loop;
  // uv_async_t*     cliAsync;  //
  SAsyncPool*     asyncPool;
dengyihao's avatar
dengyihao 已提交
55
  uv_timer_t*     timer;
dengyihao's avatar
dengyihao 已提交
56
  void*           pool;  // conn pool
dengyihao's avatar
dengyihao 已提交
57 58
  queue           msg;
  pthread_mutex_t msgMtx;
dengyihao's avatar
dengyihao 已提交
59
  uint64_t        nextTimeout;  // next timeout
dengyihao's avatar
dengyihao 已提交
60
  void*           pTransInst;   //
dengyihao's avatar
dengyihao 已提交
61
  bool            quit;
dengyihao's avatar
dengyihao 已提交
62 63 64 65 66 67 68 69 70
} SCliThrdObj;

typedef struct SClientObj {
  char          label[TSDB_LABEL_LEN];
  int32_t       index;
  int           numOfThreads;
  SCliThrdObj** pThreadObj;
} SClientObj;

dengyihao's avatar
dengyihao 已提交
71 72 73 74
typedef struct SConnList {
  queue conn;
} SConnList;

dengyihao's avatar
dengyihao 已提交
75
// conn pool
dengyihao's avatar
dengyihao 已提交
76
// add expire timeout and capacity limit
77 78
static void*     creatConnPool(int size);
static void*     destroyConnPool(void* pool);
dengyihao's avatar
dengyihao 已提交
79 80
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void      addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
81

dengyihao's avatar
dengyihao 已提交
82 83
// register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
84 85 86
// check whether already read complete packet from server
static bool clientReadComplete(SConnBuffer* pBuf);
// alloc buf for read
dengyihao's avatar
dengyihao 已提交
87
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
88
// callback after read nbytes from socket
dengyihao's avatar
dengyihao 已提交
89
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
90
// callback after write data to socket
dengyihao's avatar
dengyihao 已提交
91
static void clientWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
92
// callback after conn  to server
dengyihao's avatar
dengyihao 已提交
93
static void clientConnCb(uv_connect_t* req, int status);
dengyihao's avatar
dengyihao 已提交
94
static void clientAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
95
static void clientDestroy(uv_handle_t* handle);
96
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
dengyihao's avatar
dengyihao 已提交
97

dengyihao's avatar
dengyihao 已提交
98 99 100 101
// process data read from server, auth/decompress etc later
static void clientHandleResp(SCliConn* conn);
// handle except about conn
static void clientHandleExcept(SCliConn* conn);
102 103
// handle req from app
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
dengyihao's avatar
dengyihao 已提交
104 105
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientSendQuit(SCliThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
106

dengyihao's avatar
dengyihao 已提交
107 108 109 110
static void destroyUserdata(SRpcMsg* userdata);

static void destroyCmsg(SCliMsg* cmsg);
static void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
111 112 113 114
// thread obj
static SCliThrdObj* createThrdObj();
static void         destroyThrdObj(SCliThrdObj* pThrd);
// thread
dengyihao's avatar
dengyihao 已提交
115 116
static void* clientThread(void* arg);

117
static void clientHandleResp(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
118 119
  SCliMsg*       pMsg = conn->data;
  STransConnCtx* pCtx = pMsg->ctx;
dengyihao's avatar
dengyihao 已提交
120
  SRpcInfo*      pRpc = pCtx->pTransInst;
dengyihao's avatar
dengyihao 已提交
121

dengyihao's avatar
dengyihao 已提交
122 123 124 125
  STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
  pHead->code = htonl(pHead->code);
  pHead->msgLen = htonl(pHead->msgLen);

dengyihao's avatar
dengyihao 已提交
126
  SRpcMsg rpcMsg;
dengyihao's avatar
dengyihao 已提交
127
  rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
128
  rpcMsg.pCont = transContFromHead((char*)pHead);
dengyihao's avatar
dengyihao 已提交
129 130
  rpcMsg.code = pHead->code;
  rpcMsg.msgType = pHead->msgType;
dengyihao's avatar
dengyihao 已提交
131
  rpcMsg.ahandle = pCtx->ahandle;
dengyihao's avatar
dengyihao 已提交
132

dengyihao's avatar
dengyihao 已提交
133 134
  tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr),
         ntohs(conn->addr.sin_port));
dengyihao's avatar
dengyihao 已提交
135

dengyihao's avatar
dengyihao 已提交
136
  if (conn->push != NULL && conn->notifyCount != 0) {
dengyihao's avatar
dengyihao 已提交
137
    (*conn->push->callback)(conn->push->arg, &rpcMsg);
dengyihao's avatar
dengyihao 已提交
138
    conn->push = NULL;
dengyihao's avatar
dengyihao 已提交
139
  } else {
dengyihao's avatar
dengyihao 已提交
140
    if (pCtx->pSem == NULL) {
dengyihao's avatar
dengyihao 已提交
141
      tTrace("client conn%p handle resp", conn);
dengyihao's avatar
dengyihao 已提交
142 143 144 145 146 147
      (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
    } else {
      tTrace("client conn(sync) %p handle resp", conn);
      memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
      tsem_post(pCtx->pSem);
    }
dengyihao's avatar
dengyihao 已提交
148
  }
dengyihao's avatar
dengyihao 已提交
149
  conn->notifyCount += 1;
dengyihao's avatar
dengyihao 已提交
150
  conn->secured = pHead->secured;
dengyihao's avatar
dengyihao 已提交
151

dengyihao's avatar
dengyihao 已提交
152
  // buf's mem alread translated to rpcMsg.pCont
dengyihao's avatar
dengyihao 已提交
153 154
  transClearBuffer(&conn->readBuf);

dengyihao's avatar
dengyihao 已提交
155
  uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
dengyihao's avatar
dengyihao 已提交
156 157

  SCliThrdObj* pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
158
  // user owns conn->persist = 1
dengyihao's avatar
dengyihao 已提交
159
  if (conn->push == NULL) {
dengyihao's avatar
dengyihao 已提交
160
    addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
dengyihao's avatar
dengyihao 已提交
161

dengyihao's avatar
dengyihao 已提交
162 163 164
    destroyCmsg(conn->data);
    conn->data = NULL;
  }
dengyihao's avatar
dengyihao 已提交
165
  // start thread's timer of conn pool if not active
dengyihao's avatar
dengyihao 已提交
166 167
  if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) {
    uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
dengyihao's avatar
dengyihao 已提交
168
  }
dengyihao's avatar
dengyihao 已提交
169 170
}
static void clientHandleExcept(SCliConn* pConn) {
dengyihao's avatar
dengyihao 已提交
171
  if (pConn->data == NULL && pConn->push == NULL) {
dengyihao's avatar
dengyihao 已提交
172
    // handle conn except in conn pool
dengyihao's avatar
dengyihao 已提交
173 174 175
    clientConnDestroy(pConn, true);
    return;
  }
dengyihao's avatar
dengyihao 已提交
176
  tTrace("client conn %p start to destroy", pConn);
dengyihao's avatar
dengyihao 已提交
177
  SCliMsg* pMsg = pConn->data;
dengyihao's avatar
dengyihao 已提交
178

dengyihao's avatar
dengyihao 已提交
179 180
  STransConnCtx* pCtx = pMsg->ctx;

dengyihao's avatar
dengyihao 已提交
181
  SRpcMsg rpcMsg = {0};
dengyihao's avatar
dengyihao 已提交
182
  rpcMsg.ahandle = pCtx->ahandle;
dengyihao's avatar
dengyihao 已提交
183
  rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
dengyihao's avatar
dengyihao 已提交
184

dengyihao's avatar
dengyihao 已提交
185
  if (pConn->push != NULL && pConn->notifyCount != 0) {
dengyihao's avatar
dengyihao 已提交
186
    (*pConn->push->callback)(pConn->push->arg, &rpcMsg);
dengyihao's avatar
dengyihao 已提交
187
    pConn->push = NULL;
dengyihao's avatar
dengyihao 已提交
188
  } else {
dengyihao's avatar
dengyihao 已提交
189 190 191 192 193 194
    if (pCtx->pSem == NULL) {
      (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL);
    } else {
      memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
      tsem_post(pCtx->pSem);
    }
dengyihao's avatar
dengyihao 已提交
195 196 197
    if (pConn->push != NULL) {
      (*pConn->push->callback)(pConn->push->arg, &rpcMsg);
    }
dengyihao's avatar
dengyihao 已提交
198 199 200 201 202
    pConn->push = NULL;
  }
  if (pConn->push == NULL) {
    destroyCmsg(pConn->data);
    pConn->data = NULL;
dengyihao's avatar
dengyihao 已提交
203
  }
dengyihao's avatar
dengyihao 已提交
204
  // transDestroyConnCtx(pCtx);
dengyihao's avatar
dengyihao 已提交
205
  clientConnDestroy(pConn, true);
dengyihao's avatar
dengyihao 已提交
206
  pConn->notifyCount += 1;
dengyihao's avatar
dengyihao 已提交
207
}
dengyihao's avatar
dengyihao 已提交
208

dengyihao's avatar
dengyihao 已提交
209 210
static void clientTimeoutCb(uv_timer_t* handle) {
  SCliThrdObj* pThrd = handle->data;
dengyihao's avatar
dengyihao 已提交
211
  SRpcInfo*    pRpc = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
212
  int64_t      currentTime = pThrd->nextTimeout;
dengyihao's avatar
dengyihao 已提交
213
  tTrace("client conn timeout, try to remove expire conn from conn pool");
dengyihao's avatar
dengyihao 已提交
214

dengyihao's avatar
dengyihao 已提交
215
  SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
dengyihao's avatar
dengyihao 已提交
216 217 218 219 220 221
  while (p != NULL) {
    while (!QUEUE_IS_EMPTY(&p->conn)) {
      queue*    h = QUEUE_HEAD(&p->conn);
      SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
      if (c->expireTime < currentTime) {
        QUEUE_REMOVE(h);
222 223 224
        // uv_stream_t stm = *(c->stream);
        // uv_close((uv_handle_t*)&stm, clientDestroy);
        clientConnDestroy(c, true);
dengyihao's avatar
dengyihao 已提交
225 226 227 228
      } else {
        break;
      }
    }
dengyihao's avatar
dengyihao 已提交
229
    p = taosHashIterate((SHashObj*)pThrd->pool, p);
dengyihao's avatar
dengyihao 已提交
230 231
  }

dengyihao's avatar
dengyihao 已提交
232 233
  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
  uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
dengyihao's avatar
dengyihao 已提交
234
}
235 236 237
static void* creatConnPool(int size) {
  // thread local, no lock
  return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
238
}
239
static void* destroyConnPool(void* pool) {
dengyihao's avatar
dengyihao 已提交
240
  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
dengyihao's avatar
dengyihao 已提交
241 242 243 244 245
  while (connList != NULL) {
    while (!QUEUE_IS_EMPTY(&connList->conn)) {
      queue* h = QUEUE_HEAD(&connList->conn);
      QUEUE_REMOVE(h);
      SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
246
      clientConnDestroy(c, true);
dengyihao's avatar
dengyihao 已提交
247
    }
dengyihao's avatar
dengyihao 已提交
248
    connList = taosHashIterate((SHashObj*)pool, connList);
dengyihao's avatar
dengyihao 已提交
249
  }
dengyihao's avatar
dengyihao 已提交
250
  taosHashClear(pool);
dengyihao's avatar
dengyihao 已提交
251 252
}

dengyihao's avatar
dengyihao 已提交
253
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
dengyihao's avatar
dengyihao 已提交
254 255 256 257
  char key[128] = {0};
  tstrncpy(key, ip, strlen(ip));
  tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));

dengyihao's avatar
dengyihao 已提交
258 259
  SHashObj*  pPool = pool;
  SConnList* plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
260 261
  if (plist == NULL) {
    SConnList list;
dengyihao's avatar
dengyihao 已提交
262 263
    taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
    plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
264
    QUEUE_INIT(&plist->conn);
dengyihao's avatar
dengyihao 已提交
265 266 267 268 269 270 271
  }

  if (QUEUE_IS_EMPTY(&plist->conn)) {
    return NULL;
  }
  queue* h = QUEUE_HEAD(&plist->conn);
  QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
272 273 274 275

  SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
  QUEUE_INIT(&conn->conn);
  return conn;
dengyihao's avatar
dengyihao 已提交
276
}
dengyihao's avatar
dengyihao 已提交
277
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
278
  char key[128] = {0};
dengyihao's avatar
dengyihao 已提交
279

dengyihao's avatar
dengyihao 已提交
280 281
  tstrncpy(key, ip, strlen(ip));
  tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
dengyihao's avatar
dengyihao 已提交
282
  tTrace("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
dengyihao's avatar
dengyihao 已提交
283

dengyihao's avatar
dengyihao 已提交
284
  SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
dengyihao's avatar
dengyihao 已提交
285

dengyihao's avatar
dengyihao 已提交
286
  conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
dengyihao's avatar
dengyihao 已提交
287
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
288
  conn->notifyCount = 0;
dengyihao's avatar
dengyihao 已提交
289 290 291 292
  // list already create before
  assert(plist != NULL);
  QUEUE_PUSH(&plist->conn, &conn->conn);
}
dengyihao's avatar
dengyihao 已提交
293 294 295 296 297 298 299 300 301
static bool clientReadComplete(SConnBuffer* data) {
  STransMsgHead head;
  int32_t       headLen = sizeof(head);
  if (data->len >= headLen) {
    memcpy((char*)&head, data->buf, headLen);
    int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
    if (msgLen > data->len) {
      data->left = msgLen - data->len;
      return false;
dengyihao's avatar
dengyihao 已提交
302 303
    } else if (msgLen == data->len) {
      data->left = 0;
dengyihao's avatar
dengyihao 已提交
304 305 306 307 308 309
      return true;
    }
  } else {
    return false;
  }
}
dengyihao's avatar
dengyihao 已提交
310
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
311 312
  SCliConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
313
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
314 315
}
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
316
  // impl later
dengyihao's avatar
dengyihao 已提交
317 318
  SCliConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
319
  if (nread > 0) {
dengyihao's avatar
dengyihao 已提交
320 321
    pBuf->len += nread;
    if (clientReadComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
322
      uv_read_stop((uv_stream_t*)conn->stream);
dengyihao's avatar
dengyihao 已提交
323
      tTrace("client conn %p read complete", conn);
324
      clientHandleResp(conn);
dengyihao's avatar
dengyihao 已提交
325
    } else {
dengyihao's avatar
dengyihao 已提交
326
      tTrace("client conn %p read partial packet, continue to read", conn);
dengyihao's avatar
dengyihao 已提交
327
    }
dengyihao's avatar
dengyihao 已提交
328 329
    return;
  }
330 331
  assert(nread <= 0);
  if (nread == 0) {
dengyihao's avatar
dengyihao 已提交
332 333 334
    // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
    // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
    // read(2).
335 336
    return;
  }
dengyihao's avatar
dengyihao 已提交
337
  if (nread < 0 || nread == UV_EOF) {
dengyihao's avatar
dengyihao 已提交
338
    tError("client conn %p read error: %s", conn, uv_err_name(nread));
dengyihao's avatar
dengyihao 已提交
339
    clientHandleExcept(conn);
dengyihao's avatar
dengyihao 已提交
340
  }
341 342
  // tDebug("Read error %s\n", uv_err_name(nread));
  // uv_close((uv_handle_t*)handle, clientDestroy);
dengyihao's avatar
dengyihao 已提交
343
}
dengyihao's avatar
dengyihao 已提交
344

345
static void clientConnDestroy(SCliConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
346
  //
dengyihao's avatar
dengyihao 已提交
347 348
  conn->ref--;
  if (conn->ref == 0) {
dengyihao's avatar
dengyihao 已提交
349
    tTrace("client conn %p remove from conn pool", conn);
dengyihao's avatar
dengyihao 已提交
350 351 352 353
    QUEUE_REMOVE(&conn->conn);
    if (clear) {
      uv_close((uv_handle_t*)conn->stream, clientDestroy);
    }
354
  }
dengyihao's avatar
dengyihao 已提交
355
}
dengyihao's avatar
dengyihao 已提交
356 357
static void clientDestroy(uv_handle_t* handle) {
  SCliConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
358
  // transDestroyBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
359 360 361

  free(conn->stream);
  free(conn->writeReq);
dengyihao's avatar
dengyihao 已提交
362
  tTrace("client conn %p destroy successfully", conn);
dengyihao's avatar
dengyihao 已提交
363 364 365
  free(conn);

  // clientConnDestroy(conn, false);
dengyihao's avatar
dengyihao 已提交
366 367 368 369 370
}

static void clientWriteCb(uv_write_t* req, int status) {
  SCliConn* pConn = req->data;
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
371
    tTrace("client conn %p data already was written out", pConn);
dengyihao's avatar
dengyihao 已提交
372
    SCliMsg* pMsg = pConn->data;
dengyihao's avatar
dengyihao 已提交
373
    if (pMsg == NULL) {
dengyihao's avatar
dengyihao 已提交
374 375
      // handle
      return;
dengyihao's avatar
dengyihao 已提交
376
    }
dengyihao's avatar
dengyihao 已提交
377
    destroyUserdata(&pMsg->msg);
dengyihao's avatar
dengyihao 已提交
378
  } else {
dengyihao's avatar
dengyihao 已提交
379
    tError("client conn %p failed to write: %s", pConn, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
380
    clientHandleExcept(pConn);
dengyihao's avatar
dengyihao 已提交
381 382
    return;
  }
dengyihao's avatar
dengyihao 已提交
383
  SCliThrdObj* pThrd = pConn->hostThrd;
dengyihao's avatar
dengyihao 已提交
384
  uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb);
dengyihao's avatar
dengyihao 已提交
385
}
dengyihao's avatar
dengyihao 已提交
386 387

static void clientWrite(SCliConn* pConn) {
dengyihao's avatar
dengyihao 已提交
388
  SCliMsg*       pCliMsg = pConn->data;
dengyihao's avatar
dengyihao 已提交
389
  STransConnCtx* pCtx = pCliMsg->ctx;
dengyihao's avatar
dengyihao 已提交
390
  SRpcInfo*      pTransInst = pCtx->pTransInst;
dengyihao's avatar
dengyihao 已提交
391 392 393

  SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg);

dengyihao's avatar
dengyihao 已提交
394
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
dengyihao's avatar
dengyihao 已提交
395 396 397 398 399 400 401
  int            msgLen = transMsgLenFromCont(pMsg->contLen);

  if (!pConn->secured) {
    char* buf = calloc(1, msgLen + sizeof(STransUserMsg));
    memcpy(buf, (char*)pHead, msgLen);

    STransUserMsg* uMsg = (STransUserMsg*)(buf + msgLen);
dengyihao's avatar
dengyihao 已提交
402 403
    memcpy(uMsg->user, pTransInst->user, tListLen(uMsg->user));
    memcpy(uMsg->secret, pTransInst->secret, tListLen(uMsg->secret));
dengyihao's avatar
dengyihao 已提交
404

dengyihao's avatar
dengyihao 已提交
405 406 407 408 409 410 411
    // to avoid mem leak
    destroyUserdata(pMsg);

    pMsg->pCont = (char*)buf + sizeof(STransMsgHead);
    pMsg->contLen = msgLen + sizeof(STransUserMsg) - sizeof(STransMsgHead);

    pHead = (STransMsgHead*)buf;
dengyihao's avatar
dengyihao 已提交
412
    pHead->secured = 1;
dengyihao's avatar
dengyihao 已提交
413 414
    msgLen += sizeof(STransUserMsg);
  }
dengyihao's avatar
dengyihao 已提交
415

dengyihao's avatar
dengyihao 已提交
416 417 418 419
  pHead->msgType = pMsg->msgType;
  pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);

  uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
dengyihao's avatar
dengyihao 已提交
420 421
  tDebug("client conn %p %s is send to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr),
         ntohs(pConn->addr.sin_port));
dengyihao's avatar
dengyihao 已提交
422 423
  uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
}
dengyihao's avatar
dengyihao 已提交
424 425
static void clientConnCb(uv_connect_t* req, int status) {
  // impl later
dengyihao's avatar
dengyihao 已提交
426 427
  SCliConn* pConn = req->data;
  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
428
    // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
dengyihao's avatar
dengyihao 已提交
429
    tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status));
dengyihao's avatar
dengyihao 已提交
430
    clientHandleExcept(pConn);
431
    return;
dengyihao's avatar
dengyihao 已提交
432
  }
dengyihao's avatar
dengyihao 已提交
433 434 435 436
  int addrlen = sizeof(pConn->addr);
  uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen);

  tTrace("client conn %p create", pConn);
dengyihao's avatar
dengyihao 已提交
437

dengyihao's avatar
dengyihao 已提交
438
  assert(pConn->stream == req->handle);
dengyihao's avatar
dengyihao 已提交
439
  clientWrite(pConn);
dengyihao's avatar
dengyihao 已提交
440 441
}

dengyihao's avatar
dengyihao 已提交
442
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
dengyihao's avatar
dengyihao 已提交
443
  tDebug("client work thread %p start to quit", pThrd);
dengyihao's avatar
dengyihao 已提交
444
  destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
445
  // transDestroyAsyncPool(pThr) uv_close((uv_handle_t*)pThrd->cliAsync, NULL);
dengyihao's avatar
dengyihao 已提交
446 447 448 449 450
  uv_timer_stop(pThrd->timer);
  pThrd->quit = true;
  // uv__async_stop(pThrd->cliAsync);
  uv_stop(pThrd->loop);
}
dengyihao's avatar
dengyihao 已提交
451
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
dengyihao's avatar
dengyihao 已提交
452 453
  uint64_t et = taosGetTimestampUs();
  uint64_t el = et - pMsg->st;
dengyihao's avatar
dengyihao 已提交
454
  tTrace("client msg tran time cost: %" PRIu64 "", el);
dengyihao's avatar
dengyihao 已提交
455
  et = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
456

dengyihao's avatar
dengyihao 已提交
457
  STransConnCtx* pCtx = pMsg->ctx;
dengyihao's avatar
dengyihao 已提交
458
  SCliConn*      conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
dengyihao's avatar
dengyihao 已提交
459
  if (conn != NULL) {
dengyihao's avatar
dengyihao 已提交
460
    // impl later
dengyihao's avatar
dengyihao 已提交
461
    tTrace("client get conn %p from pool", conn);
dengyihao's avatar
dengyihao 已提交
462 463
    conn->data = pMsg;
    conn->writeReq->data = conn;
dengyihao's avatar
dengyihao 已提交
464
    transDestroyBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
465 466 467 468 469

    if (pThrd->quit) {
      clientHandleExcept(conn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
470
    clientWrite(conn);
dengyihao's avatar
dengyihao 已提交
471

dengyihao's avatar
dengyihao 已提交
472 473
    conn->push = pMsg->msg.push;

dengyihao's avatar
dengyihao 已提交
474
  } else {
dengyihao's avatar
dengyihao 已提交
475
    SCliConn* conn = calloc(1, sizeof(SCliConn));
dengyihao's avatar
dengyihao 已提交
476
    conn->ref++;
dengyihao's avatar
dengyihao 已提交
477
    // read/write stream handle
dengyihao's avatar
dengyihao 已提交
478 479
    conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
    uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
dengyihao's avatar
dengyihao 已提交
480 481 482
    conn->stream->data = conn;

    // write req handle
dengyihao's avatar
dengyihao 已提交
483
    conn->writeReq = malloc(sizeof(uv_write_t));
dengyihao's avatar
dengyihao 已提交
484
    conn->writeReq->data = conn;
dengyihao's avatar
dengyihao 已提交
485

dengyihao's avatar
dengyihao 已提交
486
    QUEUE_INIT(&conn->conn);
dengyihao's avatar
dengyihao 已提交
487 488 489

    conn->connReq.data = conn;
    conn->data = pMsg;
dengyihao's avatar
dengyihao 已提交
490
    conn->hostThrd = pThrd;
dengyihao's avatar
dengyihao 已提交
491

dengyihao's avatar
dengyihao 已提交
492 493
    conn->push = pMsg->msg.push;

dengyihao's avatar
dengyihao 已提交
494
    struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
495
    uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
dengyihao's avatar
dengyihao 已提交
496
    // handle error in callback if fail to connect
dengyihao's avatar
dengyihao 已提交
497
    uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
dengyihao's avatar
dengyihao 已提交
498 499 500
  }
}
static void clientAsyncCb(uv_async_t* handle) {
dengyihao's avatar
dengyihao 已提交
501 502
  SAsyncItem*  item = handle->data;
  SCliThrdObj* pThrd = item->pThrd;
dengyihao's avatar
dengyihao 已提交
503 504
  SCliMsg*     pMsg = NULL;
  queue        wq;
dengyihao's avatar
dengyihao 已提交
505

dengyihao's avatar
dengyihao 已提交
506
  // batch process to avoid to lock/unlock frequently
dengyihao's avatar
dengyihao 已提交
507 508 509
  pthread_mutex_lock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  pthread_mutex_unlock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
510

dengyihao's avatar
dengyihao 已提交
511 512 513 514
  int count = 0;
  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* h = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
515 516

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
517 518 519 520 521 522
    if (pMsg->ctx == NULL) {
      clientHandleQuit(pMsg, pThrd);
    } else {
      clientHandleReq(pMsg, pThrd);
    }
    // clientHandleReq(pMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
523
    count++;
dengyihao's avatar
dengyihao 已提交
524 525
  }
  if (count >= 2) {
dengyihao's avatar
dengyihao 已提交
526
    tTrace("client process batch size: %d", count);
dengyihao's avatar
dengyihao 已提交
527
  }
dengyihao's avatar
dengyihao 已提交
528 529 530 531
}

static void* clientThread(void* arg) {
  SCliThrdObj* pThrd = (SCliThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
532
  setThreadName("trans-client-work");
dengyihao's avatar
dengyihao 已提交
533 534 535 536 537
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
}

void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
  SClientObj* cli = calloc(1, sizeof(SClientObj));
dengyihao's avatar
dengyihao 已提交
538

dengyihao's avatar
dengyihao 已提交
539
  SRpcInfo* pRpc = shandle;
dengyihao's avatar
dengyihao 已提交
540 541 542 543 544
  memcpy(cli->label, label, strlen(label));
  cli->numOfThreads = numOfThreads;
  cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));

  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
545
    SCliThrdObj* pThrd = createThrdObj();
dengyihao's avatar
dengyihao 已提交
546
    pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
dengyihao's avatar
dengyihao 已提交
547
    pThrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
548

dengyihao's avatar
dengyihao 已提交
549
    int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
dengyihao's avatar
dengyihao 已提交
550
    if (err == 0) {
dengyihao's avatar
dengyihao 已提交
551
      tDebug("success to create tranport-client thread %d", i);
dengyihao's avatar
dengyihao 已提交
552
    }
dengyihao's avatar
dengyihao 已提交
553
    cli->pThreadObj[i] = pThrd;
dengyihao's avatar
dengyihao 已提交
554 555 556
  }
  return cli;
}
dengyihao's avatar
dengyihao 已提交
557 558 559 560 561 562 563 564 565 566 567 568 569 570

static void destroyUserdata(SRpcMsg* userdata) {
  if (userdata->pCont == NULL) {
    return;
  }
  transFreeMsg(userdata->pCont);
  userdata->pCont = NULL;
}
static void destroyCmsg(SCliMsg* pMsg) {
  if (pMsg == NULL) {
    return;
  }
  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
dengyihao's avatar
dengyihao 已提交
571 572
  free(pMsg);
}
dengyihao's avatar
dengyihao 已提交
573

dengyihao's avatar
dengyihao 已提交
574 575 576 577 578 579 580 581
static SCliThrdObj* createThrdObj() {
  SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
  QUEUE_INIT(&pThrd->msg);
  pthread_mutex_init(&pThrd->msgMtx, NULL);

  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  uv_loop_init(pThrd->loop);

dengyihao's avatar
dengyihao 已提交
582
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, clientAsyncCb);
dengyihao's avatar
dengyihao 已提交
583

dengyihao's avatar
dengyihao 已提交
584 585 586
  pThrd->timer = malloc(sizeof(uv_timer_t));
  uv_timer_init(pThrd->loop, pThrd->timer);
  pThrd->timer->data = pThrd;
dengyihao's avatar
dengyihao 已提交
587

dengyihao's avatar
dengyihao 已提交
588
  pThrd->pool = creatConnPool(4);
dengyihao's avatar
dengyihao 已提交
589 590

  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
591 592
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
593
static void destroyThrdObj(SCliThrdObj* pThrd) {
dengyihao's avatar
dengyihao 已提交
594 595 596
  if (pThrd == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
597
  uv_stop(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
598 599
  pthread_join(pThrd->thread, NULL);
  pthread_mutex_destroy(&pThrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
600 601
  transDestroyAsyncPool(pThrd->asyncPool);
  // free(pThrd->cliAsync);
dengyihao's avatar
dengyihao 已提交
602
  free(pThrd->timer);
dengyihao's avatar
dengyihao 已提交
603 604 605
  free(pThrd->loop);
  free(pThrd);
}
dengyihao's avatar
dengyihao 已提交
606

dengyihao's avatar
dengyihao 已提交
607
static void transDestroyConnCtx(STransConnCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
608 609 610 611 612
  if (ctx != NULL) {
    free(ctx->ip);
  }
  free(ctx);
}
dengyihao's avatar
dengyihao 已提交
613
//
dengyihao's avatar
dengyihao 已提交
614 615 616 617 618
static void clientSendQuit(SCliThrdObj* thrd) {
  // cli can stop gracefully
  SCliMsg* msg = calloc(1, sizeof(SCliMsg));
  msg->ctx = NULL;  //

dengyihao's avatar
dengyihao 已提交
619 620 621
  // pthread_mutex_lock(&thrd->msgMtx);
  // QUEUE_PUSH(&thrd->msg, &msg->q);
  // pthread_mutex_unlock(&thrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
622

dengyihao's avatar
dengyihao 已提交
623
  transSendAsync(thrd->asyncPool, &msg->q);
dengyihao's avatar
dengyihao 已提交
624
  // uv_async_send(thrd->cliAsync);
dengyihao's avatar
dengyihao 已提交
625
}
dengyihao's avatar
dengyihao 已提交
626 627 628
void taosCloseClient(void* arg) {
  // impl later
  SClientObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
629
  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
630
    clientSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
631
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
632 633 634
  }
  free(cli->pThreadObj);
  free(cli);
dengyihao's avatar
dengyihao 已提交
635
}
dengyihao's avatar
dengyihao 已提交
636 637
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
  // impl later
dengyihao's avatar
dengyihao 已提交
638 639
  char*    ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
  uint32_t port = pEpSet->eps[pEpSet->inUse].port;
dengyihao's avatar
dengyihao 已提交
640

dengyihao's avatar
dengyihao 已提交
641 642
  SRpcInfo* pRpc = (SRpcInfo*)shandle;

dengyihao's avatar
dengyihao 已提交
643 644 645 646
  int32_t flen = 0;
  if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
    // imp later
  }
dengyihao's avatar
dengyihao 已提交
647

dengyihao's avatar
dengyihao 已提交
648 649
  STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));

dengyihao's avatar
dengyihao 已提交
650
  pCtx->pTransInst = (SRpcInfo*)shandle;
dengyihao's avatar
dengyihao 已提交
651 652 653 654
  pCtx->ahandle = pMsg->ahandle;
  pCtx->msgType = pMsg->msgType;
  pCtx->ip = strdup(ip);
  pCtx->port = port;
dengyihao's avatar
dengyihao 已提交
655 656 657 658 659 660 661

  assert(pRpc->connType == TAOS_CONN_CLIENT);
  // atomic or not
  int64_t index = pRpc->index;
  if (pRpc->index++ >= pRpc->numOfThreads) {
    pRpc->index = 0;
  }
dengyihao's avatar
dengyihao 已提交
662 663 664 665
  SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pMsg;
  cliMsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
666 667 668

  SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];

dengyihao's avatar
dengyihao 已提交
669 670 671
  // pthread_mutex_lock(&thrd->msgMtx);
  // QUEUE_PUSH(&thrd->msg, &cliMsg->q);
  // pthread_mutex_unlock(&thrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
672

dengyihao's avatar
dengyihao 已提交
673 674
  // int start = taosGetTimestampUs();
  transSendAsync(thrd->asyncPool, &(cliMsg->q));
dengyihao's avatar
dengyihao 已提交
675
  // uv_async_send(thrd->cliAsync);
dengyihao's avatar
dengyihao 已提交
676
  // int end = taosGetTimestampUs() - start;
dengyihao's avatar
dengyihao 已提交
677
  // tError("client sent to rpc, time cost: %d", (int)end);
dengyihao's avatar
dengyihao 已提交
678
}
dengyihao's avatar
dengyihao 已提交
679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
  char*    ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
  uint32_t port = pEpSet->eps[pEpSet->inUse].port;

  SRpcInfo* pRpc = (SRpcInfo*)shandle;

  STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
  pCtx->pTransInst = (SRpcInfo*)shandle;
  pCtx->ahandle = pReq->ahandle;
  pCtx->msgType = pReq->msgType;
  pCtx->ip = strdup(ip);
  pCtx->port = port;
  pCtx->pSem = calloc(1, sizeof(tsem_t));
  pCtx->pRsp = pRsp;
  tsem_init(pCtx->pSem, 0, 0);

  int64_t index = pRpc->index;
  if (pRpc->index++ >= pRpc->numOfThreads) {
    pRpc->index = 0;
  }
  SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();

  SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];

  // pthread_mutex_lock(&thrd->msgMtx);
  // QUEUE_PUSH(&thrd->msg, &cliMsg->q);
  // pthread_mutex_unlock(&thrd->msgMtx);

  // int start = taosGetTimestampUs();
  transSendAsync(thrd->asyncPool, &(cliMsg->q));

  tsem_t* pSem = pCtx->pSem;
  tsem_wait(pSem);
  tsem_destroy(pSem);
  free(pSem);

  return;
}
dengyihao's avatar
dengyihao 已提交
720
#endif