transSrv.c 21.7 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
21 22 23 24
  uv_tcp_t*   pTcp;
  uv_write_t* pWriter;
  uv_timer_t* pTimer;

dengyihao's avatar
dengyihao 已提交
25
  // uv_async_t* pWorkerAsync;
dengyihao's avatar
dengyihao 已提交
26 27
  queue       queue;
  int         ref;
dengyihao's avatar
dengyihao 已提交
28
  int         persist;  // persist connection or not
dengyihao's avatar
dengyihao 已提交
29
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
30 31 32
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
33
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
34
  void*       pSrvMsg;
dengyihao's avatar
dengyihao 已提交
35

dengyihao's avatar
dengyihao 已提交
36
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
37

dengyihao's avatar
dengyihao 已提交
38
  // SRpcMsg sendMsg;
dengyihao's avatar
dengyihao 已提交
39 40 41 42 43 44 45
  // del later
  char secured;
  int  spi;
  char info[64];
  char user[TSDB_UNI_LEN];  // user ID for the link
  char secret[TSDB_PASSWORD_LEN];
  char ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
46 47 48 49 50 51 52
} SSrvConn;

typedef struct SSrvMsg {
  SSrvConn* pConn;
  SRpcMsg   msg;
  queue     q;
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
53 54

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
55 56 57 58 59 60
  pthread_t   thread;
  uv_pipe_t*  pipe;
  int         fd;
  uv_loop_t*  loop;
  SAsyncPool* asyncPool;
  // uv_async_t*     workerAsync;  //
dengyihao's avatar
dengyihao 已提交
61 62
  queue           msg;
  pthread_mutex_t msgMtx;
dengyihao's avatar
dengyihao 已提交
63
  void*           pTransInst;
dengyihao's avatar
dengyihao 已提交
64 65 66 67 68 69 70 71 72 73 74 75
} SWorkThrdObj;

typedef struct SServerObj {
  pthread_t      thread;
  uv_tcp_t       server;
  uv_loop_t*     loop;
  int            workerIdx;
  int            numOfThreads;
  SWorkThrdObj** pThreadObj;
  uv_pipe_t**    pipe;
  uint32_t       ip;
  uint32_t       port;
dengyihao's avatar
dengyihao 已提交
76
  uv_async_t*    pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
77 78 79 80 81
} SServerObj;

static const char* notify = "a";

// refactor later
dengyihao's avatar
dengyihao 已提交
82
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
83

dengyihao's avatar
dengyihao 已提交
84
static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
85 86 87 88 89 90

static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void uvOnTimeoutCb(uv_timer_t* handle);
static void uvOnWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
91
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
92 93 94
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
95
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
96

dengyihao's avatar
dengyihao 已提交
97 98
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
99

dengyihao's avatar
dengyihao 已提交
100
static void destroySmsg(SSrvMsg* smsg);
101
// check whether already read complete packet
dengyihao's avatar
dengyihao 已提交
102 103 104
static bool      readComplete(SConnBuffer* buf);
static SSrvConn* createConn();
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
105

106
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
107

dengyihao's avatar
dengyihao 已提交
108
// server and worker thread
dengyihao's avatar
dengyihao 已提交
109 110 111
static void* workerThread(void* arg);
static void* acceptThread(void* arg);

dengyihao's avatar
dengyihao 已提交
112 113 114 115
// add handle loop
static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
116 117 118
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  /*
   * formate of data buffer:
dengyihao's avatar
dengyihao 已提交
119 120
   * |<--------------------------data from socket------------------------------->|
   * |<------STransMsgHead------->|<-------------------other data--------------->|
dengyihao's avatar
dengyihao 已提交
121
   */
dengyihao's avatar
dengyihao 已提交
122 123 124
  SSrvConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
125 126
}

dengyihao's avatar
dengyihao 已提交
127
// check data read from socket complete or not
dengyihao's avatar
dengyihao 已提交
128 129 130
//
static bool readComplete(SConnBuffer* data) {
  // TODO(yihao): handle pipeline later
dengyihao's avatar
dengyihao 已提交
131 132
  STransMsgHead head;
  int32_t       headLen = sizeof(head);
dengyihao's avatar
dengyihao 已提交
133
  if (data->len >= headLen) {
dengyihao's avatar
dengyihao 已提交
134 135
    memcpy((char*)&head, data->buf, headLen);
    int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
dengyihao's avatar
dengyihao 已提交
136 137 138
    if (msgLen > data->len) {
      data->left = msgLen - data->len;
      return false;
dengyihao's avatar
dengyihao 已提交
139
    } else if (msgLen == data->len) {
dengyihao's avatar
dengyihao 已提交
140
      return true;
dengyihao's avatar
dengyihao 已提交
141 142 143
    } else if (msgLen < data->len) {
      return false;
      // handle other packet later
dengyihao's avatar
dengyihao 已提交
144 145 146 147 148 149
    }
  } else {
    return false;
  }
}

dengyihao's avatar
dengyihao 已提交
150 151 152 153
// static void uvDoProcess(SRecvInfo* pRecv) {
//  // impl later
//  STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
//  SRpcInfo*      pRpc = (SRpcInfo*)pRecv->shandle;
dengyihao's avatar
dengyihao 已提交
154
//  SSrvConn*         pConn = pRecv->thandle;
dengyihao's avatar
dengyihao 已提交
155 156 157 158 159 160
//  tDump(pRecv->msg, pRecv->msgLen);
//  terrno = 0;
//  // SRpcReqContext* pContest;
//
//  // do auth and check
//}
dengyihao's avatar
dengyihao 已提交
161

dengyihao's avatar
dengyihao 已提交
162
static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) {
dengyihao's avatar
dengyihao 已提交
163 164 165
  STransMsgHead* pHead = (STransMsgHead*)msg;

  int code = 0;
dengyihao's avatar
dengyihao 已提交
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216

  if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
    // secured link, or no authentication
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
    // tTrace("%s, secured link, no auth is required", pConn->info);
    return 0;
  }

  if (!rpcIsReq(pHead->msgType)) {
    // for response, if code is auth failure, it shall bypass the auth process
    code = htonl(pHead->code);
    if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
        code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
        code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
      // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
      return 0;
    }
  }

  code = 0;
  if (pHead->spi == pConn->spi) {
    // authentication
    SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest));

    int32_t delta;
    delta = (int32_t)htonl(pDigest->timeStamp);
    delta -= (int32_t)taosGetTimestampSec();
    if (abs(delta) > 900) {
      tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
      code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
    } else {
      if (rpcAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
        // tDebug("%s, authentication failed, msg discarded", pConn->info);
        code = TSDB_CODE_RPC_AUTH_FAILURE;
      } else {
        pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
        if (!rpcIsReq(pHead->msgType)) pConn->secured = 1;  // link is secured for client
        // tTrace("%s, message is authenticated", pConn->info);
      }
    }
  } else {
    tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
    code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
  }

  return code;
}

// refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) {
dengyihao's avatar
dengyihao 已提交
217
  SSrvConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
218
  tDebug("%p timeout since no activity", conn);
dengyihao's avatar
dengyihao 已提交
219 220
}

dengyihao's avatar
dengyihao 已提交
221
static void uvHandleReq(SSrvConn* pConn) {
dengyihao's avatar
dengyihao 已提交
222 223
  SRecvInfo    info;
  SRecvInfo*   p = &info;
dengyihao's avatar
dengyihao 已提交
224
  SConnBuffer* pBuf = &pConn->readBuf;
dengyihao's avatar
dengyihao 已提交
225
  p->msg = pBuf->buf;
dengyihao's avatar
dengyihao 已提交
226 227 228
  p->msgLen = pBuf->len;
  p->ip = 0;
  p->port = 0;
dengyihao's avatar
dengyihao 已提交
229
  p->shandle = pConn->pTransInst;  //
dengyihao's avatar
dengyihao 已提交
230 231 232
  p->thandle = pConn;
  p->chandle = NULL;

dengyihao's avatar
dengyihao 已提交
233
  STransMsgHead* pHead = (STransMsgHead*)p->msg;
dengyihao's avatar
dengyihao 已提交
234 235 236 237
  if (pHead->secured == 0) {
    STransUserMsg* uMsg = (p->msg + p->msgLen - sizeof(STransUserMsg));
    memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
  }
dengyihao's avatar
dengyihao 已提交
238 239

  pConn->inType = pHead->msgType;
dengyihao's avatar
dengyihao 已提交
240
  assert(transIsReq(pHead->msgType));
dengyihao's avatar
dengyihao 已提交
241 242 243 244

  SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
  pHead->code = htonl(pHead->code);

dengyihao's avatar
dengyihao 已提交
245 246 247 248 249
  int32_t dlen = 0;
  if (transDecompressMsg(NULL, 0, NULL)) {
    // add compress later
    // pHead = rpcDecompressRpcMsg(pHead);
  } else {
dengyihao's avatar
dengyihao 已提交
250
    pHead->msgLen = htonl(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
251
    // impl later
dengyihao's avatar
dengyihao 已提交
252
    //
dengyihao's avatar
dengyihao 已提交
253
  }
dengyihao's avatar
dengyihao 已提交
254 255

  SRpcMsg rpcMsg;
dengyihao's avatar
dengyihao 已提交
256
  rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
257 258 259
  rpcMsg.pCont = pHead->content;
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.code = pHead->code;
dengyihao's avatar
dengyihao 已提交
260
  rpcMsg.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
261 262
  rpcMsg.handle = pConn;

dengyihao's avatar
dengyihao 已提交
263
  transClearBuffer(&pConn->readBuf);
264
  pConn->ref++;
dengyihao's avatar
dengyihao 已提交
265
  tDebug("server conn %p %s received from %s:%d", pConn, TMSG_INFO(rpcMsg.msgType), inet_ntoa(pConn->addr.sin_addr),
dengyihao's avatar
dengyihao 已提交
266
         ntohs(pConn->addr.sin_port));
dengyihao's avatar
dengyihao 已提交
267
  (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
dengyihao's avatar
dengyihao 已提交
268
  // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
dengyihao's avatar
dengyihao 已提交
269 270 271 272 273 274
  // auth
  // validate msg type
}

void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
  // opt
dengyihao's avatar
dengyihao 已提交
275 276
  SSrvConn*    conn = cli->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
277 278
  if (nread > 0) {
    pBuf->len += nread;
dengyihao's avatar
dengyihao 已提交
279
    tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
dengyihao's avatar
dengyihao 已提交
280
    if (readComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
281
      tTrace("server conn %p alread read complete packet", conn);
dengyihao's avatar
dengyihao 已提交
282
      uvHandleReq(conn);
dengyihao's avatar
dengyihao 已提交
283
    } else {
dengyihao's avatar
dengyihao 已提交
284
      tTrace("server %p read partial packet, continue to read", conn);
dengyihao's avatar
dengyihao 已提交
285 286 287
    }
    return;
  }
288 289 290
  if (nread == 0) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
291 292 293 294
  if (nread < 0 || nread != UV_EOF) {
    if (conn->ref > 1) {
      conn->ref++;  // ref > 1 signed that write is in progress
    }
dengyihao's avatar
dengyihao 已提交
295
    tError("server conn %p read error: %s", conn, uv_err_name(nread));
dengyihao's avatar
dengyihao 已提交
296
    destroyConn(conn, true);
dengyihao's avatar
dengyihao 已提交
297 298 299 300 301 302 303 304 305
  }
}
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->base = malloc(sizeof(char));
  buf->len = 2;
}

void uvOnTimeoutCb(uv_timer_t* handle) {
  // opt
dengyihao's avatar
dengyihao 已提交
306
  SSrvConn* pConn = handle->data;
dengyihao's avatar
dengyihao 已提交
307
  tError("server conn %p time out", pConn);
dengyihao's avatar
dengyihao 已提交
308 309 310
}

void uvOnWriteCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
311
  SSrvConn* conn = req->data;
dengyihao's avatar
dengyihao 已提交
312 313 314 315

  SSrvMsg* smsg = conn->pSrvMsg;
  destroySmsg(smsg);
  conn->pSrvMsg = NULL;
dengyihao's avatar
dengyihao 已提交
316

dengyihao's avatar
dengyihao 已提交
317
  transClearBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
318
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
319
    tTrace("server conn %p data already was written on stream", conn);
dengyihao's avatar
dengyihao 已提交
320
  } else {
dengyihao's avatar
dengyihao 已提交
321
    tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
322
    //
323
    destroyConn(conn, true);
dengyihao's avatar
dengyihao 已提交
324 325 326
  }
  // opt
}
dengyihao's avatar
dengyihao 已提交
327 328
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
329
    tTrace("success to dispatch conn to work thread");
dengyihao's avatar
dengyihao 已提交
330 331 332 333
  } else {
    tError("fail to dispatch conn to work thread");
  }
}
dengyihao's avatar
dengyihao 已提交
334

dengyihao's avatar
dengyihao 已提交
335
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
336
  // impl later;
dengyihao's avatar
dengyihao 已提交
337
  tTrace("server conn %p prepare to send resp", smsg->pConn);
dengyihao's avatar
dengyihao 已提交
338 339
  SRpcMsg*  pMsg = &smsg->msg;
  SSrvConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
340 341 342 343 344
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
dengyihao's avatar
dengyihao 已提交
345
  pHead->msgType = smsg->pConn->inType + 1;
dengyihao's avatar
dengyihao 已提交
346
  pHead->code = htonl(pMsg->code);
dengyihao's avatar
dengyihao 已提交
347 348 349 350 351 352
  // add more info
  char*   msg = (char*)pHead;
  int32_t len = transMsgLenFromCont(pMsg->contLen);
  if (transCompressMsg(msg, len, NULL)) {
    // impl later
  }
dengyihao's avatar
dengyihao 已提交
353
  tDebug("server conn %p %s is sent to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr),
dengyihao's avatar
dengyihao 已提交
354 355
         ntohs(pConn->addr.sin_port));

dengyihao's avatar
dengyihao 已提交
356 357 358 359
  pHead->msgLen = htonl(len);
  wb->base = msg;
  wb->len = len;
}
dengyihao's avatar
dengyihao 已提交
360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
static void uvStartSendResp(SSrvMsg* smsg) {
  // impl
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
  uv_timer_stop(pConn->pTimer);

  pConn->pSrvMsg = smsg;
  // conn->pWriter->data = smsg;
  uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);

  // SRpcMsg* rpcMsg = smsg->msg;

  return;
}
dengyihao's avatar
dengyihao 已提交
376
static void destroySmsg(SSrvMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
377 378 379 380
  if (smsg == NULL) {
    return;
  }
  transFreeMsg(smsg->msg.pCont);
dengyihao's avatar
dengyihao 已提交
381
  free(smsg);
dengyihao's avatar
dengyihao 已提交
382
}
dengyihao's avatar
dengyihao 已提交
383
void uvWorkerAsyncCb(uv_async_t* handle) {
dengyihao's avatar
dengyihao 已提交
384 385
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
dengyihao's avatar
dengyihao 已提交
386
  SSrvConn*     conn = NULL;
dengyihao's avatar
dengyihao 已提交
387 388
  queue         wq;
  // batch process to avoid to lock/unlock frequently
dengyihao's avatar
dengyihao 已提交
389 390 391 392
  pthread_mutex_lock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  pthread_mutex_unlock(&item->mtx);
  // pthread_mutex_unlock(&mtx);
dengyihao's avatar
dengyihao 已提交
393 394 395 396

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* head = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(head);
dengyihao's avatar
dengyihao 已提交
397 398 399

    SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
    if (msg == NULL) {
dengyihao's avatar
dengyihao 已提交
400
      tError("unexcept occurred, continue");
dengyihao's avatar
dengyihao 已提交
401
      continue;
dengyihao's avatar
dengyihao 已提交
402
    }
dengyihao's avatar
dengyihao 已提交
403 404 405 406 407 408 409
    if (msg->pConn == NULL) {
      //
      free(msg);
      uv_stop(pThrd->loop);
    } else {
      uvStartSendResp(msg);
    }
dengyihao's avatar
dengyihao 已提交
410 411 412
    // uv_buf_t wb;
    // uvPrepareSendData(msg, &wb);
    // uv_timer_stop(conn->pTimer);
dengyihao's avatar
dengyihao 已提交
413

dengyihao's avatar
dengyihao 已提交
414
    // uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
dengyihao's avatar
dengyihao 已提交
415 416
  }
}
dengyihao's avatar
dengyihao 已提交
417 418 419 420
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* srv = async->data;
  uv_stop(srv->loop);
}
dengyihao's avatar
dengyihao 已提交
421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436

void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status == -1) {
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

  uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
    uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));

    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

    pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
dengyihao's avatar
dengyihao 已提交
437

dengyihao's avatar
dengyihao 已提交
438
    tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
dengyihao's avatar
dengyihao 已提交
439
    uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
dengyihao's avatar
dengyihao 已提交
440 441
  } else {
    uv_close((uv_handle_t*)cli, NULL);
442
    free(cli);
dengyihao's avatar
dengyihao 已提交
443 444 445
  }
}
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
446
  tTrace("server connection coming");
dengyihao's avatar
dengyihao 已提交
447 448 449 450 451
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
452
    // uv_close((uv_handle_t*)q, NULL);
dengyihao's avatar
dengyihao 已提交
453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470
    return;
  }
  // free memory allocated by
  assert(nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
  free(buf->base);

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

dengyihao's avatar
dengyihao 已提交
471
  SSrvConn* pConn = createConn();
472

dengyihao's avatar
dengyihao 已提交
473
  pConn->pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
474 475 476 477 478 479
  /* init conn timer*/
  pConn->pTimer = malloc(sizeof(uv_timer_t));
  uv_timer_init(pThrd->loop, pConn->pTimer);
  pConn->pTimer->data = pConn;

  pConn->hostThrd = pThrd;
dengyihao's avatar
dengyihao 已提交
480
  // pConn->pWorkerAsync = pThrd->workerAsync;  // thread safty
dengyihao's avatar
dengyihao 已提交
481 482 483 484 485 486 487 488 489 490 491 492 493

  // init client handle
  pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

  // init write request, just
  pConn->pWriter = calloc(1, sizeof(uv_write_t));
  pConn->pWriter->data = pConn;

  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
dengyihao's avatar
dengyihao 已提交
494
    tTrace("server conn %p created, fd: %d", pConn, fd);
dengyihao's avatar
dengyihao 已提交
495 496
    int addrlen = sizeof(pConn->addr);
    if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
dengyihao's avatar
dengyihao 已提交
497
      tError("server conn %p failed to get peer info", pConn);
dengyihao's avatar
dengyihao 已提交
498 499 500 501
      destroyConn(pConn, true);
    } else {
      uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
    }
dengyihao's avatar
dengyihao 已提交
502
  } else {
dengyihao's avatar
dengyihao 已提交
503
    tDebug("failed to create new connection");
504
    destroyConn(pConn, true);
dengyihao's avatar
dengyihao 已提交
505 506 507 508 509
  }
}

void* acceptThread(void* arg) {
  // opt
dengyihao's avatar
dengyihao 已提交
510
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
511 512 513
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
}
dengyihao's avatar
dengyihao 已提交
514 515
static bool addHandleToWorkloop(void* arg) {
  SWorkThrdObj* pThrd = arg;
dengyihao's avatar
dengyihao 已提交
516
  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
517 518 519
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
520 521

  // SRpcInfo* pRpc = pThrd->shandle;
dengyihao's avatar
dengyihao 已提交
522
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
dengyihao's avatar
dengyihao 已提交
523 524 525 526
  uv_pipe_open(pThrd->pipe, pThrd->fd);

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
527 528
  QUEUE_INIT(&pThrd->msg);
  pthread_mutex_init(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
529

dengyihao's avatar
dengyihao 已提交
530
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, uvWorkerAsyncCb);
dengyihao's avatar
dengyihao 已提交
531
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
532 533 534 535 536 537 538 539 540 541 542 543 544
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
545 546 547 548
  // register an async here to quit server gracefully
  srv->pAcceptAsync = calloc(1, sizeof(uv_async_t));
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
549

dengyihao's avatar
dengyihao 已提交
550
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
551 552 553 554 555
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
556
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
557 558 559 560
    tError("failed to listen: %s", uv_err_name(err));
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
561 562
}
void* workerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
563
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
564
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
565 566 567
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
}

dengyihao's avatar
dengyihao 已提交
568 569
static SSrvConn* createConn() {
  SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
dengyihao's avatar
dengyihao 已提交
570
  tTrace("conn %p created", pConn);
571
  ++pConn->ref;
dengyihao's avatar
dengyihao 已提交
572 573
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
574

dengyihao's avatar
dengyihao 已提交
575
static void destroyConn(SSrvConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
576 577 578
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
579
  tTrace("server conn %p try to destroy", conn);
dengyihao's avatar
dengyihao 已提交
580
  if (--conn->ref > 0) {
581 582
    return;
  }
dengyihao's avatar
dengyihao 已提交
583
  transDestroyBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
584 585
  destroySmsg(conn->pSrvMsg);
  conn->pSrvMsg = NULL;
dengyihao's avatar
dengyihao 已提交
586

587
  if (clear) {
dengyihao's avatar
dengyihao 已提交
588
    uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
589
  }
dengyihao's avatar
dengyihao 已提交
590 591 592
}
static void uvDestroyConn(uv_handle_t* handle) {
  SSrvConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
593
  tDebug("server conn %p destroy", conn);
dengyihao's avatar
dengyihao 已提交
594 595
  uv_timer_stop(conn->pTimer);
  free(conn->pTimer);
dengyihao's avatar
dengyihao 已提交
596
  // free(conn->pTcp);
dengyihao's avatar
dengyihao 已提交
597
  free(conn->pWriter);
598
  free(conn);
dengyihao's avatar
dengyihao 已提交
599
}
dengyihao's avatar
dengyihao 已提交
600
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
dengyihao's avatar
dengyihao 已提交
601
  STransMsgHead* pHead = (STransMsgHead*)msg;
dengyihao's avatar
dengyihao 已提交
602 603 604 605

  if (pConn->spi && pConn->secured == 0) {
    // add auth part
    pHead->spi = pConn->spi;
dengyihao's avatar
dengyihao 已提交
606
    STransDigestMsg* pDigest = (STransDigestMsg*)(msg + msgLen);
dengyihao's avatar
dengyihao 已提交
607 608 609
    pDigest->timeStamp = htonl(taosGetTimestampSec());
    msgLen += sizeof(SRpcDigest);
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
dengyihao's avatar
dengyihao 已提交
610 611
    // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
    // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
dengyihao's avatar
dengyihao 已提交
612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632
  } else {
    pHead->spi = 0;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
  }

  return msgLen;
}

void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
  SServerObj* srv = calloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
  srv->pThreadObj = (SWorkThrdObj**)calloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*));
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

  for (int i = 0; i < srv->numOfThreads; i++) {
    SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
633
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
634

dengyihao's avatar
dengyihao 已提交
635 636 637
    srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
    int fds[2];
    if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
dengyihao's avatar
dengyihao 已提交
638
      goto End;
dengyihao's avatar
dengyihao 已提交
639 640 641 642
    }
    uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write

dengyihao's avatar
dengyihao 已提交
643
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
644 645
    thrd->fd = fds[0];
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
646

dengyihao's avatar
dengyihao 已提交
647 648 649
    if (false == addHandleToWorkloop(thrd)) {
      goto End;
    }
dengyihao's avatar
dengyihao 已提交
650 651 652 653 654 655 656 657 658
    int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
    }
  }
dengyihao's avatar
dengyihao 已提交
659 660 661
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
dengyihao's avatar
dengyihao 已提交
662 663 664 665 666 667 668 669
  int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
    // clear all resource later
  }

  return srv;
dengyihao's avatar
dengyihao 已提交
670 671 672 673 674 675 676 677 678 679 680
End:
  taosCloseServer(srv);
  return NULL;
}

void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd == NULL) {
    return;
  }
  pthread_join(pThrd->thread, NULL);
  free(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
681 682 683
  transDestroyAsyncPool(pThrd->asyncPool);

  // free(pThrd->workerAsync);
dengyihao's avatar
dengyihao 已提交
684
  free(pThrd);
dengyihao's avatar
dengyihao 已提交
685
}
dengyihao's avatar
dengyihao 已提交
686 687 688
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));

dengyihao's avatar
dengyihao 已提交
689 690 691
  // pthread_mutex_lock(&pThrd->msgMtx);
  // QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
  // pthread_mutex_unlock(&pThrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
692 693
  tDebug("send quit msg to work thread");

dengyihao's avatar
dengyihao 已提交
694
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
695
  // uv_async_send(pThrd->workerAsync);
dengyihao's avatar
dengyihao 已提交
696 697
}

dengyihao's avatar
dengyihao 已提交
698 699 700
void taosCloseServer(void* arg) {
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
701
  for (int i = 0; i < srv->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
702
    sendQuitToWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
703
    destroyWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
704
  }
dengyihao's avatar
dengyihao 已提交
705 706 707 708 709 710 711

  tDebug("send quit msg to accept thread");
  uv_async_send(srv->pAcceptAsync);
  pthread_join(srv->thread, NULL);

  free(srv->pThreadObj);
  free(srv->pAcceptAsync);
dengyihao's avatar
dengyihao 已提交
712
  free(srv->loop);
dengyihao's avatar
dengyihao 已提交
713 714 715 716

  for (int i = 0; i < srv->numOfThreads; i++) {
    free(srv->pipe[i]);
  }
dengyihao's avatar
dengyihao 已提交
717
  free(srv->pipe);
dengyihao's avatar
dengyihao 已提交
718

dengyihao's avatar
dengyihao 已提交
719
  free(srv);
dengyihao's avatar
dengyihao 已提交
720
}
dengyihao's avatar
dengyihao 已提交
721 722

void rpcSendResponse(const SRpcMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
723 724 725
  if (pMsg->handle == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
726
  SSrvConn*     pConn = pMsg->handle;
dengyihao's avatar
dengyihao 已提交
727 728
  SWorkThrdObj* pThrd = pConn->hostThrd;

dengyihao's avatar
dengyihao 已提交
729 730 731 732
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  srvMsg->pConn = pConn;
  srvMsg->msg = *pMsg;

dengyihao's avatar
dengyihao 已提交
733 734 735
  // pthread_mutex_lock(&pThrd->msgMtx);
  // QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
  // pthread_mutex_unlock(&pThrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
736

dengyihao's avatar
dengyihao 已提交
737
  tTrace("server conn %p start to send resp", pConn);
dengyihao's avatar
dengyihao 已提交
738
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
739
  // uv_async_send(pThrd->workerAsync);
dengyihao's avatar
dengyihao 已提交
740 741
}

dengyihao's avatar
dengyihao 已提交
742
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) {
dengyihao's avatar
dengyihao 已提交
743 744
  SSrvConn* pConn = thandle;
  // struct sockaddr* pPeerName = &pConn->peername;
dengyihao's avatar
dengyihao 已提交
745

dengyihao's avatar
dengyihao 已提交
746 747 748
  struct sockaddr_in addr = pConn->addr;
  pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
  pInfo->clientPort = ntohs(addr.sin_port);
dengyihao's avatar
dengyihao 已提交
749 750 751 752 753

  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
  return 0;
}

dengyihao's avatar
dengyihao 已提交
754
#endif