transCli.c 20.1 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

#include "transComm.h"

20
// Scales an idle-time setting by 1000 * 10; the result is added to taosGetTimestampMs() values
// and used as a libuv timer timeout elsewhere in this file.
// The argument is parenthesized so that expression arguments (e.g. a + b) are scaled as a whole.
#define CONN_PERSIST_TIME(para) ((para) * 1000 * 10)
dengyihao's avatar
dengyihao 已提交
21

dengyihao's avatar
dengyihao 已提交
22 23 24
typedef struct SCliConn {
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
25
  uv_write_t*  writeReq;
dengyihao's avatar
dengyihao 已提交
26
  void*        hostThrd;
dengyihao's avatar
dengyihao 已提交
27
  SConnBuffer  readBuf;
dengyihao's avatar
dengyihao 已提交
28 29
  void*        data;
  queue        conn;
dengyihao's avatar
dengyihao 已提交
30 31
  char         spi;
  char         secured;
dengyihao's avatar
dengyihao 已提交
32
  uint64_t     expireTime;
dengyihao's avatar
dengyihao 已提交
33
  int8_t       notifyCount;  // timers already notify to client
dengyihao's avatar
dengyihao 已提交
34
  int32_t      ref;
dengyihao's avatar
dengyihao 已提交
35 36

  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
37
} SCliConn;
dengyihao's avatar
dengyihao 已提交
38

dengyihao's avatar
dengyihao 已提交
39
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
40 41 42 43
  STransConnCtx* ctx;
  SRpcMsg        msg;
  queue          q;
  uint64_t       st;
dengyihao's avatar
dengyihao 已提交
44 45 46
} SCliMsg;

typedef struct SCliThrdObj {
dengyihao's avatar
dengyihao 已提交
47 48 49 50
  pthread_t  thread;
  uv_loop_t* loop;
  // uv_async_t*     cliAsync;  //
  SAsyncPool*     asyncPool;
dengyihao's avatar
dengyihao 已提交
51
  uv_timer_t*     timer;
dengyihao's avatar
dengyihao 已提交
52
  void*           pool;  // conn pool
dengyihao's avatar
dengyihao 已提交
53 54
  queue           msg;
  pthread_mutex_t msgMtx;
dengyihao's avatar
dengyihao 已提交
55
  uint64_t        nextTimeout;  // next timeout
dengyihao's avatar
dengyihao 已提交
56
  void*           pTransInst;   //
dengyihao's avatar
dengyihao 已提交
57
  bool            quit;
dengyihao's avatar
dengyihao 已提交
58 59 60 61 62 63 64 65 66
} SCliThrdObj;

// Client transport object returned by taosInitClient(): a label plus its worker threads.
typedef struct SClientObj {
  char          label[TSDB_LABEL_LEN];  // copied from the label passed to taosInitClient()
  int32_t       index;                  // NOTE(review): not read or written in this file - confirm use
  int           numOfThreads;           // number of entries in pThreadObj
  SCliThrdObj** pThreadObj;             // array of worker thread objects
} SClientObj;

dengyihao's avatar
dengyihao 已提交
67 68 69 70
// Value stored in the connection-pool hash: the head of the list of idle
// connections (linked through SCliConn.conn) for one endpoint key.
typedef struct SConnList {
  queue conn;
} SConnList;

dengyihao's avatar
dengyihao 已提交
71
// conn pool
dengyihao's avatar
dengyihao 已提交
72
// add expire timeout and capacity limit
73 74
static void*     creatConnPool(int size);
static void*     destroyConnPool(void* pool);
dengyihao's avatar
dengyihao 已提交
75 76
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void      addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
77

dengyihao's avatar
dengyihao 已提交
78 79
// register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
80 81 82
// check whether already read complete packet from server
static bool clientReadComplete(SConnBuffer* pBuf);
// alloc buf for read
dengyihao's avatar
dengyihao 已提交
83
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
84
// callback after read nbytes from socket
dengyihao's avatar
dengyihao 已提交
85
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
86
// callback after write data to socket
dengyihao's avatar
dengyihao 已提交
87
static void clientWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
88
// callback after conn  to server
dengyihao's avatar
dengyihao 已提交
89
static void clientConnCb(uv_connect_t* req, int status);
dengyihao's avatar
dengyihao 已提交
90
static void clientAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
91
static void clientDestroy(uv_handle_t* handle);
92
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
dengyihao's avatar
dengyihao 已提交
93

dengyihao's avatar
dengyihao 已提交
94 95 96 97
// process data read from server, auth/decompress etc later
static void clientHandleResp(SCliConn* conn);
// handle except about conn
static void clientHandleExcept(SCliConn* conn);
98 99
// handle req from app
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
dengyihao's avatar
dengyihao 已提交
100 101
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientSendQuit(SCliThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
102

dengyihao's avatar
dengyihao 已提交
103 104 105 106
static void destroyUserdata(SRpcMsg* userdata);

static void destroyCmsg(SCliMsg* cmsg);
static void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
107 108 109 110
// thread obj
static SCliThrdObj* createThrdObj();
static void         destroyThrdObj(SCliThrdObj* pThrd);
// thread
dengyihao's avatar
dengyihao 已提交
111 112
static void* clientThread(void* arg);

113
// Delivers a completely received response to the application and recycles the connection.
//
// The response is handed over either through the registered callback (no semaphore in the
// context) or by copying it into the waiter's SRpcMsg and posting the semaphore. Afterwards
// the read buffer is reset, reading is restarted, the connection goes back to the pool, the
// request message is destroyed, and the pool timer is armed if needed.
static void clientHandleResp(SCliConn* conn) {
  SCliMsg*       pMsg = conn->data;
  STransConnCtx* pCtx = pMsg->ctx;
  SRpcInfo*      pRpc = pCtx->pTransInst;

  // header fields are byte-swapped in place (htonl and ntohl perform the same conversion)
  STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
  pHead->code = htonl(pHead->code);
  pHead->msgLen = htonl(pHead->msgLen);

  // zero-initialize so that fields not set below are not handed to the app as garbage
  SRpcMsg rpcMsg = {0};
  rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
  rpcMsg.pCont = transContFromHead((char*)pHead);
  rpcMsg.code = pHead->code;
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.ahandle = pCtx->ahandle;

  tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr),
         ntohs(conn->addr.sin_port));
  if (pCtx->pSem == NULL) {
    // no waiter: deliver through the callback
    tTrace("client conn(sync) %p handle resp", conn);
    (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
  } else {
    // a caller is blocked in rpcSendRecv: fill its response and wake it up
    tTrace("client conn(sync) %p handle resp", conn);
    memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
    tsem_post(pCtx->pSem);
  }
  conn->notifyCount += 1;

  // buf's mem already transferred to rpcMsg.pCont
  transClearBuffer(&conn->readBuf);

  uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);

  SCliThrdObj* pThrd = conn->hostThrd;
  addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);

  // pCtx is owned by pMsg and must not be used after this point
  destroyCmsg(pMsg);
  conn->data = NULL;
  // start thread's timer of conn pool if not active
  if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) {
    uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
  }
}
// Handles a failure on a connection (connect/read/write error).
//
// An idle pooled connection (no message attached) is simply destroyed. Otherwise the pending
// request is answered with TSDB_CODE_RPC_NETWORK_UNAVAIL, via the callback or by waking the
// blocked caller, the message is freed, and the connection is destroyed.
static void clientHandleExcept(SCliConn* pConn) {
  if (pConn->data == NULL) {
    // handle conn except in conn pool
    clientConnDestroy(pConn, true);
    return;
  }
  tTrace("client conn %p start to destroy", pConn);
  SCliMsg* pMsg = pConn->data;

  destroyUserdata(&pMsg->msg);

  STransConnCtx* pCtx = pMsg->ctx;

  SRpcMsg rpcMsg = {0};
  rpcMsg.ahandle = pCtx->ahandle;
  rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  if (pCtx->pSem == NULL) {
    (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL);
  } else {
    memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
    tsem_post(pCtx->pSem);
  }

  // destroyCmsg also releases pCtx
  destroyCmsg(pMsg);
  pConn->data = NULL;

  // update the counter before the connection is handed over for destruction;
  // pConn must not be touched after clientConnDestroy
  pConn->notifyCount += 1;
  clientConnDestroy(pConn, true);
}
dengyihao's avatar
dengyihao 已提交
187

dengyihao's avatar
dengyihao 已提交
188 189
// Pool timer callback: destroys pooled connections whose expireTime is earlier than the
// thread's current nextTimeout, then advances nextTimeout and re-arms the (one-shot) timer.
static void clientTimeoutCb(uv_timer_t* handle) {
  SCliThrdObj* pThrd = handle->data;
  SRpcInfo*    pRpc = pThrd->pTransInst;
  // same unsigned type as SCliConn.expireTime, to avoid a mixed-sign comparison
  uint64_t     deadline = pThrd->nextTimeout;
  tTrace("client conn timeout, try to remove expire conn from conn pool");

  SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
  while (p != NULL) {
    while (!QUEUE_IS_EMPTY(&p->conn)) {
      queue*    h = QUEUE_HEAD(&p->conn);
      SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
      if (c->expireTime >= deadline) {
        // stop scanning this list at the first connection that has not expired
        break;
      }
      QUEUE_REMOVE(h);
      // leave the node self-linked: clientConnDestroy removes it from its queue again
      QUEUE_INIT(h);
      clientConnDestroy(c, true);
    }
    p = taosHashIterate((SHashObj*)pThrd->pool, p);
  }

  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
  uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
214 215 216
static void* creatConnPool(int size) {
  // thread local, no lock
  return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
217
}
218
static void* destroyConnPool(void* pool) {
dengyihao's avatar
dengyihao 已提交
219
  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
dengyihao's avatar
dengyihao 已提交
220 221 222 223 224
  while (connList != NULL) {
    while (!QUEUE_IS_EMPTY(&connList->conn)) {
      queue* h = QUEUE_HEAD(&connList->conn);
      QUEUE_REMOVE(h);
      SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
225
      clientConnDestroy(c, true);
dengyihao's avatar
dengyihao 已提交
226
    }
dengyihao's avatar
dengyihao 已提交
227
    connList = taosHashIterate((SHashObj*)pool, connList);
dengyihao's avatar
dengyihao 已提交
228
  }
dengyihao's avatar
dengyihao 已提交
229
  taosHashClear(pool);
dengyihao's avatar
dengyihao 已提交
230 231
}

dengyihao's avatar
dengyihao 已提交
232
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
dengyihao's avatar
dengyihao 已提交
233 234 235 236
  char key[128] = {0};
  tstrncpy(key, ip, strlen(ip));
  tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));

dengyihao's avatar
dengyihao 已提交
237 238
  SHashObj*  pPool = pool;
  SConnList* plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
239 240
  if (plist == NULL) {
    SConnList list;
dengyihao's avatar
dengyihao 已提交
241 242
    taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
    plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
243
    QUEUE_INIT(&plist->conn);
dengyihao's avatar
dengyihao 已提交
244 245 246 247 248 249 250
  }

  if (QUEUE_IS_EMPTY(&plist->conn)) {
    return NULL;
  }
  queue* h = QUEUE_HEAD(&plist->conn);
  QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
251 252 253 254

  SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
  QUEUE_INIT(&conn->conn);
  return conn;
dengyihao's avatar
dengyihao 已提交
255
}
dengyihao's avatar
dengyihao 已提交
256
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
257
  char key[128] = {0};
dengyihao's avatar
dengyihao 已提交
258

dengyihao's avatar
dengyihao 已提交
259 260
  tstrncpy(key, ip, strlen(ip));
  tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
dengyihao's avatar
dengyihao 已提交
261
  tTrace("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
dengyihao's avatar
dengyihao 已提交
262

dengyihao's avatar
dengyihao 已提交
263
  SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
dengyihao's avatar
dengyihao 已提交
264

dengyihao's avatar
dengyihao 已提交
265
  conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
dengyihao's avatar
dengyihao 已提交
266
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
267
  conn->notifyCount = 0;
dengyihao's avatar
dengyihao 已提交
268 269 270 271
  // list already create before
  assert(plist != NULL);
  QUEUE_PUSH(&plist->conn, &conn->conn);
}
dengyihao's avatar
dengyihao 已提交
272 273 274 275 276 277 278 279 280
// Checks whether the read buffer holds exactly one complete message.
//
// Once at least a full header is buffered, the message length is taken from the header
// (byte-swapped with htonl). data->left is set to the number of missing bytes when the
// message is incomplete and to 0 when it is complete.
// Returns true only when the buffered length equals the message length.
static bool clientReadComplete(SConnBuffer* data) {
  STransMsgHead head;
  int32_t       headLen = sizeof(head);
  if (data->len < headLen) {
    // header not fully received yet
    return false;
  }

  memcpy((char*)&head, data->buf, headLen);
  int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
  if (msgLen > data->len) {
    data->left = msgLen - data->len;
    return false;
  }
  if (msgLen == data->len) {
    data->left = 0;
    return true;
  }
  // More bytes buffered than the header announces. The original code fell off the end of the
  // function here (undefined return value); report "not complete" explicitly instead.
  return false;
}
dengyihao's avatar
dengyihao 已提交
289
// libuv alloc callback: lets transAllocBuffer fill `buf` from the connection's read buffer.
// suggested_size is not used.
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SCliConn* pConn = handle->data;
  transAllocBuffer(&pConn->readBuf, buf);
}
// libuv read callback for a client connection.
//   nread > 0 : account for the bytes; when a full message is buffered, stop reading and
//               dispatch the response.
//   nread == 0: nothing to do (equivalent to EAGAIN/EWOULDBLOCK, see the libuv uv_read_cb docs).
//   nread < 0 : read error or EOF (UV_EOF is negative); treat the connection as failed.
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
  SCliConn* conn = handle->data;

  if (nread == 0) {
    return;
  }

  if (nread < 0) {
    tError("client conn %p read error: %s", conn, uv_err_name(nread));
    clientHandleExcept(conn);
    return;
  }

  SConnBuffer* pBuf = &conn->readBuf;
  pBuf->len += nread;
  if (!clientReadComplete(pBuf)) {
    tTrace("client conn %p read partial packet, continue to read", conn);
    return;
  }

  uv_read_stop((uv_stream_t*)conn->stream);
  tTrace("client conn %p read complete", conn);
  clientHandleResp(conn);
}
dengyihao's avatar
dengyihao 已提交
323

324
// Drops one reference to the connection. When the count reaches zero the connection is
// unlinked from its queue and, if `clear` is set, its tcp handle is closed; the memory is
// released later in the clientDestroy close callback.
static void clientConnDestroy(SCliConn* conn, bool clear) {
  conn->ref -= 1;
  if (conn->ref != 0) {
    return;
  }

  tTrace("client conn %p remove from conn pool", conn);
  QUEUE_REMOVE(&conn->conn);
  if (!clear) {
    return;
  }
  uv_close((uv_handle_t*)conn->stream, clientDestroy);
}
dengyihao's avatar
dengyihao 已提交
335 336
// libuv close callback: releases the tcp handle, the write request and the connection itself.
// NOTE(review): the read buffer is not released here - confirm it cannot still own memory.
static void clientDestroy(uv_handle_t* handle) {
  SCliConn* pConn = handle->data;

  free(pConn->stream);
  free(pConn->writeReq);
  tTrace("client conn %p destroy successfully", pConn);
  free(pConn);
}

// libuv write callback. On failure the connection is handed to clientHandleExcept. On success
// the request payload is released and reading of the response is started.
static void clientWriteCb(uv_write_t* req, int status) {
  SCliConn* pConn = req->data;
  if (status != 0) {
    tError("client conn %p failed to write: %s", pConn, uv_err_name(status));
    clientHandleExcept(pConn);
    return;
  }

  tTrace("client conn %p data already was written out", pConn);
  SCliMsg* pMsg = pConn->data;
  if (pMsg == NULL) {
    // no message attached to the connection any more; nothing to release or read
    return;
  }
  destroyUserdata(&pMsg->msg);

  uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb);
}
dengyihao's avatar
dengyihao 已提交
365 366

// Sends the request attached to the connection (pConn->data).
//
// Fills message type and length (byte-swapped with htonl) into the transport header that
// precedes the payload, then submits header + payload as a single write. If the write cannot
// even be queued, libuv never invokes clientWriteCb, so the failure is routed to
// clientHandleExcept here; otherwise the request would never be answered.
static void clientWrite(SCliConn* pConn) {
  SCliMsg*       pCliMsg = pConn->data;
  SRpcMsg*       pMsg = (SRpcMsg*)(&pCliMsg->msg);
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);

  int msgLen = transMsgLenFromCont(pMsg->contLen);

  pHead->msgType = pMsg->msgType;
  pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);

  uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
  tDebug("client conn %p %s is send to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr),
         ntohs(pConn->addr.sin_port));
  int ret = uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
  if (ret != 0) {
    tError("client conn %p failed to write: %s", pConn, uv_err_name(ret));
    clientHandleExcept(pConn);
  }
}
dengyihao's avatar
dengyihao 已提交
381 382
// libuv connect callback. On success the peer address is recorded and the pending request is
// written; on failure the connection is handed to clientHandleExcept.
static void clientConnCb(uv_connect_t* req, int status) {
  SCliConn* pConn = req->data;
  if (status == 0) {
    int addrlen = sizeof(pConn->addr);
    uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen);

    tTrace("client conn %p create", pConn);

    assert(pConn->stream == req->handle);
    clientWrite(pConn);
  } else {
    tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status));
    clientHandleExcept(pConn);
  }
}

dengyihao's avatar
dengyihao 已提交
399
// Handles a quit message on the worker thread: frees the message, stops the pool timer,
// marks the thread as quitting and stops its event loop.
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
  tDebug("client work thread %p start to quit", pThrd);
  destroyCmsg(pMsg);

  pThrd->quit = true;
  uv_timer_stop(pThrd->timer);
  uv_stop(pThrd->loop);
}
dengyihao's avatar
dengyihao 已提交
408
// Dispatches one application request on the worker thread.
//
// A pooled connection to the target endpoint is reused when available; otherwise a new
// connection is created and the request is sent from clientConnCb once it is established.
// Failures that libuv reports synchronously (bad address, connect that cannot be started) are
// routed to clientHandleExcept, because no callback would ever fire for them.
// NOTE(review): allocation failures are still not handled here.
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
  uint64_t el = taosGetTimestampUs() - pMsg->st;
  tTrace("client msg tran time cost: %" PRIu64 "", el);

  STransConnCtx* pCtx = pMsg->ctx;
  SCliConn*      conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
  if (conn != NULL) {
    tTrace("client get conn %p from pool", conn);
    conn->data = pMsg;
    conn->writeReq->data = conn;
    transDestroyBuffer(&conn->readBuf);

    if (pThrd->quit) {
      // thread is shutting down: fail the request instead of sending it
      clientHandleExcept(conn);
      return;
    }
    clientWrite(conn);
    return;
  }

  conn = calloc(1, sizeof(SCliConn));
  conn->ref++;
  // read/write stream handle
  conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
  conn->stream->data = conn;

  // write req handle
  conn->writeReq = malloc(sizeof(uv_write_t));
  conn->writeReq->data = conn;

  QUEUE_INIT(&conn->conn);

  conn->connReq.data = conn;
  conn->data = pMsg;
  conn->hostThrd = pThrd;

  struct sockaddr_in addr = {0};
  int                ret = uv_ip4_addr(pCtx->ip, pCtx->port, &addr);
  if (ret == 0) {
    // asynchronous connect errors are handled in clientConnCb
    ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
  }
  if (ret != 0) {
    tError("client conn %p failed to connect server: %s", conn, uv_strerror(ret));
    clientHandleExcept(conn);
  }
}
// Async wake-up callback on the worker thread: takes every queued message in one go and
// processes them in order. A message without a context is a quit request.
static void clientAsyncCb(uv_async_t* handle) {
  SAsyncItem*  item = handle->data;
  SCliThrdObj* pThrd = item->pThrd;
  queue        pending;

  // move the whole queue out under the lock so the lock is taken only once per batch
  pthread_mutex_lock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &pending);
  pthread_mutex_unlock(&item->mtx);

  int handled = 0;
  for (; !QUEUE_IS_EMPTY(&pending); handled++) {
    queue* node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);

    SCliMsg* cmsg = QUEUE_DATA(node, SCliMsg, q);
    if (cmsg->ctx != NULL) {
      clientHandleReq(cmsg, pThrd);
    } else {
      clientHandleQuit(cmsg, pThrd);
    }
  }
  if (handled >= 2) {
    tTrace("client process batch size: %d", handled);
  }
}

static void* clientThread(void* arg) {
  SCliThrdObj* pThrd = (SCliThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
485
  setThreadName("trans-client-work");
dengyihao's avatar
dengyihao 已提交
486 487 488 489 490
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
}

// Creates the client transport object and starts `numOfThreads` worker threads.
//   label   - name copied into the client object (truncated to fit the label field)
//   shandle - transport instance (SRpcInfo*) stored in every thread object
// ip, port and fp are not used here. Returns the new SClientObj as an opaque pointer.
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
  SClientObj* cli = calloc(1, sizeof(SClientObj));

  SRpcInfo* pRpc = shandle;

  // bounded copy: cli is zero-filled, so the label stays NUL-terminated
  size_t labelLen = strlen(label);
  if (labelLen >= sizeof(cli->label)) {
    labelLen = sizeof(cli->label) - 1;
  }
  memcpy(cli->label, label, labelLen);

  cli->numOfThreads = numOfThreads;
  cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));

  for (int i = 0; i < cli->numOfThreads; i++) {
    SCliThrdObj* pThrd = createThrdObj();
    pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
    pThrd->pTransInst = shandle;

    int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
    if (err == 0) {
      tDebug("success to create tranport-client thread %d", i);
    } else {
      // make the failure visible instead of silently continuing
      tError("failed to create tranport-client thread %d, error code: %d", i, err);
    }
    cli->pThreadObj[i] = pThrd;
  }
  return cli;
}
dengyihao's avatar
dengyihao 已提交
510 511 512 513 514 515 516 517 518 519 520 521 522 523

// Releases the payload of a message, if any, and clears the pointer so that a repeated call
// is a no-op.
static void destroyUserdata(SRpcMsg* userdata) {
  if (userdata->pCont != NULL) {
    transFreeMsg(userdata->pCont);
    userdata->pCont = NULL;
  }
}
// Frees a client message together with its context and payload. NULL is accepted.
static void destroyCmsg(SCliMsg* pMsg) {
  if (pMsg != NULL) {
    transDestroyConnCtx(pMsg->ctx);
    destroyUserdata(&pMsg->msg);
    free(pMsg);
  }
}
dengyihao's avatar
dengyihao 已提交
526

dengyihao's avatar
dengyihao 已提交
527 528 529 530 531 532 533 534
static SCliThrdObj* createThrdObj() {
  SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
  QUEUE_INIT(&pThrd->msg);
  pthread_mutex_init(&pThrd->msgMtx, NULL);

  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  uv_loop_init(pThrd->loop);

dengyihao's avatar
dengyihao 已提交
535
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, clientAsyncCb);
dengyihao's avatar
dengyihao 已提交
536

dengyihao's avatar
dengyihao 已提交
537 538 539
  pThrd->timer = malloc(sizeof(uv_timer_t));
  uv_timer_init(pThrd->loop, pThrd->timer);
  pThrd->timer->data = pThrd;
dengyihao's avatar
dengyihao 已提交
540

dengyihao's avatar
dengyihao 已提交
541
  pThrd->pool = creatConnPool(4);
dengyihao's avatar
dengyihao 已提交
542 543

  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
544 545
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
546
// Stops the worker's loop, waits for the thread to finish and frees the thread object.
// NULL is accepted.
// NOTE(review): the connection pool is not destroyed here, and uv_stop is called from the
// caller's thread - confirm both are intended.
static void destroyThrdObj(SCliThrdObj* pThrd) {
  if (pThrd != NULL) {
    uv_stop(pThrd->loop);
    pthread_join(pThrd->thread, NULL);
    pthread_mutex_destroy(&pThrd->msgMtx);
    transDestroyAsyncPool(pThrd->asyncPool);
    free(pThrd->timer);
    free(pThrd->loop);
    free(pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
559

dengyihao's avatar
dengyihao 已提交
560
// Frees a connection context and the ip string it owns. NULL is accepted.
// The semaphore, if any, is not released here.
static void transDestroyConnCtx(STransConnCtx* ctx) {
  if (ctx == NULL) {
    return;
  }
  free(ctx->ip);
  free(ctx);
}
dengyihao's avatar
dengyihao 已提交
566
//
dengyihao's avatar
dengyihao 已提交
567 568 569 570 571
// Asks a worker thread to stop gracefully by queueing a message without a context,
// which clientAsyncCb treats as a quit request.
static void clientSendQuit(SCliThrdObj* thrd) {
  SCliMsg* quitMsg = calloc(1, sizeof(SCliMsg));
  quitMsg->ctx = NULL;  // marks the message as a quit request

  transSendAsync(thrd->asyncPool, &quitMsg->q);
}
dengyihao's avatar
dengyihao 已提交
579 580 581
void taosCloseClient(void* arg) {
  // impl later
  SClientObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
582
  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
583
    clientSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
584
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
585 586 587
  }
  free(cli->pThreadObj);
  free(cli);
dengyihao's avatar
dengyihao 已提交
588
}
dengyihao's avatar
dengyihao 已提交
589 590
// Queues a request for asynchronous delivery to the endpoint currently selected in pEpSet.
// The response is delivered later on a worker thread; pRid is not used here.
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
  SRpcInfo* pRpc = (SRpcInfo*)shandle;
  char*     ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
  uint32_t  port = pEpSet->eps[pEpSet->inUse].port;

  int32_t flen = 0;
  if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
    // compression handling to be implemented later
  }

  // context describing where the request goes and how the response is delivered
  STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
  pCtx->pTransInst = (SRpcInfo*)shandle;
  pCtx->ahandle = pMsg->ahandle;
  pCtx->msgType = pMsg->msgType;
  pCtx->ip = strdup(ip);
  pCtx->port = port;

  assert(pRpc->connType == TAOS_CONN_CLIENT);
  // pick the worker thread for this request; the shared counter is not updated atomically
  int64_t index = pRpc->index;
  if (pRpc->index++ >= pRpc->numOfThreads) {
    pRpc->index = 0;
  }

  SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pMsg;
  cliMsg->st = taosGetTimestampUs();

  SClientObj*  cli = (SClientObj*)pRpc->tcphandle;
  SCliThrdObj* thrd = cli->pThreadObj[index % pRpc->numOfThreads];

  transSendAsync(thrd->asyncPool, &(cliMsg->q));
}
dengyihao's avatar
dengyihao 已提交
632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672
// Sends a request to the endpoint currently selected in pEpSet and blocks until the worker
// thread has filled *pRsp (with the response or an error) and posted the semaphore.
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
  SRpcInfo* pRpc = (SRpcInfo*)shandle;
  char*     ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
  uint32_t  port = pEpSet->eps[pEpSet->inUse].port;

  // semaphore the worker thread posts once the response is available
  tsem_t* pSem = calloc(1, sizeof(tsem_t));

  STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
  pCtx->pTransInst = (SRpcInfo*)shandle;
  pCtx->ahandle = pReq->ahandle;
  pCtx->msgType = pReq->msgType;
  pCtx->ip = strdup(ip);
  pCtx->port = port;
  pCtx->pSem = pSem;
  pCtx->pRsp = pRsp;
  tsem_init(pCtx->pSem, 0, 0);

  // pick the worker thread for this request; the shared counter is not updated atomically
  int64_t index = pRpc->index;
  if (pRpc->index++ >= pRpc->numOfThreads) {
    pRpc->index = 0;
  }

  SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();

  SClientObj*  cli = (SClientObj*)pRpc->tcphandle;
  SCliThrdObj* thrd = cli->pThreadObj[index % pRpc->numOfThreads];

  transSendAsync(thrd->asyncPool, &(cliMsg->q));

  // pCtx belongs to the worker thread from here on; only the local semaphore pointer is used
  tsem_wait(pSem);
  tsem_destroy(pSem);
  free(pSem);
}
dengyihao's avatar
dengyihao 已提交
673
#endif