// transSrv.c
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
21 22 23 24
  uv_tcp_t*   pTcp;
  uv_write_t* pWriter;
  uv_timer_t* pTimer;

dengyihao's avatar
dengyihao 已提交
25
  // uv_async_t* pWorkerAsync;
dengyihao's avatar
dengyihao 已提交
26 27
  queue       queue;
  int         ref;
dengyihao's avatar
dengyihao 已提交
28
  int         persist;  // persist connection or not
dengyihao's avatar
dengyihao 已提交
29
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
30 31 32
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
33
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
34
  void*       pSrvMsg;
dengyihao's avatar
dengyihao 已提交
35

dengyihao's avatar
dengyihao 已提交
36
  // SRpcMsg sendMsg;
dengyihao's avatar
dengyihao 已提交
37 38 39 40 41 42 43
  // del later
  char secured;
  int  spi;
  char info[64];
  char user[TSDB_UNI_LEN];  // user ID for the link
  char secret[TSDB_PASSWORD_LEN];
  char ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
44 45 46 47 48 49 50
} SSrvConn;

// A message queued for a worker thread. A message whose pConn is NULL is the
// quit marker that makes the worker stop its loop (see uvWorkerAsyncCb).
typedef struct SSrvMsg {
  SSrvConn* pConn;  // connection to answer; NULL for the quit marker
  SRpcMsg   msg;    // response, copied from the caller in rpcSendResponse
  queue     q;      // link in SWorkThrdObj.msg
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
51 52

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
53 54 55 56 57 58
  pthread_t   thread;
  uv_pipe_t*  pipe;
  int         fd;
  uv_loop_t*  loop;
  SAsyncPool* asyncPool;
  // uv_async_t*     workerAsync;  //
dengyihao's avatar
dengyihao 已提交
59 60
  queue           msg;
  pthread_mutex_t msgMtx;
dengyihao's avatar
dengyihao 已提交
61
  void*           pTransInst;
dengyihao's avatar
dengyihao 已提交
62 63 64 65 66 67 68 69 70 71 72 73
} SWorkThrdObj;

typedef struct SServerObj {
  pthread_t      thread;
  uv_tcp_t       server;
  uv_loop_t*     loop;
  int            workerIdx;
  int            numOfThreads;
  SWorkThrdObj** pThreadObj;
  uv_pipe_t**    pipe;
  uint32_t       ip;
  uint32_t       port;
dengyihao's avatar
dengyihao 已提交
74
  uv_async_t*    pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
75 76 77 78 79
} SServerObj;

static const char* notify = "a";

// refactor later
dengyihao's avatar
dengyihao 已提交
80
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
81

dengyihao's avatar
dengyihao 已提交
82
static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
83 84 85 86 87 88

static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void uvOnTimeoutCb(uv_timer_t* handle);
static void uvOnWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
89
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
90 91 92
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
93
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
94

dengyihao's avatar
dengyihao 已提交
95 96
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
97

dengyihao's avatar
dengyihao 已提交
98
static void destroySmsg(SSrvMsg* smsg);
99
// check whether already read complete packet
dengyihao's avatar
dengyihao 已提交
100 101 102
static bool      readComplete(SConnBuffer* buf);
static SSrvConn* createConn();
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
103

104
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
105

dengyihao's avatar
dengyihao 已提交
106
// server and worker thread
dengyihao's avatar
dengyihao 已提交
107 108 109
static void* workerThread(void* arg);
static void* acceptThread(void* arg);

dengyihao's avatar
dengyihao 已提交
110 111 112 113
// add handle loop
static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
114 115 116
// libuv alloc callback for client sockets: the buffer handed to libuv is taken
// from the connection's own read buffer via transAllocBuffer().
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  /*
   * format of data buffer:
   * |<--------------------------data from socket------------------------------->|
   * |<------STransMsgHead------->|<-------------------other data--------------->|
   */
  SSrvConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
  transAllocBuffer(pBuf, buf);
}

// Check whether the buffer holds exactly one complete packet.
//
// Returns true only when the number of buffered bytes equals the msgLen field
// of the packet head. While the packet is still incomplete, data->left is set
// to the number of bytes that are missing.
static bool readComplete(SConnBuffer* data) {
  // TODO(yihao): handle pipeline later
  STransMsgHead head;
  int32_t       headLen = sizeof(head);
  if (data->len < headLen) {
    // not even a full head yet
    return false;
  }

  memcpy((char*)&head, data->buf, headLen);
  // msgLen travels in network byte order
  int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
  if (msgLen > data->len) {
    data->left = msgLen - data->len;
    return false;
  }
  // msgLen < data->len means that more than one packet is buffered;
  // that case is not handled yet and is reported as "not complete".
  return msgLen == data->len;
}

dengyihao's avatar
dengyihao 已提交
148 149 150 151
// static void uvDoProcess(SRecvInfo* pRecv) {
//  // impl later
//  STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
//  SRpcInfo*      pRpc = (SRpcInfo*)pRecv->shandle;
//  SSrvConn*         pConn = pRecv->thandle;
//  tDump(pRecv->msg, pRecv->msgLen);
//  terrno = 0;
//  // SRpcReqContext* pContest;
//
//  // do auth and check
//}
dengyihao's avatar
dengyihao 已提交
159

dengyihao's avatar
dengyihao 已提交
160
// Authenticate a received message against the connection's credentials.
//
// pConn: connection the message arrived on
// msg:   start of the message (STransMsgHead followed by content and, when
//        authentication is used, a trailing SRpcDigest)
// len:   total number of bytes in msg
//
// Returns 0 when the message is accepted, otherwise a TSDB_CODE_RPC_* code.
// On acceptance pHead->msgLen is converted to host byte order (without the
// digest when one was verified).
static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) {
  STransMsgHead* pHead = (STransMsgHead*)msg;

  int code = 0;

  if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
    // secured link, or no authentication
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
    // tTrace("%s, secured link, no auth is required", pConn->info);
    return 0;
  }

  if (!rpcIsReq(pHead->msgType)) {
    // for response, if code is auth failure, it shall bypass the auth process
    code = htonl(pHead->code);
    if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
        code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
        code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
      // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
      return 0;
    }
  }

  code = 0;
  if (pHead->spi == pConn->spi) {
    // The digest is read from the tail of the message, so a message that is
    // too short to hold a head and a digest must be rejected before the
    // pointer arithmetic below.
    if (len < (int)(sizeof(STransMsgHead) + sizeof(SRpcDigest))) {
      tWarn("%s, msg too short to carry a digest, msg discarded", pConn->info);
      return TSDB_CODE_RPC_AUTH_FAILURE;
    }

    // authentication
    SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest));

    int32_t delta;
    delta = (int32_t)htonl(pDigest->timeStamp);
    delta -= (int32_t)taosGetTimestampSec();
    if (abs(delta) > 900) {
      tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
      code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
    } else {
      if (rpcAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
        // tDebug("%s, authentication failed, msg discarded", pConn->info);
        code = TSDB_CODE_RPC_AUTH_FAILURE;
      } else {
        pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
        if (!rpcIsReq(pHead->msgType)) pConn->secured = 1;  // link is secured for client
        // tTrace("%s, message is authenticated", pConn->info);
      }
    }
  } else {
    tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
    code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
  }

  return code;
}

// refers specifically to query or insert timeout
// Timer callback: currently it only logs that the connection was idle.
static void uvHandleActivityTimeout(uv_timer_t* handle) {
  SSrvConn* conn = handle->data;
  tDebug("%p timeout since no activity", conn);
}

dengyihao's avatar
dengyihao 已提交
219
// Hand one complete request, sitting in pConn->readBuf, to the rpc callback.
//
// The head fields code and msgLen are converted to host byte order in place,
// the request type is remembered in pConn->inType for the response, and the
// connection gains one reference before the callback is invoked.
static void uvHandleReq(SSrvConn* pConn) {
  SConnBuffer*   pBuf = &pConn->readBuf;
  STransMsgHead* pHead = (STransMsgHead*)pBuf->buf;

  pConn->inType = pHead->msgType;
  assert(transIsReq(pHead->msgType));

  SRpcInfo* pRpc = (SRpcInfo*)pConn->pTransInst;
  // auth here
  // auth should not do in rpc thread
  // TODO: call uvAuthMsg(pConn, (char*)pHead, pBuf->len) and set terrno on failure

  pHead->code = htonl(pHead->code);

  if (transDecompressMsg(NULL, 0, NULL)) {
    // add compress later
    // pHead = rpcDecompressRpcMsg(pHead);
  } else {
    pHead->msgLen = htonl(pHead->msgLen);
    // impl later
  }

  // Zero-initialise so that fields which are not set below are not handed to
  // the callback as indeterminate values.
  SRpcMsg rpcMsg = {0};
  rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
  rpcMsg.pCont = pHead->content;
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.code = pHead->code;
  rpcMsg.ahandle = NULL;
  rpcMsg.handle = pConn;

  // NOTE(review): rpcMsg.pCont points into readBuf; this relies on
  // transClearBuffer() not releasing that memory before the callback has
  // taken over the content - confirm against transComm.
  transClearBuffer(&pConn->readBuf);
  pConn->ref++;
  (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
  // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
  // auth
  // validate msg type
}

// libuv read callback for client sockets.
//
// nread > 0:  the bytes were stored in the connection's read buffer by the
//             alloc callback; the request is dispatched once it is complete.
// nread == 0: nothing was read, nothing to do.
// nread < 0:  read error or EOF; the connection is destroyed.
void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
  // opt
  SSrvConn*    conn = cli->data;
  SConnBuffer* pBuf = &conn->readBuf;
  if (nread > 0) {
    pBuf->len += nread;
    tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
    if (readComplete(pBuf)) {
      tDebug("conn %p alread read complete packet", conn);
      uvHandleReq(conn);
    } else {
      tDebug("conn %p read partial packet, continue to read", conn);
    }
    return;
  }
  if (nread == 0) {
    return;
  }

  // Only nread < 0 reaches this point (the former "nread < 0 || nread != UV_EOF"
  // test was always true here), so EOF and errors share the teardown below.
  if (conn->ref > 1) {
    conn->ref++;  // ref > 1 signed that write is in progress
  }
  tDebug("conn %p read error: %s", conn, uv_err_name((int)nread));
  destroyConn(conn, true);
}
// libuv alloc callback for the dispatch pipe. Only the short notify message
// is ever read there, so a tiny buffer is enough. The reader (uvOnConnectionCb)
// frees it.
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  enum { NOTIFY_BUF_LEN = 2 };
  // The advertised length must never exceed the allocation: the original
  // allocated a single byte while telling libuv that two were available.
  buf->base = calloc(NOTIFY_BUF_LEN, sizeof(char));
  // A zero length makes libuv report UV_ENOBUFS to the read callback.
  buf->len = (buf->base != NULL) ? NOTIFY_BUF_LEN : 0;
}

// Timer callback of a connection: currently it only logs the timeout.
void uvOnTimeoutCb(uv_timer_t* handle) {
  // opt
  SSrvConn* pConn = handle->data;
  tDebug("conn %p time out", pConn);
}

// Completion callback of a response write.
//
// The response message is released and the read buffer is reset for the next
// request in either case; when the write failed the connection is destroyed.
void uvOnWriteCb(uv_write_t* req, int status) {
  SSrvConn* conn = req->data;

  SSrvMsg* smsg = conn->pSrvMsg;
  destroySmsg(smsg);
  conn->pSrvMsg = NULL;

  transClearBuffer(&conn->readBuf);
  if (status == 0) {
    tDebug("conn %p data already was written on stream", conn);
  } else {
    tDebug("conn %p failed to write data, %s", conn, uv_err_name(status));
    destroyConn(conn, true);
  }
  // opt
}
dengyihao's avatar
dengyihao 已提交
327 328 329 330 331 332 333
// Completion callback of the write that hands an accepted socket to a worker.
// The write request was heap-allocated by uvOnAcceptCb and is released here.
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status == 0) {
    tDebug("success to dispatch conn to work thread");
  } else {
    tError("fail to dispatch conn to work thread");
  }
  // Nobody else holds the request once libuv has reported its completion.
  free(req);
}
dengyihao's avatar
dengyihao 已提交
334

dengyihao's avatar
dengyihao 已提交
335
// Fill in the message head of a response and describe the bytes to send.
//
// smsg: response to send; an empty content is allocated when pCont is NULL
// wb:   receives the start and the length of head + content
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
  // impl later;
  tDebug("conn %p prepare to send resp", smsg->pConn);
  SRpcMsg* pMsg = &smsg->msg;
  if (pMsg->pCont == NULL) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
  // the response type is the request type + 1
  pHead->msgType = smsg->pConn->inType + 1;
  // add more info
  char*   msg = (char*)pHead;
  int32_t len = transMsgLenFromCont(pMsg->contLen);
  if (transCompressMsg(msg, len, NULL)) {
    // impl later
  }
  pHead->msgLen = htonl(len);
  wb->base = msg;
  wb->len = len;
}
dengyihao's avatar
dengyihao 已提交
355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370
// Start writing a response on its connection.
//
// The message is parked in pConn->pSrvMsg so that uvOnWriteCb can release it
// when the write completes.
static void uvStartSendResp(SSrvMsg* smsg) {
  // impl
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
  uv_timer_stop(pConn->pTimer);

  pConn->pSrvMsg = smsg;
  // conn->pWriter->data = smsg;
  int ret = uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
  if (ret != 0) {
    // The write was not queued, so uvOnWriteCb will never run for it:
    // release the message here instead of leaking it.
    tError("conn %p failed to start sending resp, %s", pConn, uv_err_name(ret));
    pConn->pSrvMsg = NULL;
    destroySmsg(smsg);
  }
}
dengyihao's avatar
dengyihao 已提交
371
// Release a response message together with its content. NULL is accepted.
static void destroySmsg(SSrvMsg* smsg) {
  if (smsg == NULL) {
    return;
  }
  transFreeMsg(smsg->msg.pCont);
  free(smsg);
}
dengyihao's avatar
dengyihao 已提交
378
// Async callback of a worker loop: drains the worker's message queue.
//
// A message without a connection is the quit marker and stops the loop;
// every other message is a response that is written to its connection.
void uvWorkerAsyncCb(uv_async_t* handle) {
  SWorkThrdObj* pThrd = handle->data;
  queue         wq;
  // batch process to avoid to lock/unlock frequently
  pthread_mutex_lock(&pThrd->msgMtx);
  QUEUE_MOVE(&pThrd->msg, &wq);
  pthread_mutex_unlock(&pThrd->msgMtx);

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* head = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(head);

    SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
    if (msg == NULL) {
      tError("except occurred, continue");
      continue;
    }
    if (msg->pConn == NULL) {
      // quit marker, see sendQuitToWorkThrd
      free(msg);
      uv_stop(pThrd->loop);
    } else {
      uvStartSendResp(msg);
    }
  }
}
dengyihao's avatar
dengyihao 已提交
410 411 412 413
// Async callback of the accept loop; its only job is to stop that loop.
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* pServer = (SServerObj*)async->data;

  uv_loop_t* acceptLoop = pServer->loop;
  uv_stop(acceptLoop);
}
dengyihao's avatar
dengyihao 已提交
414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429

void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status == -1) {
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

  uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
    uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));

    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

    pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
dengyihao's avatar
dengyihao 已提交
430

dengyihao's avatar
dengyihao 已提交
431
    tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
dengyihao's avatar
dengyihao 已提交
432
    uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
dengyihao's avatar
dengyihao 已提交
433 434
  } else {
    uv_close((uv_handle_t*)cli, NULL);
435
    free(cli);
dengyihao's avatar
dengyihao 已提交
436 437 438 439 440 441 442 443 444
  }
}
// Read callback of a worker's dispatch pipe (runs on the worker thread).
//
// Every notify message comes with a pending socket handle. A connection object
// is created for it, the socket is accepted from the pipe and reading starts.
// buf was allocated by uvAllocConnBufferCb and is released here on every path.
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
  tDebug("connection coming");
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name((int)nread));
    }
    // TODO(log other failure reason)
    // uv_close((uv_handle_t*)q, NULL);
    free(buf->base);  // may be NULL on error, free(NULL) is fine
    return;
  }
  if (nread == 0) {
    // nothing was read (EAGAIN); not an error and not a new connection
    free(buf->base);
    return;
  }
  // free memory allocated by uvAllocConnBufferCb
  assert(nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
  free(buf->base);

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

  // Allocate everything before any handle is initialised, so that a failure
  // can be undone with plain free().
  SSrvConn*   pConn = createConn();
  uv_timer_t* pTimer = malloc(sizeof(uv_timer_t));
  uv_tcp_t*   pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_write_t* pWriter = calloc(1, sizeof(uv_write_t));
  if (pConn == NULL || pTimer == NULL || pTcp == NULL || pWriter == NULL) {
    tError("failed to create new connection: out of memory");
    free(pWriter);
    free(pTcp);
    free(pTimer);
    free(pConn);
    return;
  }

  pConn->pTransInst = pThrd->pTransInst;
  /* init conn timer*/
  pConn->pTimer = pTimer;
  uv_timer_init(pThrd->loop, pConn->pTimer);
  pConn->pTimer->data = pConn;

  pConn->hostThrd = pThrd;
  // pConn->pWorkerAsync = pThrd->workerAsync;  // thread safty

  // init client handle
  pConn->pTcp = pTcp;
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

  // init write request, just
  pConn->pWriter = pWriter;
  pConn->pWriter->data = pConn;

  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
    tDebug("conn %p created, fd: %d", pConn, fd);
    uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
  } else {
    tDebug("failed to create new connection");
    destroyConn(pConn, true);
  }
}

// Entry point of the accept thread: runs the server loop until it is stopped.
// arg is the SServerObj; the thread result is always NULL.
void* acceptThread(void* arg) {
  // opt
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
  // a void* function must not fall off its end
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
500 501
static bool addHandleToWorkloop(void* arg) {
  SWorkThrdObj* pThrd = arg;
dengyihao's avatar
dengyihao 已提交
502
  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
503 504 505
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
506 507

  // SRpcInfo* pRpc = pThrd->shandle;
dengyihao's avatar
dengyihao 已提交
508
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
dengyihao's avatar
dengyihao 已提交
509 510 511 512
  uv_pipe_open(pThrd->pipe, pThrd->fd);

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
513 514
  QUEUE_INIT(&pThrd->msg);
  pthread_mutex_init(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
515

dengyihao's avatar
dengyihao 已提交
516
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, uvWorkerAsyncCb);
dengyihao's avatar
dengyihao 已提交
517
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
518 519 520 521 522 523 524 525 526 527 528 529 530
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
531 532 533 534
  // register an async here to quit server gracefully
  srv->pAcceptAsync = calloc(1, sizeof(uv_async_t));
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
535

dengyihao's avatar
dengyihao 已提交
536
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
537 538 539 540 541 542 543 544 545 546
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
  if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) {
    tError("failed to listen: %s", uv_err_name(err));
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
547 548 549
}
void* workerThread(void* arg) {
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
550 551 552
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
}

dengyihao's avatar
dengyihao 已提交
553 554 555
static SSrvConn* createConn() {
  SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
  tDebug("conn %p created", pConn);
556
  ++pConn->ref;
dengyihao's avatar
dengyihao 已提交
557 558
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
559

dengyihao's avatar
dengyihao 已提交
560
// Drop one reference of a connection and tear it down when it was the last.
//
// conn:  connection, NULL is accepted
// clear: when true the socket handle is closed; uvDestroyConn then releases
//        the connection's memory from the close callback
static void destroyConn(SSrvConn* conn, bool clear) {
  if (conn == NULL) {
    return;
  }
  tDebug("conn %p try to destroy", conn);
  if (--conn->ref > 0) {
    // still referenced elsewhere
    return;
  }
  transDestroyBuffer(&conn->readBuf);
  destroySmsg(conn->pSrvMsg);
  conn->pSrvMsg = NULL;

  // NOTE(review): with clear == false the handles and the conn itself are
  // not released here - confirm who owns them in that case.
  if (clear) {
    uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
  }
}
// Close callback of the connection timer: the handle may be freed only now.
static void uvDestroyConnTimerCb(uv_handle_t* handle) {
  free(handle);
}

// Close callback of a connection's socket handle (installed by destroyConn):
// releases everything that belongs to the connection.
static void uvDestroyConn(uv_handle_t* handle) {
  SSrvConn* conn = handle->data;
  tDebug("conn %p destroy", conn);
  if (conn->pTimer != NULL) {
    uv_timer_stop(conn->pTimer);
    // The timer was registered with the loop by uv_timer_init, so it has to
    // be closed; its memory is freed by the close callback, not here.
    uv_close((uv_handle_t*)conn->pTimer, uvDestroyConnTimerCb);
  }
  // The socket handle is closed by now (this is its close callback), so its
  // memory can be released; it was leaked before.
  free(conn->pTcp);
  free(conn->pWriter);
  free(conn);
}
dengyihao's avatar
dengyihao 已提交
585
// Complete the head of an outgoing message and, when the link uses
// authentication and is not secured yet, append the digest part.
//
// msg must have room for the digest after msgLen bytes.
// Returns the new message length; pHead->msgLen is set in network byte order.
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
  STransMsgHead* pHead = (STransMsgHead*)msg;

  if (pConn->spi && pConn->secured == 0) {
    // add auth part
    pHead->spi = pConn->spi;
    // NOTE(review): the digest is written as STransDigestMsg while the length
    // grows by sizeof(SRpcDigest) - confirm that both have the same size.
    STransDigestMsg* pDigest = (STransDigestMsg*)(msg + msgLen);
    pDigest->timeStamp = htonl(taosGetTimestampSec());
    msgLen += sizeof(SRpcDigest);
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
  } else {
    pHead->spi = 0;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
  }

  return msgLen;
}

void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
  SServerObj* srv = calloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
  srv->pThreadObj = (SWorkThrdObj**)calloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*));
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

  for (int i = 0; i < srv->numOfThreads; i++) {
    SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
618
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
619

dengyihao's avatar
dengyihao 已提交
620 621 622
    srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
    int fds[2];
    if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
dengyihao's avatar
dengyihao 已提交
623
      goto End;
dengyihao's avatar
dengyihao 已提交
624 625 626 627
    }
    uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write

dengyihao's avatar
dengyihao 已提交
628
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
629 630
    thrd->fd = fds[0];
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
631

dengyihao's avatar
dengyihao 已提交
632 633 634
    if (false == addHandleToWorkloop(thrd)) {
      goto End;
    }
dengyihao's avatar
dengyihao 已提交
635 636 637 638 639 640 641 642 643
    int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
    }
  }
dengyihao's avatar
dengyihao 已提交
644 645 646
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
dengyihao's avatar
dengyihao 已提交
647 648 649 650 651 652 653 654
  int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
    // clear all resource later
  }

  return srv;
dengyihao's avatar
dengyihao 已提交
655 656 657 658 659 660 661 662 663 664 665
End:
  taosCloseServer(srv);
  return NULL;
}

// Wait for a worker thread to finish and release the worker object.
// NULL is accepted. The thread must have been asked to quit before
// (see sendQuitToWorkThrd), otherwise the join blocks.
void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd == NULL) {
    return;
  }
  pthread_join(pThrd->thread, NULL);
  free(pThrd->loop);
  transDestroyAsyncPool(pThrd->asyncPool);

  // free(pThrd->workerAsync);
  free(pThrd);
}
dengyihao's avatar
dengyihao 已提交
671 672 673 674 675 676 677 678
// Ask a worker thread to stop its loop by queueing a message without a
// connection (the quit marker handled in uvWorkerAsyncCb) and waking it up.
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd == NULL) {
    return;
  }
  // calloc leaves pConn == NULL, which is what marks the quit message
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  if (srvMsg == NULL) {
    tError("failed to send quit msg to work thread: out of memory");
    return;
  }

  pthread_mutex_lock(&pThrd->msgMtx);
  QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
  pthread_mutex_unlock(&pThrd->msgMtx);
  tDebug("send quit msg to work thread");

  transSendAsync(pThrd->asyncPool);
  // uv_async_send(pThrd->workerAsync);
}

dengyihao's avatar
dengyihao 已提交
683 684 685
void taosCloseServer(void* arg) {
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
686
  for (int i = 0; i < srv->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
687
    sendQuitToWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
688
    destroyWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
689
  }
dengyihao's avatar
dengyihao 已提交
690 691 692 693 694 695 696

  tDebug("send quit msg to accept thread");
  uv_async_send(srv->pAcceptAsync);
  pthread_join(srv->thread, NULL);

  free(srv->pThreadObj);
  free(srv->pAcceptAsync);
dengyihao's avatar
dengyihao 已提交
697
  free(srv->loop);
dengyihao's avatar
dengyihao 已提交
698 699 700 701

  for (int i = 0; i < srv->numOfThreads; i++) {
    free(srv->pipe[i]);
  }
dengyihao's avatar
dengyihao 已提交
702
  free(srv->pipe);
dengyihao's avatar
dengyihao 已提交
703

dengyihao's avatar
dengyihao 已提交
704
  free(srv);
dengyihao's avatar
dengyihao 已提交
705
}
dengyihao's avatar
dengyihao 已提交
706 707

void rpcSendResponse(const SRpcMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
708
  SSrvConn*     pConn = pMsg->handle;
dengyihao's avatar
dengyihao 已提交
709 710
  SWorkThrdObj* pThrd = pConn->hostThrd;

dengyihao's avatar
dengyihao 已提交
711 712 713 714 715 716 717 718
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  srvMsg->pConn = pConn;
  srvMsg->msg = *pMsg;

  pthread_mutex_lock(&pThrd->msgMtx);
  QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
  pthread_mutex_unlock(&pThrd->msgMtx);

dengyihao's avatar
dengyihao 已提交
719
  tDebug("conn %p start to send resp", pConn);
dengyihao's avatar
dengyihao 已提交
720 721
  transSendAsync(pThrd->asyncPool);
  // uv_async_send(pThrd->workerAsync);
dengyihao's avatar
dengyihao 已提交
722 723 724
}

#endif