transSrv.c 20.4 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
21 22 23 24 25 26 27
  uv_tcp_t*   pTcp;
  uv_write_t* pWriter;
  uv_timer_t* pTimer;

  uv_async_t* pWorkerAsync;
  queue       queue;
  int         ref;
dengyihao's avatar
dengyihao 已提交
28
  int         persist;  // persist connection or not
dengyihao's avatar
dengyihao 已提交
29
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
30 31 32
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
33
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
34
  void*       pSrvMsg;
dengyihao's avatar
dengyihao 已提交
35

dengyihao's avatar
dengyihao 已提交
36
  // SRpcMsg sendMsg;
dengyihao's avatar
dengyihao 已提交
37 38 39 40 41 42 43
  // del later
  char secured;
  int  spi;
  char info[64];
  char user[TSDB_UNI_LEN];  // user ID for the link
  char secret[TSDB_PASSWORD_LEN];
  char ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
44 45 46 47 48 49 50
} SSrvConn;

typedef struct SSrvMsg {
  SSrvConn* pConn;
  SRpcMsg   msg;
  queue     q;
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
51 52 53 54 55 56 57

typedef struct SWorkThrdObj {
  pthread_t       thread;
  uv_pipe_t*      pipe;
  int             fd;
  uv_loop_t*      loop;
  uv_async_t*     workerAsync;  //
dengyihao's avatar
dengyihao 已提交
58 59
  queue           msg;
  pthread_mutex_t msgMtx;
dengyihao's avatar
dengyihao 已提交
60
  void*           pTransInst;
dengyihao's avatar
dengyihao 已提交
61 62 63 64 65 66 67 68 69 70 71 72
} SWorkThrdObj;

typedef struct SServerObj {
  pthread_t      thread;
  uv_tcp_t       server;
  uv_loop_t*     loop;
  int            workerIdx;
  int            numOfThreads;
  SWorkThrdObj** pThreadObj;
  uv_pipe_t**    pipe;
  uint32_t       ip;
  uint32_t       port;
dengyihao's avatar
dengyihao 已提交
73
  uv_async_t*    pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
74 75 76 77 78
} SServerObj;

static const char* notify = "a";

// refactor later
dengyihao's avatar
dengyihao 已提交
79
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
80

dengyihao's avatar
dengyihao 已提交
81
static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
82 83 84 85 86 87

static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void uvOnTimeoutCb(uv_timer_t* handle);
static void uvOnWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
88
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
89 90 91
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
92
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
93

dengyihao's avatar
dengyihao 已提交
94 95
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
96

dengyihao's avatar
dengyihao 已提交
97
static void destroySmsg(SSrvMsg* smsg);
98
// check whether already read complete packet
dengyihao's avatar
dengyihao 已提交
99 100 101
static bool      readComplete(SConnBuffer* buf);
static SSrvConn* createConn();
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
102

103
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
104

dengyihao's avatar
dengyihao 已提交
105
// server and worker thread
dengyihao's avatar
dengyihao 已提交
106 107 108
static void* workerThread(void* arg);
static void* acceptThread(void* arg);

dengyihao's avatar
dengyihao 已提交
109 110 111 112
// add handle loop
static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
113 114 115
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  /*
   * formate of data buffer:
dengyihao's avatar
dengyihao 已提交
116 117
   * |<--------------------------data from socket------------------------------->|
   * |<------STransMsgHead------->|<-------------------other data--------------->|
dengyihao's avatar
dengyihao 已提交
118
   */
dengyihao's avatar
dengyihao 已提交
119 120 121
  SSrvConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
122 123 124 125 126 127
}

// check data read from socket completely or not
//
static bool readComplete(SConnBuffer* data) {
  // TODO(yihao): handle pipeline later
dengyihao's avatar
dengyihao 已提交
128 129
  STransMsgHead head;
  int32_t       headLen = sizeof(head);
dengyihao's avatar
dengyihao 已提交
130
  if (data->len >= headLen) {
dengyihao's avatar
dengyihao 已提交
131 132
    memcpy((char*)&head, data->buf, headLen);
    int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
dengyihao's avatar
dengyihao 已提交
133 134 135
    if (msgLen > data->len) {
      data->left = msgLen - data->len;
      return false;
dengyihao's avatar
dengyihao 已提交
136
    } else if (msgLen == data->len) {
dengyihao's avatar
dengyihao 已提交
137
      return true;
dengyihao's avatar
dengyihao 已提交
138 139 140
    } else if (msgLen < data->len) {
      return false;
      // handle other packet later
dengyihao's avatar
dengyihao 已提交
141 142 143 144 145 146
    }
  } else {
    return false;
  }
}

dengyihao's avatar
dengyihao 已提交
147 148 149 150
// static void uvDoProcess(SRecvInfo* pRecv) {
//  // impl later
//  STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
//  SRpcInfo*      pRpc = (SRpcInfo*)pRecv->shandle;
dengyihao's avatar
dengyihao 已提交
151
//  SSrvConn*         pConn = pRecv->thandle;
dengyihao's avatar
dengyihao 已提交
152 153 154 155 156 157
//  tDump(pRecv->msg, pRecv->msgLen);
//  terrno = 0;
//  // SRpcReqContext* pContest;
//
//  // do auth and check
//}
dengyihao's avatar
dengyihao 已提交
158

dengyihao's avatar
dengyihao 已提交
159
static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) {
dengyihao's avatar
dengyihao 已提交
160 161 162
  STransMsgHead* pHead = (STransMsgHead*)msg;

  int code = 0;
dengyihao's avatar
dengyihao 已提交
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213

  if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
    // secured link, or no authentication
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
    // tTrace("%s, secured link, no auth is required", pConn->info);
    return 0;
  }

  if (!rpcIsReq(pHead->msgType)) {
    // for response, if code is auth failure, it shall bypass the auth process
    code = htonl(pHead->code);
    if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
        code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
        code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
      // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
      return 0;
    }
  }

  code = 0;
  if (pHead->spi == pConn->spi) {
    // authentication
    SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest));

    int32_t delta;
    delta = (int32_t)htonl(pDigest->timeStamp);
    delta -= (int32_t)taosGetTimestampSec();
    if (abs(delta) > 900) {
      tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
      code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
    } else {
      if (rpcAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
        // tDebug("%s, authentication failed, msg discarded", pConn->info);
        code = TSDB_CODE_RPC_AUTH_FAILURE;
      } else {
        pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
        if (!rpcIsReq(pHead->msgType)) pConn->secured = 1;  // link is secured for client
        // tTrace("%s, message is authenticated", pConn->info);
      }
    }
  } else {
    tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
    code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
  }

  return code;
}

// refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) {
dengyihao's avatar
dengyihao 已提交
214
  SSrvConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
215
  tDebug("%p timeout since no activity", conn);
dengyihao's avatar
dengyihao 已提交
216 217
}

dengyihao's avatar
dengyihao 已提交
218
static void uvHandleReq(SSrvConn* pConn) {
dengyihao's avatar
dengyihao 已提交
219 220
  SRecvInfo    info;
  SRecvInfo*   p = &info;
dengyihao's avatar
dengyihao 已提交
221
  SConnBuffer* pBuf = &pConn->readBuf;
dengyihao's avatar
dengyihao 已提交
222
  p->msg = pBuf->buf;
dengyihao's avatar
dengyihao 已提交
223 224 225
  p->msgLen = pBuf->len;
  p->ip = 0;
  p->port = 0;
dengyihao's avatar
dengyihao 已提交
226
  p->shandle = pConn->pTransInst;  //
dengyihao's avatar
dengyihao 已提交
227 228 229
  p->thandle = pConn;
  p->chandle = NULL;

dengyihao's avatar
dengyihao 已提交
230
  STransMsgHead* pHead = (STransMsgHead*)p->msg;
dengyihao's avatar
dengyihao 已提交
231 232

  pConn->inType = pHead->msgType;
dengyihao's avatar
dengyihao 已提交
233
  assert(transIsReq(pHead->msgType));
dengyihao's avatar
dengyihao 已提交
234 235 236

  SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
  // auth here
dengyihao's avatar
dengyihao 已提交
237
  // auth should not do in rpc thread
dengyihao's avatar
dengyihao 已提交
238

dengyihao's avatar
dengyihao 已提交
239 240 241 242 243
  // int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
  // if (code != 0) {
  //  terrno = code;
  //  return;
  //}
dengyihao's avatar
dengyihao 已提交
244 245
  pHead->code = htonl(pHead->code);

dengyihao's avatar
dengyihao 已提交
246 247 248 249 250
  int32_t dlen = 0;
  if (transDecompressMsg(NULL, 0, NULL)) {
    // add compress later
    // pHead = rpcDecompressRpcMsg(pHead);
  } else {
dengyihao's avatar
dengyihao 已提交
251
    pHead->msgLen = htonl(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
252
    // impl later
dengyihao's avatar
dengyihao 已提交
253
    //
dengyihao's avatar
dengyihao 已提交
254
  }
dengyihao's avatar
dengyihao 已提交
255 256

  SRpcMsg rpcMsg;
dengyihao's avatar
dengyihao 已提交
257
  rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
258 259 260
  rpcMsg.pCont = pHead->content;
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.code = pHead->code;
dengyihao's avatar
dengyihao 已提交
261
  rpcMsg.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
262 263
  rpcMsg.handle = pConn;

dengyihao's avatar
dengyihao 已提交
264
  transClearBuffer(&pConn->readBuf);
265
  pConn->ref++;
dengyihao's avatar
dengyihao 已提交
266
  (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
dengyihao's avatar
dengyihao 已提交
267
  // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
dengyihao's avatar
dengyihao 已提交
268 269 270 271 272 273
  // auth
  // validate msg type
}

void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
  // opt
dengyihao's avatar
dengyihao 已提交
274 275
  SSrvConn*    conn = cli->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
276 277
  if (nread > 0) {
    pBuf->len += nread;
dengyihao's avatar
dengyihao 已提交
278
    tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
dengyihao's avatar
dengyihao 已提交
279
    if (readComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
280 281
      tDebug("conn %p alread read complete packet", conn);
      uvHandleReq(conn);
dengyihao's avatar
dengyihao 已提交
282
    } else {
dengyihao's avatar
dengyihao 已提交
283
      tDebug("conn %p read partial packet, continue to read", conn);
dengyihao's avatar
dengyihao 已提交
284 285 286
    }
    return;
  }
287 288 289
  if (nread == 0) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
290 291 292 293
  if (nread < 0 || nread != UV_EOF) {
    if (conn->ref > 1) {
      conn->ref++;  // ref > 1 signed that write is in progress
    }
dengyihao's avatar
dengyihao 已提交
294 295
    tDebug("conn %p read error: %s", conn, uv_err_name(nread));
    destroyConn(conn, true);
dengyihao's avatar
dengyihao 已提交
296 297 298 299 300 301 302 303 304
  }
}
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->base = malloc(sizeof(char));
  buf->len = 2;
}

void uvOnTimeoutCb(uv_timer_t* handle) {
  // opt
dengyihao's avatar
dengyihao 已提交
305
  SSrvConn* pConn = handle->data;
dengyihao's avatar
dengyihao 已提交
306
  tDebug("conn %p time out", pConn);
dengyihao's avatar
dengyihao 已提交
307 308 309
}

void uvOnWriteCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
310
  SSrvConn* conn = req->data;
dengyihao's avatar
dengyihao 已提交
311 312 313 314

  SSrvMsg* smsg = conn->pSrvMsg;
  destroySmsg(smsg);
  conn->pSrvMsg = NULL;
dengyihao's avatar
dengyihao 已提交
315

dengyihao's avatar
dengyihao 已提交
316
  transClearBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
317
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
318
    tDebug("conn %p data already was written on stream", conn);
dengyihao's avatar
dengyihao 已提交
319
  } else {
dengyihao's avatar
dengyihao 已提交
320
    tDebug("conn %p failed to write data, %s", conn, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
321
    //
322
    destroyConn(conn, true);
dengyihao's avatar
dengyihao 已提交
323 324 325
  }
  // opt
}
dengyihao's avatar
dengyihao 已提交
326 327 328 329 330 331 332
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status == 0) {
    tDebug("success to dispatch conn to work thread");
  } else {
    tError("fail to dispatch conn to work thread");
  }
}
dengyihao's avatar
dengyihao 已提交
333

dengyihao's avatar
dengyihao 已提交
334
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
335
  // impl later;
dengyihao's avatar
dengyihao 已提交
336 337
  tDebug("conn %p prepare to send resp", smsg->pConn);
  SRpcMsg* pMsg = &smsg->msg;
dengyihao's avatar
dengyihao 已提交
338 339 340 341 342
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
dengyihao's avatar
dengyihao 已提交
343
  pHead->msgType = smsg->pConn->inType + 1;
dengyihao's avatar
dengyihao 已提交
344 345 346 347 348 349 350 351 352 353
  // add more info
  char*   msg = (char*)pHead;
  int32_t len = transMsgLenFromCont(pMsg->contLen);
  if (transCompressMsg(msg, len, NULL)) {
    // impl later
  }
  pHead->msgLen = htonl(len);
  wb->base = msg;
  wb->len = len;
}
dengyihao's avatar
dengyihao 已提交
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369
static void uvStartSendResp(SSrvMsg* smsg) {
  // impl
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
  uv_timer_stop(pConn->pTimer);

  pConn->pSrvMsg = smsg;
  // conn->pWriter->data = smsg;
  uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);

  // SRpcMsg* rpcMsg = smsg->msg;

  return;
}
dengyihao's avatar
dengyihao 已提交
370
static void destroySmsg(SSrvMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
371 372 373 374
  if (smsg == NULL) {
    return;
  }
  transFreeMsg(smsg->msg.pCont);
dengyihao's avatar
dengyihao 已提交
375
  free(smsg);
dengyihao's avatar
dengyihao 已提交
376
}
dengyihao's avatar
dengyihao 已提交
377
void uvWorkerAsyncCb(uv_async_t* handle) {
dengyihao's avatar
dengyihao 已提交
378
  SWorkThrdObj* pThrd = handle->data;
dengyihao's avatar
dengyihao 已提交
379
  SSrvConn*     conn = NULL;
dengyihao's avatar
dengyihao 已提交
380 381
  queue         wq;
  // batch process to avoid to lock/unlock frequently
dengyihao's avatar
dengyihao 已提交
382 383 384
  pthread_mutex_lock(&pThrd->msgMtx);
  QUEUE_MOVE(&pThrd->msg, &wq);
  pthread_mutex_unlock(&pThrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
385 386 387 388

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* head = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(head);
dengyihao's avatar
dengyihao 已提交
389 390 391 392 393

    SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
    if (msg == NULL) {
      tError("except occurred, continue");
      continue;
dengyihao's avatar
dengyihao 已提交
394
    }
dengyihao's avatar
dengyihao 已提交
395 396 397 398 399 400 401
    if (msg->pConn == NULL) {
      //
      free(msg);
      uv_stop(pThrd->loop);
    } else {
      uvStartSendResp(msg);
    }
dengyihao's avatar
dengyihao 已提交
402 403 404
    // uv_buf_t wb;
    // uvPrepareSendData(msg, &wb);
    // uv_timer_stop(conn->pTimer);
dengyihao's avatar
dengyihao 已提交
405

dengyihao's avatar
dengyihao 已提交
406
    // uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
dengyihao's avatar
dengyihao 已提交
407 408
  }
}
dengyihao's avatar
dengyihao 已提交
409 410 411 412
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* srv = async->data;
  uv_stop(srv->loop);
}
dengyihao's avatar
dengyihao 已提交
413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428

void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status == -1) {
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

  uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
    uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));

    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

    pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
dengyihao's avatar
dengyihao 已提交
429

dengyihao's avatar
dengyihao 已提交
430
    tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
dengyihao's avatar
dengyihao 已提交
431
    uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
dengyihao's avatar
dengyihao 已提交
432 433
  } else {
    uv_close((uv_handle_t*)cli, NULL);
434
    free(cli);
dengyihao's avatar
dengyihao 已提交
435 436 437 438 439 440 441 442 443
  }
}
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
  tDebug("connection coming");
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
444
    // uv_close((uv_handle_t*)q, NULL);
dengyihao's avatar
dengyihao 已提交
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
    return;
  }
  // free memory allocated by
  assert(nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
  free(buf->base);

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

dengyihao's avatar
dengyihao 已提交
463
  SSrvConn* pConn = createConn();
464

dengyihao's avatar
dengyihao 已提交
465
  pConn->pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485
  /* init conn timer*/
  pConn->pTimer = malloc(sizeof(uv_timer_t));
  uv_timer_init(pThrd->loop, pConn->pTimer);
  pConn->pTimer->data = pConn;

  pConn->hostThrd = pThrd;
  pConn->pWorkerAsync = pThrd->workerAsync;  // thread safty

  // init client handle
  pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

  // init write request, just
  pConn->pWriter = calloc(1, sizeof(uv_write_t));
  pConn->pWriter->data = pConn;

  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
dengyihao's avatar
dengyihao 已提交
486
    tDebug("conn %p created, fd: %d", pConn, fd);
dengyihao's avatar
dengyihao 已提交
487 488
    uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
  } else {
dengyihao's avatar
dengyihao 已提交
489
    tDebug("failed to create new connection");
490
    destroyConn(pConn, true);
dengyihao's avatar
dengyihao 已提交
491 492 493 494 495 496 497 498
  }
}

void* acceptThread(void* arg) {
  // opt
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
}
dengyihao's avatar
dengyihao 已提交
499 500
static bool addHandleToWorkloop(void* arg) {
  SWorkThrdObj* pThrd = arg;
dengyihao's avatar
dengyihao 已提交
501
  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
502 503 504
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
505 506

  // SRpcInfo* pRpc = pThrd->shandle;
dengyihao's avatar
dengyihao 已提交
507
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
dengyihao's avatar
dengyihao 已提交
508 509 510 511
  uv_pipe_open(pThrd->pipe, pThrd->fd);

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
512 513
  QUEUE_INIT(&pThrd->msg);
  pthread_mutex_init(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
514 515 516

  pThrd->workerAsync = malloc(sizeof(uv_async_t));
  uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb);
dengyihao's avatar
dengyihao 已提交
517
  pThrd->workerAsync->data = pThrd;
dengyihao's avatar
dengyihao 已提交
518 519

  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
520 521 522 523 524 525 526 527 528 529 530 531 532
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
533 534 535 536
  // register an async here to quit server gracefully
  srv->pAcceptAsync = calloc(1, sizeof(uv_async_t));
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
537

dengyihao's avatar
dengyihao 已提交
538
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
539 540 541 542 543 544 545 546 547 548
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
  if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) {
    tError("failed to listen: %s", uv_err_name(err));
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
549 550 551
}
void* workerThread(void* arg) {
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
552 553 554
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
}

dengyihao's avatar
dengyihao 已提交
555 556 557
static SSrvConn* createConn() {
  SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
  tDebug("conn %p created", pConn);
558
  ++pConn->ref;
dengyihao's avatar
dengyihao 已提交
559 560
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
561

dengyihao's avatar
dengyihao 已提交
562
static void destroyConn(SSrvConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
563 564 565
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
566 567
  tDebug("conn %p try to destroy", conn);
  if (--conn->ref > 0) {
568 569
    return;
  }
dengyihao's avatar
dengyihao 已提交
570
  transDestroyBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
571 572
  destroySmsg(conn->pSrvMsg);
  conn->pSrvMsg = NULL;
dengyihao's avatar
dengyihao 已提交
573

574
  if (clear) {
dengyihao's avatar
dengyihao 已提交
575
    uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
576
  }
dengyihao's avatar
dengyihao 已提交
577 578 579 580
}
static void uvDestroyConn(uv_handle_t* handle) {
  SSrvConn* conn = handle->data;
  tDebug("conn %p destroy", conn);
dengyihao's avatar
dengyihao 已提交
581 582
  uv_timer_stop(conn->pTimer);
  free(conn->pTimer);
dengyihao's avatar
dengyihao 已提交
583
  // free(conn->pTcp);
dengyihao's avatar
dengyihao 已提交
584
  free(conn->pWriter);
585
  free(conn);
dengyihao's avatar
dengyihao 已提交
586
}
dengyihao's avatar
dengyihao 已提交
587
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
dengyihao's avatar
dengyihao 已提交
588
  STransMsgHead* pHead = (STransMsgHead*)msg;
dengyihao's avatar
dengyihao 已提交
589 590 591 592

  if (pConn->spi && pConn->secured == 0) {
    // add auth part
    pHead->spi = pConn->spi;
dengyihao's avatar
dengyihao 已提交
593
    STransDigestMsg* pDigest = (STransDigestMsg*)(msg + msgLen);
dengyihao's avatar
dengyihao 已提交
594 595 596
    pDigest->timeStamp = htonl(taosGetTimestampSec());
    msgLen += sizeof(SRpcDigest);
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
dengyihao's avatar
dengyihao 已提交
597 598
    // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
    // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
dengyihao's avatar
dengyihao 已提交
599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619
  } else {
    pHead->spi = 0;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
  }

  return msgLen;
}

void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
  SServerObj* srv = calloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
  srv->pThreadObj = (SWorkThrdObj**)calloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*));
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

  for (int i = 0; i < srv->numOfThreads; i++) {
    SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
620
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
621

dengyihao's avatar
dengyihao 已提交
622 623 624
    srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
    int fds[2];
    if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
dengyihao's avatar
dengyihao 已提交
625
      goto End;
dengyihao's avatar
dengyihao 已提交
626 627 628 629
    }
    uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write

dengyihao's avatar
dengyihao 已提交
630
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
631 632
    thrd->fd = fds[0];
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
633

dengyihao's avatar
dengyihao 已提交
634 635 636
    if (false == addHandleToWorkloop(thrd)) {
      goto End;
    }
dengyihao's avatar
dengyihao 已提交
637 638 639 640 641 642 643 644 645
    int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
    }
  }
dengyihao's avatar
dengyihao 已提交
646 647 648
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
dengyihao's avatar
dengyihao 已提交
649 650 651 652 653 654 655 656
  int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
    // clear all resource later
  }

  return srv;
dengyihao's avatar
dengyihao 已提交
657 658 659 660 661 662 663 664 665 666 667
End:
  taosCloseServer(srv);
  return NULL;
}

void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd == NULL) {
    return;
  }
  pthread_join(pThrd->thread, NULL);
  free(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
668
  free(pThrd->workerAsync);
dengyihao's avatar
dengyihao 已提交
669
  free(pThrd);
dengyihao's avatar
dengyihao 已提交
670
}
dengyihao's avatar
dengyihao 已提交
671 672 673 674 675 676 677 678 679 680 681
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));

  pthread_mutex_lock(&pThrd->msgMtx);
  QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
  pthread_mutex_unlock(&pThrd->msgMtx);
  tDebug("send quit msg to work thread");

  uv_async_send(pThrd->workerAsync);
}

dengyihao's avatar
dengyihao 已提交
682 683 684
void taosCloseServer(void* arg) {
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
685
  for (int i = 0; i < srv->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
686
    sendQuitToWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
687
    destroyWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
688
  }
dengyihao's avatar
dengyihao 已提交
689 690 691 692 693 694 695

  tDebug("send quit msg to accept thread");
  uv_async_send(srv->pAcceptAsync);
  pthread_join(srv->thread, NULL);

  free(srv->pThreadObj);
  free(srv->pAcceptAsync);
dengyihao's avatar
dengyihao 已提交
696
  free(srv->loop);
dengyihao's avatar
dengyihao 已提交
697 698 699 700

  for (int i = 0; i < srv->numOfThreads; i++) {
    free(srv->pipe[i]);
  }
dengyihao's avatar
dengyihao 已提交
701
  free(srv->pipe);
dengyihao's avatar
dengyihao 已提交
702

dengyihao's avatar
dengyihao 已提交
703
  free(srv);
dengyihao's avatar
dengyihao 已提交
704
}
dengyihao's avatar
dengyihao 已提交
705 706

void rpcSendResponse(const SRpcMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
707
  SSrvConn*     pConn = pMsg->handle;
dengyihao's avatar
dengyihao 已提交
708 709
  SWorkThrdObj* pThrd = pConn->hostThrd;

dengyihao's avatar
dengyihao 已提交
710 711 712 713 714 715 716 717
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  srvMsg->pConn = pConn;
  srvMsg->msg = *pMsg;

  pthread_mutex_lock(&pThrd->msgMtx);
  QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
  pthread_mutex_unlock(&pThrd->msgMtx);

dengyihao's avatar
dengyihao 已提交
718
  tDebug("conn %p start to send resp", pConn);
dengyihao's avatar
dengyihao 已提交
719

dengyihao's avatar
dengyihao 已提交
720
  uv_async_send(pThrd->workerAsync);
dengyihao's avatar
dengyihao 已提交
721 722 723
}

#endif