transSrv.c 21.3 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
21 22 23 24
  uv_tcp_t*   pTcp;
  uv_write_t* pWriter;
  uv_timer_t* pTimer;

dengyihao's avatar
dengyihao 已提交
25
  // uv_async_t* pWorkerAsync;
dengyihao's avatar
dengyihao 已提交
26 27
  queue       queue;
  int         ref;
dengyihao's avatar
dengyihao 已提交
28
  int         persist;  // persist connection or not
dengyihao's avatar
dengyihao 已提交
29
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
30 31 32
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
33
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
34
  void*       pSrvMsg;
dengyihao's avatar
dengyihao 已提交
35

dengyihao's avatar
dengyihao 已提交
36 37
  struct sockaddr peername;

dengyihao's avatar
dengyihao 已提交
38
  // SRpcMsg sendMsg;
dengyihao's avatar
dengyihao 已提交
39 40 41 42 43 44 45
  // del later
  char secured;
  int  spi;
  char info[64];
  char user[TSDB_UNI_LEN];  // user ID for the link
  char secret[TSDB_PASSWORD_LEN];
  char ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
46 47 48 49 50 51 52
} SSrvConn;

/* A response queued for delivery on a connection's worker thread. */
typedef struct SSrvMsg {
  SSrvConn* pConn;  // target connection; NULL marks the "quit" message for a worker
  SRpcMsg   msg;    // copy of the response passed to rpcSendResponse()
  queue     q;      // link used to enqueue the message on the worker's async pool
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
53 54

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
55 56 57 58 59 60
  pthread_t   thread;
  uv_pipe_t*  pipe;
  int         fd;
  uv_loop_t*  loop;
  SAsyncPool* asyncPool;
  // uv_async_t*     workerAsync;  //
dengyihao's avatar
dengyihao 已提交
61 62
  queue           msg;
  pthread_mutex_t msgMtx;
dengyihao's avatar
dengyihao 已提交
63
  void*           pTransInst;
dengyihao's avatar
dengyihao 已提交
64 65 66 67 68 69 70 71 72 73 74 75
} SWorkThrdObj;

typedef struct SServerObj {
  pthread_t      thread;
  uv_tcp_t       server;
  uv_loop_t*     loop;
  int            workerIdx;
  int            numOfThreads;
  SWorkThrdObj** pThreadObj;
  uv_pipe_t**    pipe;
  uint32_t       ip;
  uint32_t       port;
dengyihao's avatar
dengyihao 已提交
76
  uv_async_t*    pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
77 78 79 80 81
} SServerObj;

static const char* notify = "a";

// refactor later
dengyihao's avatar
dengyihao 已提交
82
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
83

dengyihao's avatar
dengyihao 已提交
84
static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
85 86 87 88 89 90

static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void uvOnTimeoutCb(uv_timer_t* handle);
static void uvOnWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
91
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
92 93 94
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
95
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
96

dengyihao's avatar
dengyihao 已提交
97 98
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
99

dengyihao's avatar
dengyihao 已提交
100
static void destroySmsg(SSrvMsg* smsg);
101
// check whether already read complete packet
dengyihao's avatar
dengyihao 已提交
102 103 104
static bool      readComplete(SConnBuffer* buf);
static SSrvConn* createConn();
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
105

106
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
107

dengyihao's avatar
dengyihao 已提交
108
// server and worker thread
dengyihao's avatar
dengyihao 已提交
109 110 111
static void* workerThread(void* arg);
static void* acceptThread(void* arg);

dengyihao's avatar
dengyihao 已提交
112 113 114 115
// add handle loop
static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
116 117 118
/*
 * Allocation callback used while reading from a client socket. It delegates to
 * transAllocBuffer() with the connection's own read buffer.
 *
 * Format of the data buffer:
 * |<--------------------------data from socket------------------------------->|
 * |<------STransMsgHead------->|<-------------------other data--------------->|
 */
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SSrvConn* pConn = handle->data;
  transAllocBuffer(&pConn->readBuf, buf);
}

// Reports whether the buffer holds exactly one complete message.
//
// Returns true only when the length announced in the message head equals the
// number of bytes buffered. When more bytes are still expected, data->left is
// set to the missing amount. Buffers holding more than one message currently
// report false (pipelining is not handled yet).
static bool readComplete(SConnBuffer* data) {
  // TODO(yihao): handle pipeline later
  STransMsgHead hdr;
  int32_t       hdrLen = sizeof(hdr);

  // not even a full head yet
  if (data->len < hdrLen) {
    return false;
  }

  memcpy((char*)&hdr, data->buf, hdrLen);
  int32_t total = (int32_t)htonl((uint32_t)hdr.msgLen);

  if (total > data->len) {
    data->left = total - data->len;
    return false;
  }
  // total < data->len: handle other packet later
  return total == data->len;
}

dengyihao's avatar
dengyihao 已提交
150 151 152 153
// static void uvDoProcess(SRecvInfo* pRecv) {
//  // impl later
//  STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
//  SRpcInfo*      pRpc = (SRpcInfo*)pRecv->shandle;
dengyihao's avatar
dengyihao 已提交
154
//  SSrvConn*         pConn = pRecv->thandle;
dengyihao's avatar
dengyihao 已提交
155 156 157 158 159 160
//  tDump(pRecv->msg, pRecv->msgLen);
//  terrno = 0;
//  // SRpcReqContext* pContest;
//
//  // do auth and check
//}
dengyihao's avatar
dengyihao 已提交
161

dengyihao's avatar
dengyihao 已提交
162
/*
 * Checks the authentication part of a received message.
 *
 * pConn: connection the message arrived on (spi, secured and secret are read).
 * msg:   start of the message; interpreted as STransMsgHead.
 * len:   number of bytes in msg, including a trailing SRpcDigest if present.
 *
 * Returns 0 when the message is accepted, otherwise a TSDB_CODE_RPC_* code.
 * On acceptance pHead->msgLen is converted in place and, when a digest was
 * checked, reduced by sizeof(SRpcDigest).
 */
static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) {
  STransMsgHead* pHead = (STransMsgHead*)msg;

  int code = 0;

  if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
    // secured link, or no authentication
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
    // tTrace("%s, secured link, no auth is required", pConn->info);
    return 0;
  }

  if (!rpcIsReq(pHead->msgType)) {
    // for response, if code is auth failure, it shall bypass the auth process
    code = htonl(pHead->code);
    if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
        code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
        code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
      // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
      return 0;
    }
  }

  code = 0;
  if (pHead->spi != pConn->spi) {
    tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
    return pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
  }

  // The digest sits in the last sizeof(SRpcDigest) bytes of the message. A
  // message too short to hold head + digest would make the pointer below land
  // before the buffer, so reject it instead of reading out of bounds.
  if (len < (int)(sizeof(STransMsgHead) + sizeof(SRpcDigest))) {
    tWarn("%s, msg len:%d is too short to carry a digest, msg discarded", pConn->info, len);
    return TSDB_CODE_RPC_AUTH_FAILURE;
  }

  // authentication
  SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest));

  int32_t delta;
  delta = (int32_t)htonl(pDigest->timeStamp);
  delta -= (int32_t)taosGetTimestampSec();
  if (abs(delta) > 900) {
    tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
    code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
  } else {
    if (rpcAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
      // tDebug("%s, authentication failed, msg discarded", pConn->info);
      code = TSDB_CODE_RPC_AUTH_FAILURE;
    } else {
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
      if (!rpcIsReq(pHead->msgType)) pConn->secured = 1;  // link is secured for client
      // tTrace("%s, message is authenticated", pConn->info);
    }
  }

  return code;
}

// Timer callback for the query/insert inactivity timeout; it only logs for now.
static void uvHandleActivityTimeout(uv_timer_t* handle) {
  SSrvConn* pConn = handle->data;
  tDebug("%p timeout since no activity", pConn);
}

dengyihao's avatar
dengyihao 已提交
221
/*
 * Turns the complete message held in pConn->readBuf into an SRpcMsg and hands
 * it to the user callback (pRpc->cfp).
 *
 * Side effects: records the request type in pConn->inType, converts
 * pHead->code and pHead->msgLen in place, clears the read buffer and takes an
 * extra reference on the connection before invoking the callback.
 */
static void uvHandleReq(SSrvConn* pConn) {
  SConnBuffer*   pBuf = &pConn->readBuf;
  STransMsgHead* pHead = (STransMsgHead*)pBuf->buf;
  SRpcInfo*      pRpc = (SRpcInfo*)pConn->pTransInst;

  pConn->inType = pHead->msgType;
  assert(transIsReq(pHead->msgType));

  // auth here
  // auth should not do in rpc thread

  // int8_t code = uvAuthMsg(pConn, (char*)pHead, pBuf->len);
  // if (code != 0) {
  //  terrno = code;
  //  return;
  //}
  pHead->code = htonl(pHead->code);

  if (transDecompressMsg(NULL, 0, NULL)) {
    // add compress later
    // pHead = rpcDecompressRpcMsg(pHead);
  } else {
    pHead->msgLen = htonl(pHead->msgLen);
    // impl later
  }

  // Zero-initialise so that fields not assigned below never reach the callback
  // holding indeterminate stack contents.
  SRpcMsg rpcMsg = {0};
  rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
  rpcMsg.pCont = pHead->content;
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.code = pHead->code;
  rpcMsg.ahandle = NULL;
  rpcMsg.handle = pConn;

  transClearBuffer(&pConn->readBuf);
  pConn->ref++;
  tDebug("%s received on %p", TMSG_INFO(rpcMsg.msgType), pConn);
  (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
  // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
  // auth
  // validate msg type
}

/*
 * Read callback for client sockets.
 *
 * nread > 0: the bytes are accounted to the read buffer and the request is
 *            processed once a complete message is present.
 * nread == 0: nothing to do.
 * nread < 0: the error (including UV_EOF) is logged and the connection is
 *            released through destroyConn().
 */
void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
  // opt
  SSrvConn*    pConn = cli->data;
  SConnBuffer* pBuf = &pConn->readBuf;

  if (nread == 0) {
    return;
  }

  if (nread > 0) {
    pBuf->len += nread;
    tDebug("conn %p read summary, total read: %d, current read: %d", pConn, pBuf->len, (int)nread);
    if (!readComplete(pBuf)) {
      tDebug("conn %p read partial packet, continue to read", pConn);
      return;
    }
    tDebug("conn %p alread read complete packet", pConn);
    uvHandleReq(pConn);
    return;
  }

  // only negative values reach this point
  if (pConn->ref > 1) {
    pConn->ref++;  // ref > 1 signed that write is in progress
  }
  tDebug("conn %p read error: %s", pConn, uv_err_name(nread));
  destroyConn(pConn, true);
}
/*
 * Allocation callback for the dispatch pipe. It provides a small buffer for
 * the notification payload that accompanies each transferred handle.
 *
 * The advertised length always matches the number of bytes actually
 * allocated. On allocation failure the length is set to 0, so no read is
 * attempted into a NULL buffer.
 */
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  const size_t cap = 2;

  buf->base = malloc(cap);
  buf->len = (buf->base != NULL) ? cap : 0;
}

// Timer callback of a connection; it only logs the timeout for now.
void uvOnTimeoutCb(uv_timer_t* handle) {
  // opt
  SSrvConn* conn = handle->data;
  tDebug("conn %p time out", conn);
}

/*
 * Completion callback of a response write. Releases the message that was in
 * flight, clears the read buffer, and drops the connection when the write
 * failed.
 */
void uvOnWriteCb(uv_write_t* req, int status) {
  SSrvConn* pConn = req->data;

  // the in-flight response is no longer needed, whatever the outcome
  destroySmsg(pConn->pSrvMsg);
  pConn->pSrvMsg = NULL;

  transClearBuffer(&pConn->readBuf);

  if (status != 0) {
    tDebug("conn %p failed to write data, %s", pConn, uv_err_name(status));
    destroyConn(pConn, true);
    return;
  }
  tDebug("conn %p data already was written on stream", pConn);
  // opt
}
dengyihao's avatar
dengyihao 已提交
330 331 332 333 334 335 336
/*
 * Completion callback for the write that hands an accepted connection to a
 * worker thread. Logs the outcome and releases the write request, which
 * uvOnAcceptCb() allocates on the heap for each dispatch.
 */
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status == 0) {
    tDebug("success to dispatch conn to work thread");
  } else {
    tError("fail to dispatch conn to work thread");
  }
  // libuv no longer touches the request once its callback has run
  free(req);
}
dengyihao's avatar
dengyihao 已提交
337

dengyihao's avatar
dengyihao 已提交
338
/*
 * Fills in the transport head of a response and points `wb` at the bytes to
 * put on the wire (head followed by content).
 *
 * An empty response gets a zero-length content area so that a head exists.
 * The response type is the request type recorded on the connection plus one.
 */
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
  // impl later;
  tDebug("conn %p prepare to send resp", smsg->pConn);

  SRpcMsg* pResp = &smsg->msg;
  if (pResp->pCont == 0) {
    pResp->pCont = (void*)rpcMallocCont(0);
    pResp->contLen = 0;
  }

  STransMsgHead* pHead = transHeadFromCont(pResp->pCont);
  pHead->msgType = smsg->pConn->inType + 1;
  // add more info

  int32_t total = transMsgLenFromCont(pResp->contLen);
  char*   raw = (char*)pHead;
  if (transCompressMsg(raw, total, NULL)) {
    // impl later
  }
  pHead->msgLen = htonl(total);

  wb->base = raw;
  wb->len = total;
}
dengyihao's avatar
dengyihao 已提交
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
/*
 * Starts writing a response on its connection. The message is remembered in
 * pConn->pSrvMsg so that uvOnWriteCb() can release it once the write is done.
 */
static void uvStartSendResp(SSrvMsg* smsg) {
  SSrvConn* pConn = smsg->pConn;

  // impl
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  uv_timer_stop(pConn->pTimer);

  pConn->pSrvMsg = smsg;
  // conn->pWriter->data = smsg;
  uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);

  // SRpcMsg* rpcMsg = smsg->msg;
}
dengyihao's avatar
dengyihao 已提交
374
// Releases a queued response together with its content; NULL is ignored.
static void destroySmsg(SSrvMsg* smsg) {
  if (smsg != NULL) {
    transFreeMsg(smsg->msg.pCont);
    free(smsg);
  }
}
dengyihao's avatar
dengyihao 已提交
381
/*
 * Async callback of a worker thread. Takes over everything queued on the
 * async item and processes it: a message without a connection asks the worker
 * loop to stop, any other message is sent as a response.
 */
void uvWorkerAsyncCb(uv_async_t* handle) {
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
  queue         pending;

  // batch process to avoid to lock/unlock frequently
  pthread_mutex_lock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &pending);
  pthread_mutex_unlock(&item->mtx);

  for (;;) {
    if (QUEUE_IS_EMPTY(&pending)) {
      break;
    }
    queue* node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);

    SSrvMsg* smsg = QUEUE_DATA(node, SSrvMsg, q);
    if (smsg == NULL) {
      tError("except occurred, continue");
      continue;
    }

    if (smsg->pConn != NULL) {
      uvStartSendResp(smsg);
      continue;
    }
    // quit request
    free(smsg);
    uv_stop(pThrd->loop);
  }
}
dengyihao's avatar
dengyihao 已提交
415 416 417 418
// Async callback of the accept thread: stops the server's event loop.
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* pServer = async->data;
  uv_loop_t*  loop = pServer->loop;
  uv_stop(loop);
}
dengyihao's avatar
dengyihao 已提交
419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434

void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status == -1) {
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

  uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
    uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));

    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

    pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
dengyihao's avatar
dengyihao 已提交
435

dengyihao's avatar
dengyihao 已提交
436
    tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
dengyihao's avatar
dengyihao 已提交
437
    uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
dengyihao's avatar
dengyihao 已提交
438 439
  } else {
    uv_close((uv_handle_t*)cli, NULL);
440
    free(cli);
dengyihao's avatar
dengyihao 已提交
441 442 443 444 445 446 447 448 449
  }
}
/*
 * Read callback of a worker's dispatch pipe. Each notification carries one
 * accepted TCP handle. A connection object is created for it and reading from
 * the client is started.
 *
 * The buffer handed out by the allocation callback is released on every path.
 */
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
  tDebug("connection coming");
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
    // uv_close((uv_handle_t*)q, NULL);
    free(buf->base);
    return;
  }
  if (nread == 0) {
    // nothing was read this time
    free(buf->base);
    return;
  }
  // free memory allocated by the allocation callback
  assert(nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
  free(buf->base);

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

  SSrvConn* pConn = createConn();
  if (pConn == NULL) {
    tError("failed to create new connection");
    return;
  }

  // Allocate every handle before initialising any of them, so that a failed
  // allocation can be undone with plain free().
  pConn->pTimer = malloc(sizeof(uv_timer_t));
  pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  pConn->pWriter = calloc(1, sizeof(uv_write_t));
  if (pConn->pTimer == NULL || pConn->pTcp == NULL || pConn->pWriter == NULL) {
    tError("failed to allocate handles for new connection");
    free(pConn->pTimer);
    free(pConn->pTcp);
    free(pConn->pWriter);
    free(pConn);
    return;
  }

  pConn->pTransInst = pThrd->pTransInst;
  pConn->hostThrd = pThrd;
  // pConn->pWorkerAsync = pThrd->workerAsync;  // thread safty

  /* init conn timer*/
  uv_timer_init(pThrd->loop, pConn->pTimer);
  pConn->pTimer->data = pConn;

  // init client handle
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

  // init write request
  pConn->pWriter->data = pConn;

  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
    tDebug("conn %p created, fd: %d", pConn, fd);
    int namelen = sizeof(pConn->peername);
    if (0 != uv_tcp_getpeername(pConn->pTcp, &pConn->peername, &namelen)) {
      tError("failed to get peer name");
      destroyConn(pConn, true);
    } else {
      uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
    }
  } else {
    tDebug("failed to create new connection");
    destroyConn(pConn, true);
  }
}

void* acceptThread(void* arg) {
  // opt
dengyihao's avatar
dengyihao 已提交
508
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
509 510 511
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
}
dengyihao's avatar
dengyihao 已提交
512 513
static bool addHandleToWorkloop(void* arg) {
  SWorkThrdObj* pThrd = arg;
dengyihao's avatar
dengyihao 已提交
514
  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
515 516 517
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
518 519

  // SRpcInfo* pRpc = pThrd->shandle;
dengyihao's avatar
dengyihao 已提交
520
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
dengyihao's avatar
dengyihao 已提交
521 522 523 524
  uv_pipe_open(pThrd->pipe, pThrd->fd);

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
525 526
  QUEUE_INIT(&pThrd->msg);
  pthread_mutex_init(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
527

dengyihao's avatar
dengyihao 已提交
528
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, uvWorkerAsyncCb);
dengyihao's avatar
dengyihao 已提交
529
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
530 531 532 533 534 535 536 537 538 539 540 541 542
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
543 544 545 546
  // register an async here to quit server gracefully
  srv->pAcceptAsync = calloc(1, sizeof(uv_async_t));
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
547

dengyihao's avatar
dengyihao 已提交
548
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
549 550 551 552 553
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
554
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
555 556 557 558
    tError("failed to listen: %s", uv_err_name(err));
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
559 560
}
void* workerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
561
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
562
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
563 564 565
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
}

dengyihao's avatar
dengyihao 已提交
566 567 568
static SSrvConn* createConn() {
  SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
  tDebug("conn %p created", pConn);
569
  ++pConn->ref;
dengyihao's avatar
dengyihao 已提交
570 571
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
572

dengyihao's avatar
dengyihao 已提交
573
/*
 * Drops one reference on a connection. When the last reference goes away, the
 * read buffer and any pending response are released. If `clear` is set, the
 * TCP handle is then closed; uvDestroyConn() finishes the teardown from the
 * close callback.
 */
static void destroyConn(SSrvConn* conn, bool clear) {
  if (conn == NULL) {
    return;
  }
  tDebug("conn %p try to destroy", conn);

  conn->ref -= 1;
  if (conn->ref > 0) {
    // still referenced elsewhere
    return;
  }

  transDestroyBuffer(&conn->readBuf);
  destroySmsg(conn->pSrvMsg);
  conn->pSrvMsg = NULL;

  if (!clear) {
    return;
  }
  uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
}
// Close callback of a connection's timer: the handle memory is released only
// after libuv has finished closing it.
static void uvFreeConnTimerCb(uv_handle_t* handle) { free(handle); }

/*
 * Close callback of a connection's TCP handle; releases what the connection
 * owns.
 *
 * The timer was registered with the event loop by uv_timer_init(), so it is
 * closed through uv_close() and freed from its own close callback rather than
 * freed directly while the loop still references it.
 *
 * NOTE(review): conn->pTcp is not released here (the original free is
 * commented out) — confirm whether that is intentional.
 */
static void uvDestroyConn(uv_handle_t* handle) {
  SSrvConn* conn = handle->data;
  tDebug("conn %p destroy", conn);
  if (conn->pTimer != NULL) {
    uv_timer_stop(conn->pTimer);
    uv_close((uv_handle_t*)conn->pTimer, uvFreeConnTimerCb);
  }
  // free(conn->pTcp);
  free(conn->pWriter);
  free(conn);
}
dengyihao's avatar
dengyihao 已提交
598
/*
 * Stamps the authentication fields of an outgoing message.
 *
 * When the connection uses an spi and is not yet secured, a digest with the
 * current timestamp is written directly after the message and the length
 * grows by sizeof(SRpcDigest). Otherwise the spi is cleared. In both cases
 * pHead->msgLen is stored converted with htonl.
 *
 * Returns the resulting message length.
 * NOTE(review): the digest is written at msg + msgLen, so the caller
 * presumably provides room for it — confirm.
 */
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
  STransMsgHead* pHead = (STransMsgHead*)msg;

  bool needDigest = pConn->spi && pConn->secured == 0;
  if (!needDigest) {
    pHead->spi = 0;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    return msgLen;
  }

  // add auth part
  pHead->spi = pConn->spi;
  STransDigestMsg* pDigest = (STransDigestMsg*)(msg + msgLen);
  pDigest->timeStamp = htonl(taosGetTimestampSec());
  msgLen += sizeof(SRpcDigest);
  pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
  // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
  return msgLen;
}

void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
  SServerObj* srv = calloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
  srv->pThreadObj = (SWorkThrdObj**)calloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*));
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

  for (int i = 0; i < srv->numOfThreads; i++) {
    SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
631
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
632

dengyihao's avatar
dengyihao 已提交
633 634 635
    srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
    int fds[2];
    if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
dengyihao's avatar
dengyihao 已提交
636
      goto End;
dengyihao's avatar
dengyihao 已提交
637 638 639 640
    }
    uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write

dengyihao's avatar
dengyihao 已提交
641
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
642 643
    thrd->fd = fds[0];
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
644

dengyihao's avatar
dengyihao 已提交
645 646 647
    if (false == addHandleToWorkloop(thrd)) {
      goto End;
    }
dengyihao's avatar
dengyihao 已提交
648 649 650 651 652 653 654 655 656
    int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
    }
  }
dengyihao's avatar
dengyihao 已提交
657 658 659
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
dengyihao's avatar
dengyihao 已提交
660 661 662 663 664 665 666 667
  int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
    // clear all resource later
  }

  return srv;
dengyihao's avatar
dengyihao 已提交
668 669 670 671 672 673 674 675 676 677 678
End:
  taosCloseServer(srv);
  return NULL;
}

/*
 * Waits for a worker thread to finish and releases its loop memory, async
 * pool and the worker object itself. NULL is ignored.
 */
void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd != NULL) {
    pthread_join(pThrd->thread, NULL);
    free(pThrd->loop);
    transDestroyAsyncPool(pThrd->asyncPool);

    // free(pThrd->workerAsync);
    free(pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
684 685 686
/*
 * Asks a worker thread to stop by queueing a message that carries no
 * connection; uvWorkerAsyncCb() treats such a message as the quit request.
 */
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
  SSrvMsg* quitMsg = calloc(1, sizeof(SSrvMsg));

  // pthread_mutex_lock(&pThrd->msgMtx);
  // QUEUE_PUSH(&pThrd->msg, &quitMsg->q);
  // pthread_mutex_unlock(&pThrd->msgMtx);
  tDebug("send quit msg to work thread");

  transSendAsync(pThrd->asyncPool, &quitMsg->q);
  // uv_async_send(pThrd->workerAsync);
}

dengyihao's avatar
dengyihao 已提交
696 697 698
void taosCloseServer(void* arg) {
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
699
  for (int i = 0; i < srv->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
700
    sendQuitToWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
701
    destroyWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
702
  }
dengyihao's avatar
dengyihao 已提交
703 704 705 706 707 708 709

  tDebug("send quit msg to accept thread");
  uv_async_send(srv->pAcceptAsync);
  pthread_join(srv->thread, NULL);

  free(srv->pThreadObj);
  free(srv->pAcceptAsync);
dengyihao's avatar
dengyihao 已提交
710
  free(srv->loop);
dengyihao's avatar
dengyihao 已提交
711 712 713 714

  for (int i = 0; i < srv->numOfThreads; i++) {
    free(srv->pipe[i]);
  }
dengyihao's avatar
dengyihao 已提交
715
  free(srv->pipe);
dengyihao's avatar
dengyihao 已提交
716

dengyihao's avatar
dengyihao 已提交
717
  free(srv);
dengyihao's avatar
dengyihao 已提交
718
}
dengyihao's avatar
dengyihao 已提交
719 720

void rpcSendResponse(const SRpcMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
721 722 723
  if (pMsg->handle == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
724
  SSrvConn*     pConn = pMsg->handle;
dengyihao's avatar
dengyihao 已提交
725 726
  SWorkThrdObj* pThrd = pConn->hostThrd;

dengyihao's avatar
dengyihao 已提交
727 728 729 730
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  srvMsg->pConn = pConn;
  srvMsg->msg = *pMsg;

dengyihao's avatar
dengyihao 已提交
731 732 733
  // pthread_mutex_lock(&pThrd->msgMtx);
  // QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
  // pthread_mutex_unlock(&pThrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
734

dengyihao's avatar
dengyihao 已提交
735
  tDebug("conn %p start to send resp", pConn);
dengyihao's avatar
dengyihao 已提交
736
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
737
  // uv_async_send(pThrd->workerAsync);
dengyihao's avatar
dengyihao 已提交
738 739
}

dengyihao's avatar
dengyihao 已提交
740 741 742 743 744 745 746 747 748 749 750 751
/*
 * Copies the peer address and user name of a connection into pInfo.
 *
 * thandle: the connection (SSrvConn).
 * pInfo:   receives clientIp (as stored in the peer address), clientPort
 *          (converted with ntohs) and user.
 * Always returns 0.
 */
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) {
  SSrvConn* pConn = thandle;

  // the peer name is read as an IPv4 socket address
  struct sockaddr_in peer = *(struct sockaddr_in*)(&pConn->peername);

  pInfo->clientIp = (uint32_t)(peer.sin_addr.s_addr);
  pInfo->clientPort = ntohs(peer.sin_port);
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));

  return 0;
}

dengyihao's avatar
dengyihao 已提交
752
#endif