transSrv.c 23.0 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
21
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
22 23 24 25
  uv_tcp_t*   pTcp;
  uv_write_t* pWriter;
  uv_timer_t* pTimer;

dengyihao's avatar
dengyihao 已提交
26
  // uv_async_t* pWorkerAsync;
dengyihao's avatar
dengyihao 已提交
27 28
  queue       queue;
  int         ref;
dengyihao's avatar
dengyihao 已提交
29
  int         persist;  // persist connection or not
dengyihao's avatar
dengyihao 已提交
30
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
31 32 33
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
34
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
35 36
  SArray*     srvMsgs;
  // void*       pSrvMsg;
dengyihao's avatar
dengyihao 已提交
37

dengyihao's avatar
dengyihao 已提交
38
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
39
  struct sockaddr_in locaddr;
dengyihao's avatar
dengyihao 已提交
40

dengyihao's avatar
dengyihao 已提交
41
  // SRpcMsg sendMsg;
dengyihao's avatar
dengyihao 已提交
42 43 44 45 46 47 48
  // del later
  char secured;
  int  spi;
  char info[64];
  char user[TSDB_UNI_LEN];  // user ID for the link
  char secret[TSDB_PASSWORD_LEN];
  char ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
49 50 51 52 53 54 55
} SSrvConn;

/* One response travelling from the application to a connection. */
typedef struct SSrvMsg {
  SSrvConn* pConn;  // target connection; NULL is used as the "quit worker" marker
  SRpcMsg   msg;    // response payload
  queue     q;      // link in the async item's message queue
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
56 57

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
58 59
  pthread_t   thread;
  uv_pipe_t*  pipe;
60
  uv_os_fd_t  fd;
dengyihao's avatar
dengyihao 已提交
61 62 63
  uv_loop_t*  loop;
  SAsyncPool* asyncPool;
  // uv_async_t*     workerAsync;  //
dengyihao's avatar
dengyihao 已提交
64
  queue           msg;
dengyihao's avatar
fix bug  
dengyihao 已提交
65
  queue           conn;
dengyihao's avatar
dengyihao 已提交
66
  pthread_mutex_t msgMtx;
dengyihao's avatar
dengyihao 已提交
67
  void*           pTransInst;
dengyihao's avatar
dengyihao 已提交
68 69 70
} SWorkThrdObj;

typedef struct SServerObj {
dengyihao's avatar
dengyihao 已提交
71 72 73 74 75
  pthread_t  thread;
  uv_tcp_t   server;
  uv_loop_t* loop;

  // work thread info
dengyihao's avatar
dengyihao 已提交
76 77 78
  int            workerIdx;
  int            numOfThreads;
  SWorkThrdObj** pThreadObj;
dengyihao's avatar
dengyihao 已提交
79 80 81 82 83

  uv_pipe_t** pipe;
  uint32_t    ip;
  uint32_t    port;
  uv_async_t* pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
84 85 86 87 88
} SServerObj;

static const char* notify = "a";

// refactor later
dengyihao's avatar
dengyihao 已提交
89
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
90

dengyihao's avatar
dengyihao 已提交
91
static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
92 93 94 95 96 97

static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void uvOnTimeoutCb(uv_timer_t* handle);
static void uvOnWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
98
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
99 100 101
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
102
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
103
static void uvShutDownCb(uv_shutdown_t* req, int status);
dengyihao's avatar
dengyihao 已提交
104

dengyihao's avatar
dengyihao 已提交
105
static void uvStartSendRespInternal(SSrvMsg* smsg);
dengyihao's avatar
dengyihao 已提交
106 107
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
108

dengyihao's avatar
dengyihao 已提交
109
static void destroySmsg(SSrvMsg* smsg);
110
// check whether already read complete packet
dengyihao's avatar
fix bug  
dengyihao 已提交
111
static SSrvConn* createConn(void* hThrd);
dengyihao's avatar
dengyihao 已提交
112
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
113

114
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
115

dengyihao's avatar
dengyihao 已提交
116
// server and worker thread
dengyihao's avatar
dengyihao 已提交
117 118 119
static void* workerThread(void* arg);
static void* acceptThread(void* arg);

dengyihao's avatar
dengyihao 已提交
120 121 122 123
// add handle loop
static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
124
/* Allocation callback for client sockets: lets the connection's read buffer supply the memory. */
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SSrvConn* pConn = handle->data;
  transAllocBuffer(&pConn->readBuf, buf);
}

dengyihao's avatar
dengyihao 已提交
130
/*
 * Check the authentication trailer of a received message.
 *
 * Returns 0 when the message is accepted (pHead->msgLen is then converted from
 * network byte order), otherwise one of the TSDB_CODE_RPC_* codes below.
 */
static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) {
  STransMsgHead* pHead = (STransMsgHead*)msg;

  // secured link, or no authentication
  if (pHead->spi == 0 && (pConn->secured || pConn->spi == 0)) {
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
    return 0;
  }

  if (!rpcIsReq(pHead->msgType)) {
    // for response, if code is auth failure, it shall bypass the auth process
    int rspCode = htonl(pHead->code);
    if (rspCode == TSDB_CODE_RPC_INVALID_TIME_STAMP || rspCode == TSDB_CODE_RPC_AUTH_FAILURE ||
        rspCode == TSDB_CODE_RPC_INVALID_VERSION || rspCode == TSDB_CODE_RPC_AUTH_REQUIRED ||
        rspCode == TSDB_CODE_MND_USER_NOT_EXIST || rspCode == TSDB_CODE_RPC_NOT_READY) {
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
      return 0;
    }
  }

  if (pHead->spi != pConn->spi) {
    tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
    return pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
  }

  // the digest sits at the very end of the message
  SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest));

  int32_t delta = (int32_t)htonl(pDigest->timeStamp);
  delta -= (int32_t)taosGetTimestampSec();
  if (abs(delta) > 900) {
    tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
    return TSDB_CODE_RPC_INVALID_TIME_STAMP;
  }

  if (rpcAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
    return TSDB_CODE_RPC_AUTH_FAILURE;
  }

  pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
  if (!rpcIsReq(pHead->msgType)) {
    pConn->secured = 1;  // link is secured for client
  }
  return 0;
}

// refers specifically to query or insert timeout
/* Timer callback for an idle connection; currently it only logs the event. */
static void uvHandleActivityTimeout(uv_timer_t* handle) {
  SSrvConn* pConn = handle->data;
  tDebug("%p timeout since no activity", pConn);
}

dengyihao's avatar
dengyihao 已提交
189
/*
 * Dispatch one complete request held in the connection's read buffer.
 *
 * Converts the header fields, wraps the payload in an SRpcMsg, takes a
 * reference on the connection (dropped again in uvStartSendResp) and invokes
 * the rpc instance's callback.
 */
static void uvHandleReq(SSrvConn* pConn) {
  SConnBuffer*   pBuf = &pConn->readBuf;
  STransMsgHead* pHead = (STransMsgHead*)pBuf->buf;
  SRpcInfo*      pRpc = (SRpcInfo*)pConn->pTransInst;

  if (pHead->secured == 1) {
    // user info is carried as a trailer at the end of the message
    STransUserMsg* uMsg = (STransUserMsg*)((char*)pBuf->buf + pBuf->len - sizeof(STransUserMsg));
    memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
    memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret));
  }

  pConn->inType = pHead->msgType;
  pHead->code = htonl(pHead->code);

  if (!transDecompressMsg(NULL, 0, NULL)) {
    // decompression is not implemented yet; only the length needs converting
    pHead->msgLen = htonl(pHead->msgLen);
  }

  // zero-initialise so fields not set below never carry stack garbage into the callback
  SRpcMsg rpcMsg = {0};
  rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
  rpcMsg.pCont = pHead->content;
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.code = pHead->code;
  rpcMsg.ahandle = NULL;
  rpcMsg.handle = pConn;

  transClearBuffer(&pConn->readBuf);
  pConn->ref++;

  // inet_ntoa() reuses one static buffer, so two calls in a single log statement
  // would print the same address twice; format each address into its own buffer.
  char peerIp[32] = {0};
  char localIp[32] = {0};
  uv_ip4_name(&pConn->addr, peerIp, sizeof(peerIp));
  uv_ip4_name(&pConn->locaddr, localIp, sizeof(localIp));
  tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(rpcMsg.msgType),
         peerIp, ntohs(pConn->addr.sin_port), localIp, ntohs(pConn->locaddr.sin_port), rpcMsg.contLen);

  (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
  // auth
  // validate msg type
}

/*
 * Read callback for client sockets.
 *
 * nread > 0  : data was appended to the connection's read buffer; a complete
 *              packet is handed to uvHandleReq.
 * nread == 0 : nothing to do.
 * nread < 0  : EOF or read error; the connection is released.
 */
void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
  SSrvConn*    conn = cli->data;
  SConnBuffer* pBuf = &conn->readBuf;

  if (nread > 0) {
    pBuf->len += nread;
    tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
    if (transReadComplete(pBuf)) {
      tTrace("server conn %p alread read complete packet", conn);
      uvHandleReq(conn);
    } else {
      tTrace("server %p read partial packet, continue to read", conn);
    }
    return;
  }

  if (nread == 0) {
    return;
  }

  // nread < 0: UV_EOF and every other error are handled the same way
  tError("server conn %p read error: %s", conn, uv_err_name(nread));
  if (conn->ref > 1) {
    conn->ref++;  // ref > 1 signed that write is in progress
  }
  destroyConn(conn, true);
}
/*
 * Allocation callback for the worker pipe: provides a tiny buffer for the
 * notification byte that accompanies each handed-over connection.
 */
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->len = 2;
  buf->base = calloc(1, sizeof(char) * buf->len);
  if (buf->base == NULL) {
    // a zero-length buffer tells libuv the allocation failed (read cb gets UV_ENOBUFS)
    buf->len = 0;
  }
}

/* Timer callback: reports that the connection timed out. */
void uvOnTimeoutCb(uv_timer_t* handle) {
  SSrvConn* conn = handle->data;
  tError("server conn %p time out", conn);
}

/*
 * Write-completion callback for a response.
 *
 * On success the response at the head of srvMsgs is released and, if more
 * responses are queued, the next one is sent. On failure the connection is
 * released.
 */
void uvOnWriteCb(uv_write_t* req, int status) {
  SSrvConn* pConn = req->data;
  transClearBuffer(&pConn->readBuf);

  if (status != 0) {
    tError("server conn %p failed to write data, %s", pConn, uv_err_name(status));
    destroyConn(pConn, true);
    return;
  }

  tTrace("server conn %p data already was written on stream", pConn);
  if (pConn->srvMsgs == NULL) {
    return;
  }

  assert(taosArrayGetSize(pConn->srvMsgs) >= 1);
  SSrvMsg* sent = taosArrayGetP(pConn->srvMsgs, 0);
  taosArrayRemove(pConn->srvMsgs, 0);
  destroySmsg(sent);

  // send second data, just use for push
  if (taosArrayGetSize(pConn->srvMsgs) > 0) {
    SSrvMsg* next = (SSrvMsg*)taosArrayGetP(pConn->srvMsgs, 0);
    uvStartSendRespInternal(next);
  }
}
dengyihao's avatar
dengyihao 已提交
312 313
/*
 * Completion callback for handing an accepted socket to a worker thread.
 * Logs the outcome and releases the heap-allocated write request.
 */
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status == 0) {
    tTrace("success to dispatch conn to work thread");
  } else {
    tError("fail to dispatch conn to work thread");
  }
  // the request was malloc'ed by the sender and is no longer used by libuv
  free(req);
}
dengyihao's avatar
dengyihao 已提交
319

dengyihao's avatar
dengyihao 已提交
320
/*
 * Fill in the transport header of a response and describe it in *wb.
 *
 * smsg  response to send; an empty payload is allocated when pCont is NULL
 * wb    out: buffer covering header + payload, ready for uv_write
 */
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
  SSrvConn* pConn = smsg->pConn;
  SRpcMsg*  pMsg = &smsg->msg;
  tTrace("server conn %p prepare to send resp", pConn);

  if (pMsg->pCont == NULL) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);

  pHead->secured = pMsg->code == 0 ? 1 : 0;
  pHead->msgType = pConn->inType + 1;  // response type is derived from the last request type
  pHead->code = htonl(pMsg->code);

  char*   msg = (char*)pHead;
  int32_t len = transMsgLenFromCont(pMsg->contLen);
  if (transCompressMsg(msg, len, NULL)) {
    // impl later
  }

  // inet_ntoa() reuses one static buffer, so two calls in a single log statement
  // would print the same address twice; format each address into its own buffer.
  char peerIp[32] = {0};
  char localIp[32] = {0};
  uv_ip4_name(&pConn->addr, peerIp, sizeof(peerIp));
  uv_ip4_name(&pConn->locaddr, localIp, sizeof(localIp));
  tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), peerIp,
         ntohs(pConn->addr.sin_port), localIp, ntohs(pConn->locaddr.sin_port));

  pHead->msgLen = htonl(len);
  wb->base = msg;
  wb->len = len;
}
dengyihao's avatar
dengyihao 已提交
349 350

/*
 * Start writing one response on its connection. Completion is reported to
 * uvOnWriteCb; if the write cannot even be started, that callback never runs,
 * so the failure is logged here.
 */
static void uvStartSendRespInternal(SSrvMsg* smsg) {
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
  uv_timer_stop(pConn->pTimer);

  int err = uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
  if (err != 0) {
    tError("server conn %p failed to start write, %s", pConn, uv_err_name(err));
  }
}
/*
 * Queue a response on its connection and start sending it unless another
 * response is already in flight (in which case uvOnWriteCb sends it later).
 * Drops the reference taken when the request was received.
 */
static void uvStartSendResp(SSrvMsg* smsg) {
  SSrvConn* pConn = smsg->pConn;
  pConn->ref--;

  bool inFlight = taosArrayGetSize(pConn->srvMsgs) > 0;
  if (inFlight) {
    // inet_ntoa() reuses one static buffer, so two calls in a single log statement
    // would print the same address twice; format each address into its own buffer.
    char peerIp[32] = {0};
    char localIp[32] = {0};
    uv_ip4_name(&pConn->addr, peerIp, sizeof(peerIp));
    uv_ip4_name(&pConn->locaddr, localIp, sizeof(localIp));
    tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, peerIp, ntohs(pConn->addr.sin_port),
           localIp, ntohs(pConn->locaddr.sin_port));
  }

  taosArrayPush(pConn->srvMsgs, &smsg);
  if (!inFlight) {
    uvStartSendRespInternal(smsg);
  }
}
dengyihao's avatar
dengyihao 已提交
375
/* Release a response message together with its payload; NULL is ignored. */
static void destroySmsg(SSrvMsg* smsg) {
  if (smsg != NULL) {
    transFreeMsg(smsg->msg.pCont);
    free(smsg);
  }
}
dengyihao's avatar
fix bug  
dengyihao 已提交
382 383 384 385 386 387 388 389 390 391
/* Detach every connection from the worker's list and release it. */
static void destroyAllConn(SWorkThrdObj* pThrd) {
  queue* connList = &pThrd->conn;
  while (!QUEUE_IS_EMPTY(connList)) {
    queue* node = QUEUE_HEAD(connList);
    QUEUE_REMOVE(node);
    QUEUE_INIT(node);  // leave the node self-linked; destroyConn removes it again

    SSrvConn* pConn = QUEUE_DATA(node, SSrvConn, queue);
    destroyConn(pConn, true);
  }
}
dengyihao's avatar
dengyihao 已提交
392
/*
 * Async callback on a worker loop: drains the messages queued on this async
 * item. A message without a connection is the quit marker (all connections are
 * released and the loop is stopped); any other message is sent as a response.
 */
void uvWorkerAsyncCb(uv_async_t* handle) {
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
  queue         wq;

  // batch process to avoid to lock/unlock frequently
  pthread_mutex_lock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  pthread_mutex_unlock(&item->mtx);

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* head = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(head);

    SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
    if (msg == NULL) {
      tError("unexcept occurred, continue");
      continue;
    }
    if (msg->pConn == NULL) {
      free(msg);

      destroyAllConn(pThrd);

      // uv_loop_close() is not called here: inside a running callback the loop
      // still has open handles, so it could only return UV_EBUSY.
      uv_stop(pThrd->loop);
    } else {
      uvStartSendResp(msg);
    }
  }
}
dengyihao's avatar
dengyihao 已提交
424 425
/* Async callback on the accept loop: closes the listening socket and stops the loop. */
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj*  pSrv = async->data;
  uv_handle_t* listener = (uv_handle_t*)&pSrv->server;

  uv_close(listener, NULL);
  uv_stop(pSrv->loop);
}
dengyihao's avatar
dengyihao 已提交
429

dengyihao's avatar
dengyihao 已提交
430 431 432 433 434 435
/*
 * Shutdown-completion callback: closes the stream and frees the request.
 * A failure is logged only when the shutdown actually failed.
 */
static void uvShutDownCb(uv_shutdown_t* req, int status) {
  if (status != 0) {
    tDebug("conn failed to shut down: %s", uv_err_name(status));
  }
  uv_close((uv_handle_t*)req->handle, uvDestroyConn);
  free(req);
}

dengyihao's avatar
dengyihao 已提交
436 437 438 439 440 441 442 443 444 445 446 447 448 449 450
void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status == -1) {
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

  uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
    uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));

    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

    pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
dengyihao's avatar
dengyihao 已提交
451

dengyihao's avatar
dengyihao 已提交
452
    tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
dengyihao's avatar
dengyihao 已提交
453
    uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
dengyihao's avatar
dengyihao 已提交
454 455
  } else {
    uv_close((uv_handle_t*)cli, NULL);
456
    free(cli);
dengyihao's avatar
dengyihao 已提交
457 458 459
  }
}
/*
 * Read callback on a worker's pipe: each notification byte announces one
 * pending TCP handle. A connection object is created for it, the handle is
 * accepted onto the worker loop and reading from the client is started.
 */
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
  tTrace("server connection coming");
  if (nread <= 0) {
    if (nread < 0 && nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
    // the buffer handed out by the alloc callback is ours to free in every case;
    // nread == 0 carries no data and must not reach the asserts below
    free(buf->base);
    return;
  }
  // free memory allocated by the alloc callback
  assert((size_t)nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
  free(buf->base);

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

  SSrvConn* pConn = createConn(pThrd);

  pConn->pTransInst = pThrd->pTransInst;
  /* init conn timer*/
  pConn->pTimer = malloc(sizeof(uv_timer_t));
  uv_timer_init(pThrd->loop, pConn->pTimer);
  pConn->pTimer->data = pConn;

  pConn->hostThrd = pThrd;

  // init client handle
  pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

  pConn->pWriter = calloc(1, sizeof(uv_write_t));
  pConn->pWriter->data = pConn;

  transSetConnOption((uv_tcp_t*)pConn->pTcp);

  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
    tTrace("server conn %p created, fd: %d", pConn, fd);

    int addrlen = sizeof(pConn->addr);
    if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
      tError("server conn %p failed to get peer info", pConn);
      destroyConn(pConn, true);
      return;
    }

    addrlen = sizeof(pConn->locaddr);
    if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
      tError("server conn %p failed to get local info", pConn);
      destroyConn(pConn, true);
      return;
    }

    uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);

  } else {
    tDebug("failed to create new connection");
    destroyConn(pConn, true);
  }
}

void* acceptThread(void* arg) {
  // opt
dengyihao's avatar
dengyihao 已提交
534
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
535 536 537
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
}
dengyihao's avatar
dengyihao 已提交
538 539
static bool addHandleToWorkloop(void* arg) {
  SWorkThrdObj* pThrd = arg;
dengyihao's avatar
dengyihao 已提交
540
  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
541 542 543
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
544 545

  // SRpcInfo* pRpc = pThrd->shandle;
dengyihao's avatar
dengyihao 已提交
546
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
dengyihao's avatar
dengyihao 已提交
547 548 549 550
  uv_pipe_open(pThrd->pipe, pThrd->fd);

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
551 552
  QUEUE_INIT(&pThrd->msg);
  pthread_mutex_init(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
553

dengyihao's avatar
fix bug  
dengyihao 已提交
554 555 556
  // conn set
  QUEUE_INIT(&pThrd->conn);

dengyihao's avatar
dengyihao 已提交
557
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 4, pThrd, uvWorkerAsyncCb);
dengyihao's avatar
dengyihao 已提交
558
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
559 560 561 562 563 564 565 566 567 568 569 570 571
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
572 573 574 575
  // register an async here to quit server gracefully
  srv->pAcceptAsync = calloc(1, sizeof(uv_async_t));
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
576

dengyihao's avatar
dengyihao 已提交
577
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
578 579 580 581 582
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
583
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
584 585 586 587
    tError("failed to listen: %s", uv_err_name(err));
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
588 589
}
void* workerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
590
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
591
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
592 593 594
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
}

dengyihao's avatar
fix bug  
dengyihao 已提交
595 596 597
static SSrvConn* createConn(void* hThrd) {
  SWorkThrdObj* pThrd = hThrd;

dengyihao's avatar
dengyihao 已提交
598
  SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
dengyihao's avatar
fix bug  
dengyihao 已提交
599 600 601
  QUEUE_INIT(&pConn->queue);

  QUEUE_PUSH(&pThrd->conn, &pConn->queue);
dengyihao's avatar
dengyihao 已提交
602
  pConn->srvMsgs = taosArrayInit(2, sizeof(void*));  //
dengyihao's avatar
dengyihao 已提交
603
  tTrace("conn %p created", pConn);
604
  ++pConn->ref;
dengyihao's avatar
dengyihao 已提交
605 606
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
607

dengyihao's avatar
dengyihao 已提交
608
/*
 * Drop one reference on a connection. When the last reference goes, the read
 * buffer and queued responses are released and the connection leaves its
 * worker's list.
 *
 * conn   connection to release; NULL is ignored
 * clear  when true the TCP handle is reset-closed, and uvDestroyConn frees the
 *        rest once libuv has finished with the handle
 */
static void destroyConn(SSrvConn* conn, bool clear) {
  if (conn == NULL) {
    return;
  }
  tTrace("server conn %p try to destroy, ref: %d", conn, conn->ref);
  conn->ref -= 1;
  if (conn->ref > 0) {
    return;  // still referenced elsewhere
  }

  transDestroyBuffer(&conn->readBuf);

  size_t pending = taosArrayGetSize(conn->srvMsgs);
  for (size_t idx = 0; idx < pending; idx++) {
    destroySmsg((SSrvMsg*)taosArrayGetP(conn->srvMsgs, idx));
  }
  conn->srvMsgs = taosArrayDestroy(conn->srvMsgs);
  QUEUE_REMOVE(&conn->queue);

  if (!clear) {
    return;
  }
  tTrace("try to destroy conn %p", conn);
  uv_tcp_close_reset(conn->pTcp, uvDestroyConn);
}
/* Close callback for a connection's timer: frees the timer once libuv has released it. */
static void uvDestroyConnTimerCb(uv_handle_t* handle) { free(handle); }

/*
 * Close callback for a connection's TCP handle: frees the memory owned by the
 * connection. The timer is a live libuv handle, so it is closed and freed from
 * its own close callback rather than freed directly.
 */
static void uvDestroyConn(uv_handle_t* handle) {
  SSrvConn* conn = handle->data;
  tDebug("server conn %p destroy", conn);
  uv_timer_stop(conn->pTimer);
  uv_close((uv_handle_t*)conn->pTimer, uvDestroyConnTimerCb);
  free(conn->pTcp);
  free(conn->pWriter);
  free(conn);
}
dengyihao's avatar
dengyihao 已提交
643
/*
 * Finalise the spi and msgLen fields of an outgoing message head and, for a
 * connection that has spi set but is not yet secured, append a digest
 * timestamp right behind the message.
 *
 * pConn  - connection the message is sent on (spi / secured are read).
 * msg    - message buffer starting with an STransMsgHead.
 * msgLen - current length of the message in bytes.
 *
 * Returns the final message length (msgLen, plus the digest size when the
 * auth part was appended). The length stored in the head is in network byte
 * order.
 */
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
  STransMsgHead* pHead = (STransMsgHead*)msg;
  bool           needAuth = pConn->spi && pConn->secured == 0;

  if (!needAuth) {
    pHead->spi = 0;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    return msgLen;
  }

  // auth part: the digest lives directly after the current payload
  // NOTE(review): assumes the caller reserved room for the digest behind
  // msgLen bytes, and that STransDigestMsg and SRpcDigest have the same size
  // -- confirm against the type definitions.
  pHead->spi = pConn->spi;
  STransDigestMsg* pDigest = (STransDigestMsg*)(msg + msgLen);
  pDigest->timeStamp = htonl(taosGetTimestampSec());
  msgLen += sizeof(SRpcDigest);
  pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
  // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);

  return msgLen;
}

void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
  SServerObj* srv = calloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
  srv->pThreadObj = (SWorkThrdObj**)calloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*));
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

  for (int i = 0; i < srv->numOfThreads; i++) {
    SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
676
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
677

dengyihao's avatar
dengyihao 已提交
678 679 680
    srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
    int fds[2];
    if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
dengyihao's avatar
dengyihao 已提交
681
      goto End;
dengyihao's avatar
dengyihao 已提交
682 683 684 685
    }
    uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write

dengyihao's avatar
dengyihao 已提交
686
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
687 688
    thrd->fd = fds[0];
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
689

dengyihao's avatar
dengyihao 已提交
690 691 692
    if (false == addHandleToWorkloop(thrd)) {
      goto End;
    }
dengyihao's avatar
dengyihao 已提交
693 694 695 696 697 698 699 700 701
    int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
    }
  }
dengyihao's avatar
dengyihao 已提交
702 703 704
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
dengyihao's avatar
dengyihao 已提交
705 706 707 708 709 710 711 712
  int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
    // clear all resource later
  }

  return srv;
dengyihao's avatar
dengyihao 已提交
713 714 715 716 717 718 719 720 721 722 723
End:
  taosCloseServer(srv);
  return NULL;
}

/*
 * Wait for a worker thread to exit and release the worker object.
 *
 * pThrd - worker to destroy; NULL is ignored. The call blocks in
 *         pthread_join until the worker thread has terminated.
 */
void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd != NULL) {
    // the thread must be gone before its loop and async pool are released
    pthread_join(pThrd->thread, NULL);

    free(pThrd->loop);
    transDestroyAsyncPool(pThrd->asyncPool);
    free(pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
729 730 731 732
/*
 * Post an all-zero SSrvMsg to the worker's async pool.
 * NOTE(review): presumably the worker treats a message without a connection
 * as the request to quit -- confirm against the worker's async callback.
 *
 * pThrd - target worker; NULL is ignored, matching destroyWorkThrd.
 */
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd == NULL) {
    return;
  }
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  if (srvMsg == NULL) {
    // without the message the worker is not notified; do not hand a pointer
    // derived from NULL to the async pool
    tError("failed to alloc quit msg for work thread");
    return;
  }
  tDebug("send quit msg to work thread");

  transSendAsync(pThrd->asyncPool, &srvMsg->q);
}

dengyihao's avatar
dengyihao 已提交
736 737 738
void taosCloseServer(void* arg) {
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
739
  for (int i = 0; i < srv->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
740
    sendQuitToWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
741
    destroyWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
742
  }
dengyihao's avatar
dengyihao 已提交
743 744 745 746 747 748 749

  tDebug("send quit msg to accept thread");
  uv_async_send(srv->pAcceptAsync);
  pthread_join(srv->thread, NULL);

  free(srv->pThreadObj);
  free(srv->pAcceptAsync);
dengyihao's avatar
dengyihao 已提交
750
  free(srv->loop);
dengyihao's avatar
dengyihao 已提交
751 752 753 754

  for (int i = 0; i < srv->numOfThreads; i++) {
    free(srv->pipe[i]);
  }
dengyihao's avatar
dengyihao 已提交
755
  free(srv->pipe);
dengyihao's avatar
dengyihao 已提交
756

dengyihao's avatar
dengyihao 已提交
757
  free(srv);
dengyihao's avatar
dengyihao 已提交
758
}
dengyihao's avatar
dengyihao 已提交
759 760

void rpcSendResponse(const SRpcMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
761 762 763
  if (pMsg->handle == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
764
  SSrvConn*     pConn = pMsg->handle;
dengyihao's avatar
dengyihao 已提交
765 766
  SWorkThrdObj* pThrd = pConn->hostThrd;

dengyihao's avatar
dengyihao 已提交
767 768 769
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  srvMsg->pConn = pConn;
  srvMsg->msg = *pMsg;
dengyihao's avatar
dengyihao 已提交
770
  tTrace("server conn %p start to send resp", pConn);
dengyihao's avatar
dengyihao 已提交
771
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
772 773
}

dengyihao's avatar
dengyihao 已提交
774
/*
 * Fill pInfo with the peer address and user name of a server connection.
 *
 * thandle - the SSrvConn of the connection.
 * pInfo   - output; clientIp receives sin_addr.s_addr as stored, clientPort
 *           the port converted with ntohs, user a bounded copy of the
 *           connection's user name.
 *
 * Always returns 0.
 */
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) {
  SSrvConn*                 pConn = (SSrvConn*)thandle;
  const struct sockaddr_in* pAddr = &pConn->addr;

  pInfo->clientIp = (uint32_t)(pAddr->sin_addr.s_addr);
  pInfo->clientPort = ntohs(pAddr->sin_port);
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));

  return 0;
}

dengyihao's avatar
dengyihao 已提交
785
#endif