transSrv.c 24.5 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
21
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
22 23 24
  uv_tcp_t*  pTcp;
  uv_write_t pWriter;
  uv_timer_t pTimer;
dengyihao's avatar
dengyihao 已提交
25 26 27

  queue       queue;
  int         ref;
dengyihao's avatar
dengyihao 已提交
28
  int         persist;  // persist connection or not
dengyihao's avatar
dengyihao 已提交
29
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
30 31 32
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
33
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
34
  SArray*     srvMsgs;
dengyihao's avatar
dengyihao 已提交
35 36

  bool broken;  // conn broken;
dengyihao's avatar
dengyihao 已提交
37

dengyihao's avatar
dengyihao 已提交
38
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
39
  struct sockaddr_in locaddr;
U
ubuntu 已提交
40

dengyihao's avatar
dengyihao 已提交
41 42 43 44 45 46
  char secured;
  int  spi;
  char info[64];
  char user[TSDB_UNI_LEN];  // user ID for the link
  char secret[TSDB_PASSWORD_LEN];
  char ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
47 48 49 50
} SSrvConn;

typedef struct SSrvMsg {
  SSrvConn* pConn;
dengyihao's avatar
formate  
dengyihao 已提交
51
  STransMsg msg;
dengyihao's avatar
dengyihao 已提交
52 53
  queue     q;
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
54 55

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
56 57
  pthread_t   thread;
  uv_pipe_t*  pipe;
58
  uv_os_fd_t  fd;
dengyihao's avatar
dengyihao 已提交
59 60
  uv_loop_t*  loop;
  SAsyncPool* asyncPool;
dengyihao's avatar
dengyihao 已提交
61

dengyihao's avatar
dengyihao 已提交
62 63
  queue           msg;
  pthread_mutex_t msgMtx;
dengyihao's avatar
dengyihao 已提交
64 65 66 67

  queue conn;
  void* pTransInst;
  bool  quit;
dengyihao's avatar
dengyihao 已提交
68 69 70
} SWorkThrdObj;

typedef struct SServerObj {
dengyihao's avatar
dengyihao 已提交
71 72 73 74 75
  pthread_t  thread;
  uv_tcp_t   server;
  uv_loop_t* loop;

  // work thread info
dengyihao's avatar
dengyihao 已提交
76 77 78
  int            workerIdx;
  int            numOfThreads;
  SWorkThrdObj** pThreadObj;
dengyihao's avatar
dengyihao 已提交
79 80 81 82 83

  uv_pipe_t** pipe;
  uint32_t    ip;
  uint32_t    port;
  uv_async_t* pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
84 85 86 87 88
} SServerObj;

static const char* notify = "a";

// refactor later
dengyihao's avatar
dengyihao 已提交
89
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
90

dengyihao's avatar
dengyihao 已提交
91
static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
92 93

static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
94
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
95
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
96
static void uvOnTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
97
static void uvOnSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
98
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
99 100 101
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
102
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
103
static void uvShutDownCb(uv_shutdown_t* req, int status);
dengyihao's avatar
dengyihao 已提交
104

dengyihao's avatar
dengyihao 已提交
105
static void uvStartSendRespInternal(SSrvMsg* smsg);
dengyihao's avatar
dengyihao 已提交
106 107
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
108

dengyihao's avatar
dengyihao 已提交
109 110
static void uvNotifyLinkBrokenToApp(SSrvConn* conn);

dengyihao's avatar
dengyihao 已提交
111
static void destroySmsg(SSrvMsg* smsg);
112
// check whether already read complete packet
dengyihao's avatar
fix bug  
dengyihao 已提交
113
static SSrvConn* createConn(void* hThrd);
dengyihao's avatar
dengyihao 已提交
114
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
115

116
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
117

dengyihao's avatar
dengyihao 已提交
118
// server and worker thread
dengyihao's avatar
dengyihao 已提交
119 120 121
static void* workerThread(void* arg);
static void* acceptThread(void* arg);

dengyihao's avatar
dengyihao 已提交
122 123 124 125
// add handle loop
static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
126
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
127 128 129
  SSrvConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
130 131
}

dengyihao's avatar
dengyihao 已提交
132
static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) {
dengyihao's avatar
dengyihao 已提交
133 134 135
  STransMsgHead* pHead = (STransMsgHead*)msg;

  int code = 0;
dengyihao's avatar
dengyihao 已提交
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167

  if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
    // secured link, or no authentication
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
    // tTrace("%s, secured link, no auth is required", pConn->info);
    return 0;
  }

  if (!rpcIsReq(pHead->msgType)) {
    // for response, if code is auth failure, it shall bypass the auth process
    code = htonl(pHead->code);
    if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
        code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
        code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
      // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
      return 0;
    }
  }

  code = 0;
  if (pHead->spi == pConn->spi) {
    // authentication
    SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest));

    int32_t delta;
    delta = (int32_t)htonl(pDigest->timeStamp);
    delta -= (int32_t)taosGetTimestampSec();
    if (abs(delta) > 900) {
      tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
      code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
    } else {
dengyihao's avatar
dengyihao 已提交
168
      if (transAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
dengyihao's avatar
dengyihao 已提交
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
        // tDebug("%s, authentication failed, msg discarded", pConn->info);
        code = TSDB_CODE_RPC_AUTH_FAILURE;
      } else {
        pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
        if (!rpcIsReq(pHead->msgType)) pConn->secured = 1;  // link is secured for client
        // tTrace("%s, message is authenticated", pConn->info);
      }
    }
  } else {
    tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
    code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
  }

  return code;
}

// refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) {
dengyihao's avatar
dengyihao 已提交
187
  SSrvConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
188
  tDebug("%p timeout since no activity", conn);
dengyihao's avatar
dengyihao 已提交
189 190
}

dengyihao's avatar
dengyihao 已提交
191
static void uvHandleReq(SSrvConn* pConn) {
dengyihao's avatar
dengyihao 已提交
192 193
  SRecvInfo    info;
  SRecvInfo*   p = &info;
dengyihao's avatar
dengyihao 已提交
194
  SConnBuffer* pBuf = &pConn->readBuf;
dengyihao's avatar
dengyihao 已提交
195
  p->msg = pBuf->buf;
dengyihao's avatar
dengyihao 已提交
196 197 198
  p->msgLen = pBuf->len;
  p->ip = 0;
  p->port = 0;
dengyihao's avatar
dengyihao 已提交
199
  p->shandle = pConn->pTransInst;  //
dengyihao's avatar
dengyihao 已提交
200 201 202
  p->thandle = pConn;
  p->chandle = NULL;

dengyihao's avatar
dengyihao 已提交
203
  STransMsgHead* pHead = (STransMsgHead*)p->msg;
dengyihao's avatar
dengyihao 已提交
204
  if (pHead->secured == 1) {
205
    STransUserMsg* uMsg = (STransUserMsg*)((char*)p->msg + p->msgLen - sizeof(STransUserMsg));
dengyihao's avatar
dengyihao 已提交
206
    memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
dengyihao's avatar
dengyihao 已提交
207
    memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret));
dengyihao's avatar
dengyihao 已提交
208
  }
dengyihao's avatar
dengyihao 已提交
209 210
  pHead->code = htonl(pHead->code);

dengyihao's avatar
dengyihao 已提交
211 212 213
  int32_t dlen = 0;
  if (transDecompressMsg(NULL, 0, NULL)) {
    // add compress later
U
ubuntu 已提交
214
    // pHead = rpcDecompresSTransMsg(pHead);
dengyihao's avatar
dengyihao 已提交
215
  } else {
dengyihao's avatar
dengyihao 已提交
216
    pHead->msgLen = htonl(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
217 218 219
    if (pHead->secured == 1) {
      pHead->msgLen -= sizeof(STransUserMsg);
    }
dengyihao's avatar
dengyihao 已提交
220
    //
dengyihao's avatar
dengyihao 已提交
221
  }
dengyihao's avatar
dengyihao 已提交
222

dengyihao's avatar
dengyihao 已提交
223 224 225 226 227 228
  STransMsg transMsg;
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = pHead->content;
  transMsg.msgType = pHead->msgType;
  transMsg.code = pHead->code;
  transMsg.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
229
  transMsg.handle = NULL;
dengyihao's avatar
dengyihao 已提交
230

dengyihao's avatar
dengyihao 已提交
231
  transClearBuffer(&pConn->readBuf);
dengyihao's avatar
dengyihao 已提交
232 233
  pConn->inType = pHead->msgType;

dengyihao's avatar
dengyihao 已提交
234 235 236 237
  if (pHead->resflag == 0) {
    transRefSrvHandle(pConn);
    transMsg.handle = pConn;
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
238
           taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
dengyihao's avatar
dengyihao 已提交
239 240 241
           ntohs(pConn->locaddr.sin_port), transMsg.contLen);
  } else {
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, no resp ", pConn,
242 243
           TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
           taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen);
dengyihao's avatar
dengyihao 已提交
244
  }
dengyihao's avatar
dengyihao 已提交
245 246

  STrans* pTransInst = (STrans*)p->shandle;
dengyihao's avatar
dengyihao 已提交
247
  (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
dengyihao's avatar
dengyihao 已提交
248
  // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
dengyihao's avatar
dengyihao 已提交
249 250 251 252
  // auth
  // validate msg type
}

dengyihao's avatar
dengyihao 已提交
253
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
254
  // opt
dengyihao's avatar
dengyihao 已提交
255 256
  SSrvConn*    conn = cli->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
257 258
  if (nread > 0) {
    pBuf->len += nread;
dengyihao's avatar
dengyihao 已提交
259
    tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
dengyihao's avatar
dengyihao 已提交
260
    if (transReadComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
261
      tTrace("server conn %p alread read complete packet", conn);
dengyihao's avatar
dengyihao 已提交
262
      uvHandleReq(conn);
dengyihao's avatar
dengyihao 已提交
263
    } else {
dengyihao's avatar
dengyihao 已提交
264
      tTrace("server %p read partial packet, continue to read", conn);
dengyihao's avatar
dengyihao 已提交
265 266 267
    }
    return;
  }
268 269 270
  if (nread == 0) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
271 272

  tError("server conn %p read error: %s", conn, uv_err_name(nread));
U
ubuntu 已提交
273
  if (nread < 0) {
dengyihao's avatar
dengyihao 已提交
274
    conn->broken = true;
dengyihao's avatar
dengyihao 已提交
275
    uvNotifyLinkBrokenToApp(conn);
dengyihao's avatar
dengyihao 已提交
276

dengyihao's avatar
dengyihao 已提交
277 278
    // STrans* pTransInst = conn->pTransInst;
    // if (pTransInst->efp != NULL && (pTransInst->efp)(NULL, conn->inType)) {
dengyihao's avatar
dengyihao 已提交
279
    //}
dengyihao's avatar
dengyihao 已提交
280
    transUnrefSrvHandle(conn);
dengyihao's avatar
dengyihao 已提交
281 282 283 284
  }
}
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->len = 2;
dengyihao's avatar
fix bug  
dengyihao 已提交
285
  buf->base = calloc(1, sizeof(char) * buf->len);
dengyihao's avatar
dengyihao 已提交
286 287 288 289
}

void uvOnTimeoutCb(uv_timer_t* handle) {
  // opt
dengyihao's avatar
dengyihao 已提交
290
  SSrvConn* pConn = handle->data;
dengyihao's avatar
dengyihao 已提交
291
  tError("server conn %p time out", pConn);
dengyihao's avatar
dengyihao 已提交
292 293
}

dengyihao's avatar
dengyihao 已提交
294
void uvOnSendCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
295 296
  SSrvConn* conn = req->data;
  transClearBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
297
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
298
    tTrace("server conn %p data already was written on stream", conn);
dengyihao's avatar
add UT  
dengyihao 已提交
299 300 301
    if (conn->srvMsgs != NULL) {
      assert(taosArrayGetSize(conn->srvMsgs) >= 1);
      SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
dengyihao's avatar
dengyihao 已提交
302
      tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
dengyihao's avatar
add UT  
dengyihao 已提交
303 304 305 306 307
      taosArrayRemove(conn->srvMsgs, 0);
      destroySmsg(msg);

      // send second data, just use for push
      if (taosArrayGetSize(conn->srvMsgs) > 0) {
dengyihao's avatar
dengyihao 已提交
308
        tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
dengyihao's avatar
add UT  
dengyihao 已提交
309 310 311
        msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
        uvStartSendRespInternal(msg);
      }
dengyihao's avatar
dengyihao 已提交
312
    }
dengyihao's avatar
dengyihao 已提交
313
  } else {
dengyihao's avatar
dengyihao 已提交
314
    tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
315 316
    conn->broken = false;
    transUnrefSrvHandle(conn);
dengyihao's avatar
dengyihao 已提交
317 318
  }
}
dengyihao's avatar
dengyihao 已提交
319 320
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
321
    tTrace("success to dispatch conn to work thread");
dengyihao's avatar
dengyihao 已提交
322 323 324
  } else {
    tError("fail to dispatch conn to work thread");
  }
dengyihao's avatar
dengyihao 已提交
325
  free(req);
dengyihao's avatar
dengyihao 已提交
326
}
dengyihao's avatar
dengyihao 已提交
327

dengyihao's avatar
dengyihao 已提交
328
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
329
  // impl later;
dengyihao's avatar
dengyihao 已提交
330
  tTrace("server conn %p prepare to send resp", smsg->pConn);
dengyihao's avatar
dengyihao 已提交
331

dengyihao's avatar
formate  
dengyihao 已提交
332 333
  SSrvConn*  pConn = smsg->pConn;
  STransMsg* pMsg = &smsg->msg;
dengyihao's avatar
dengyihao 已提交
334 335 336 337 338
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
dengyihao's avatar
dengyihao 已提交
339 340

  pHead->secured = pMsg->code == 0 ? 1 : 0;  //
dengyihao's avatar
dengyihao 已提交
341
  pHead->msgType = smsg->pConn->inType + 1;
dengyihao's avatar
dengyihao 已提交
342
  pHead->code = htonl(pMsg->code);
dengyihao's avatar
dengyihao 已提交
343 344 345 346 347 348
  // add more info
  char*   msg = (char*)pHead;
  int32_t len = transMsgLenFromCont(pMsg->contLen);
  if (transCompressMsg(msg, len, NULL)) {
    // impl later
  }
dengyihao's avatar
dengyihao 已提交
349
  tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
350
         taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
dengyihao's avatar
dengyihao 已提交
351
         ntohs(pConn->locaddr.sin_port));
dengyihao's avatar
dengyihao 已提交
352

dengyihao's avatar
dengyihao 已提交
353 354 355 356
  pHead->msgLen = htonl(len);
  wb->base = msg;
  wb->len = len;
}
dengyihao's avatar
dengyihao 已提交
357 358

static void uvStartSendRespInternal(SSrvMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
359 360 361 362
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
363
  uv_timer_stop(&pConn->pTimer);
dengyihao's avatar
dengyihao 已提交
364
  uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
dengyihao's avatar
dengyihao 已提交
365 366 367 368
}
static void uvStartSendResp(SSrvMsg* smsg) {
  // impl
  SSrvConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
369 370 371 372 373 374 375

  if (pConn->broken == true) {
    transUnrefSrvHandle(pConn);
    return;
  }
  transUnrefSrvHandle(pConn);

dengyihao's avatar
dengyihao 已提交
376
  if (taosArrayGetSize(pConn->srvMsgs) > 0) {
377 378
    tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr),
           ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
dengyihao's avatar
dengyihao 已提交
379 380 381 382 383
    taosArrayPush(pConn->srvMsgs, &smsg);
    return;
  }
  taosArrayPush(pConn->srvMsgs, &smsg);
  uvStartSendRespInternal(smsg);
dengyihao's avatar
dengyihao 已提交
384 385
  return;
}
dengyihao's avatar
dengyihao 已提交
386 387 388 389 390 391 392 393 394 395 396

static void uvNotifyLinkBrokenToApp(SSrvConn* conn) {
  STrans* pTransInst = conn->pTransInst;
  if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) {
    STransMsg transMsg = {0};
    transMsg.msgType = conn->inType;
    transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    // transRefSrvHandle(conn);
    (*pTransInst->cfp)(pTransInst->parent, &transMsg, 0);
  }
}
dengyihao's avatar
dengyihao 已提交
397
static void destroySmsg(SSrvMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
398 399 400 401
  if (smsg == NULL) {
    return;
  }
  transFreeMsg(smsg->msg.pCont);
dengyihao's avatar
dengyihao 已提交
402
  free(smsg);
dengyihao's avatar
dengyihao 已提交
403
}
dengyihao's avatar
fix bug  
dengyihao 已提交
404 405 406 407 408 409 410
static void destroyAllConn(SWorkThrdObj* pThrd) {
  while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
    queue* h = QUEUE_HEAD(&pThrd->conn);
    QUEUE_REMOVE(h);
    QUEUE_INIT(h);

    SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue);
dengyihao's avatar
dengyihao 已提交
411
    transUnrefSrvHandle(c);
dengyihao's avatar
fix bug  
dengyihao 已提交
412 413
  }
}
dengyihao's avatar
dengyihao 已提交
414
void uvWorkerAsyncCb(uv_async_t* handle) {
dengyihao's avatar
dengyihao 已提交
415 416
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
dengyihao's avatar
dengyihao 已提交
417
  SSrvConn*     conn = NULL;
dengyihao's avatar
dengyihao 已提交
418
  queue         wq;
dengyihao's avatar
dengyihao 已提交
419

dengyihao's avatar
dengyihao 已提交
420
  // batch process to avoid to lock/unlock frequently
dengyihao's avatar
dengyihao 已提交
421 422 423
  pthread_mutex_lock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  pthread_mutex_unlock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
424 425 426 427

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* head = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(head);
dengyihao's avatar
dengyihao 已提交
428 429 430

    SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
    if (msg == NULL) {
dengyihao's avatar
dengyihao 已提交
431
      tError("unexcept occurred, continue");
dengyihao's avatar
dengyihao 已提交
432
      continue;
dengyihao's avatar
dengyihao 已提交
433
    }
dengyihao's avatar
dengyihao 已提交
434 435
    if (msg->pConn == NULL) {
      free(msg);
dengyihao's avatar
dengyihao 已提交
436 437 438 439 440 441
      bool noConn = QUEUE_IS_EMPTY(&pThrd->conn);
      if (noConn == true) {
        uv_loop_close(pThrd->loop);
        uv_stop(pThrd->loop);
      } else {
        destroyAllConn(pThrd);
dengyihao's avatar
dengyihao 已提交
442 443
        // uv_loop_close(pThrd->loop);
        pThrd->quit = true;
dengyihao's avatar
dengyihao 已提交
444
      }
dengyihao's avatar
dengyihao 已提交
445 446 447
    } else {
      uvStartSendResp(msg);
    }
dengyihao's avatar
dengyihao 已提交
448 449
  }
}
dengyihao's avatar
dengyihao 已提交
450 451
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* srv = async->data;
dengyihao's avatar
dengyihao 已提交
452
  tDebug("close server port %d", srv->port);
dengyihao's avatar
fix bug  
dengyihao 已提交
453
  uv_close((uv_handle_t*)&srv->server, NULL);
dengyihao's avatar
dengyihao 已提交
454 455
  uv_stop(srv->loop);
}
dengyihao's avatar
dengyihao 已提交
456

dengyihao's avatar
dengyihao 已提交
457
static void uvShutDownCb(uv_shutdown_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
458 459 460
  if (status != 0) {
    tDebug("conn failed to shut down: %s", uv_err_name(status));
  }
dengyihao's avatar
dengyihao 已提交
461 462 463 464
  uv_close((uv_handle_t*)req->handle, uvDestroyConn);
  free(req);
}

dengyihao's avatar
dengyihao 已提交
465 466 467 468 469 470 471 472 473 474 475 476 477 478 479
void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status == -1) {
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

  uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
    uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));

    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

    pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
dengyihao's avatar
dengyihao 已提交
480

dengyihao's avatar
dengyihao 已提交
481
    tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
dengyihao's avatar
dengyihao 已提交
482
    uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
dengyihao's avatar
dengyihao 已提交
483 484
  } else {
    uv_close((uv_handle_t*)cli, NULL);
485
    free(cli);
dengyihao's avatar
dengyihao 已提交
486 487 488
  }
}
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
489
  tTrace("server connection coming");
dengyihao's avatar
dengyihao 已提交
490 491 492 493 494
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
495
    // uv_close((uv_handle_t*)q, NULL);
dengyihao's avatar
dengyihao 已提交
496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513
    return;
  }
  // free memory allocated by
  assert(nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
  free(buf->base);

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

dengyihao's avatar
fix bug  
dengyihao 已提交
514
  SSrvConn* pConn = createConn(pThrd);
515

dengyihao's avatar
dengyihao 已提交
516
  pConn->pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
517
  /* init conn timer*/
dengyihao's avatar
dengyihao 已提交
518 519
  uv_timer_init(pThrd->loop, &pConn->pTimer);
  pConn->pTimer.data = pConn;
dengyihao's avatar
dengyihao 已提交
520 521 522 523 524 525 526 527

  pConn->hostThrd = pThrd;

  // init client handle
  pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

dengyihao's avatar
dengyihao 已提交
528
  pConn->pWriter.data = pConn;
dengyihao's avatar
dengyihao 已提交
529

dengyihao's avatar
dengyihao 已提交
530 531
  transSetConnOption((uv_tcp_t*)pConn->pTcp);

dengyihao's avatar
dengyihao 已提交
532 533 534
  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
dengyihao's avatar
dengyihao 已提交
535
    tTrace("server conn %p created, fd: %d", pConn, fd);
dengyihao's avatar
dengyihao 已提交
536

dengyihao's avatar
dengyihao 已提交
537 538
    int addrlen = sizeof(pConn->addr);
    if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
dengyihao's avatar
dengyihao 已提交
539
      tError("server conn %p failed to get peer info", pConn);
dengyihao's avatar
dengyihao 已提交
540
      transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
541
      return;
dengyihao's avatar
dengyihao 已提交
542
    }
dengyihao's avatar
dengyihao 已提交
543 544 545 546

    addrlen = sizeof(pConn->locaddr);
    if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
      tError("server conn %p failed to get local info", pConn);
dengyihao's avatar
dengyihao 已提交
547
      transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
548 549 550
      return;
    }

dengyihao's avatar
dengyihao 已提交
551
    uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
dengyihao's avatar
dengyihao 已提交
552

dengyihao's avatar
dengyihao 已提交
553
  } else {
dengyihao's avatar
dengyihao 已提交
554
    tDebug("failed to create new connection");
dengyihao's avatar
dengyihao 已提交
555
    transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
556 557 558 559 560
  }
}

void* acceptThread(void* arg) {
  // opt
dengyihao's avatar
dengyihao 已提交
561
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
562 563
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
564 565

  return NULL;
dengyihao's avatar
dengyihao 已提交
566
}
dengyihao's avatar
dengyihao 已提交
567 568
static bool addHandleToWorkloop(void* arg) {
  SWorkThrdObj* pThrd = arg;
dengyihao's avatar
dengyihao 已提交
569
  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
570 571 572
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
573

dengyihao's avatar
dengyihao 已提交
574
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
dengyihao's avatar
dengyihao 已提交
575 576 577 578
  uv_pipe_open(pThrd->pipe, pThrd->fd);

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
579 580
  QUEUE_INIT(&pThrd->msg);
  pthread_mutex_init(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
581

dengyihao's avatar
fix bug  
dengyihao 已提交
582 583 584
  // conn set
  QUEUE_INIT(&pThrd->conn);

dengyihao's avatar
dengyihao 已提交
585
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 4, pThrd, uvWorkerAsyncCb);
dengyihao's avatar
dengyihao 已提交
586
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
587 588 589 590 591 592 593 594 595 596 597 598 599
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
600 601 602 603
  // register an async here to quit server gracefully
  srv->pAcceptAsync = calloc(1, sizeof(uv_async_t));
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
604

dengyihao's avatar
dengyihao 已提交
605
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
606 607 608 609 610
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
611
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
612 613 614 615
    tError("failed to listen: %s", uv_err_name(err));
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
616 617
}
void* workerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
618
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
619
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
620
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
621 622

  return NULL;
dengyihao's avatar
dengyihao 已提交
623 624
}

dengyihao's avatar
fix bug  
dengyihao 已提交
625 626 627
static SSrvConn* createConn(void* hThrd) {
  SWorkThrdObj* pThrd = hThrd;

dengyihao's avatar
dengyihao 已提交
628
  SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
dengyihao's avatar
fix bug  
dengyihao 已提交
629 630 631
  QUEUE_INIT(&pConn->queue);

  QUEUE_PUSH(&pThrd->conn, &pConn->queue);
dengyihao's avatar
dengyihao 已提交
632
  pConn->srvMsgs = taosArrayInit(2, sizeof(void*));  //
dengyihao's avatar
dengyihao 已提交
633
  tTrace("conn %p created", pConn);
dengyihao's avatar
dengyihao 已提交
634 635

  pConn->broken = false;
dengyihao's avatar
dengyihao 已提交
636

dengyihao's avatar
dengyihao 已提交
637
  transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
638 639
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
640

dengyihao's avatar
dengyihao 已提交
641
static void destroyConn(SSrvConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
642 643 644
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
645
  transDestroyBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
646 647 648 649 650

  for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) {
    SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i);
    destroySmsg(msg);
  }
dengyihao's avatar
add UT  
dengyihao 已提交
651
  conn->srvMsgs = taosArrayDestroy(conn->srvMsgs);
652
  if (clear) {
dengyihao's avatar
dengyihao 已提交
653
    tTrace("try to destroy conn %p", conn);
dengyihao's avatar
dengyihao 已提交
654 655
    uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
    uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
656
  }
dengyihao's avatar
dengyihao 已提交
657 658
}
static void uvDestroyConn(uv_handle_t* handle) {
dengyihao's avatar
dengyihao 已提交
659 660 661 662
  SSrvConn* conn = handle->data;
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
663 664
  SWorkThrdObj* thrd = conn->hostThrd;

dengyihao's avatar
dengyihao 已提交
665
  tDebug("server conn %p destroy", conn);
dengyihao's avatar
dengyihao 已提交
666
  uv_timer_stop(&conn->pTimer);
dengyihao's avatar
dengyihao 已提交
667
  QUEUE_REMOVE(&conn->queue);
dengyihao's avatar
dengyihao 已提交
668
  free(conn->pTcp);
dengyihao's avatar
dengyihao 已提交
669
  // free(conn);
dengyihao's avatar
dengyihao 已提交
670

dengyihao's avatar
dengyihao 已提交
671 672
  if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
    uv_loop_close(thrd->loop);
dengyihao's avatar
dengyihao 已提交
673 674
    uv_stop(thrd->loop);
  }
dengyihao's avatar
dengyihao 已提交
675
}
dengyihao's avatar
dengyihao 已提交
676
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
dengyihao's avatar
dengyihao 已提交
677
  STransMsgHead* pHead = (STransMsgHead*)msg;
dengyihao's avatar
dengyihao 已提交
678 679 680 681

  if (pConn->spi && pConn->secured == 0) {
    // add auth part
    pHead->spi = pConn->spi;
dengyihao's avatar
dengyihao 已提交
682
    STransDigestMsg* pDigest = (STransDigestMsg*)(msg + msgLen);
dengyihao's avatar
dengyihao 已提交
683 684 685
    pDigest->timeStamp = htonl(taosGetTimestampSec());
    msgLen += sizeof(SRpcDigest);
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
dengyihao's avatar
dengyihao 已提交
686 687
    // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
    // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
dengyihao's avatar
dengyihao 已提交
688 689 690 691 692 693 694 695
  } else {
    pHead->spi = 0;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
  }

  return msgLen;
}

U
ubuntu 已提交
696
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
dengyihao's avatar
dengyihao 已提交
697 698 699 700 701 702 703 704 705 706 707 708
  SServerObj* srv = calloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
  srv->pThreadObj = (SWorkThrdObj**)calloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*));
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

  for (int i = 0; i < srv->numOfThreads; i++) {
    SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
709
    thrd->quit = false;
dengyihao's avatar
dengyihao 已提交
710
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
711

dengyihao's avatar
dengyihao 已提交
712 713 714
    srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
    int fds[2];
    if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
dengyihao's avatar
dengyihao 已提交
715
      goto End;
dengyihao's avatar
dengyihao 已提交
716 717 718 719
    }
    uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write

dengyihao's avatar
dengyihao 已提交
720
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
721 722
    thrd->fd = fds[0];
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
723

dengyihao's avatar
dengyihao 已提交
724 725 726
    if (false == addHandleToWorkloop(thrd)) {
      goto End;
    }
dengyihao's avatar
dengyihao 已提交
727 728 729 730 731 732 733 734 735
    int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
    }
  }
dengyihao's avatar
dengyihao 已提交
736 737 738
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
dengyihao's avatar
dengyihao 已提交
739 740 741 742 743 744 745 746
  int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
    // clear all resource later
  }

  return srv;
dengyihao's avatar
dengyihao 已提交
747
End:
U
ubuntu 已提交
748
  transCloseServer(srv);
dengyihao's avatar
dengyihao 已提交
749 750 751 752 753 754 755 756 757
  return NULL;
}

void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd == NULL) {
    return;
  }
  pthread_join(pThrd->thread, NULL);
  free(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
758
  transDestroyAsyncPool(pThrd->asyncPool);
dengyihao's avatar
dengyihao 已提交
759
  free(pThrd);
dengyihao's avatar
dengyihao 已提交
760
}
dengyihao's avatar
dengyihao 已提交
761 762
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
763
  tDebug("server send quit msg to work thread");
dengyihao's avatar
dengyihao 已提交
764

dengyihao's avatar
dengyihao 已提交
765
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
766 767
}

U
ubuntu 已提交
768
void transCloseServer(void* arg) {
dengyihao's avatar
dengyihao 已提交
769 770
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
771
  for (int i = 0; i < srv->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
772
    sendQuitToWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
773
    destroyWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
774
  }
dengyihao's avatar
dengyihao 已提交
775 776 777 778 779 780 781

  tDebug("send quit msg to accept thread");
  uv_async_send(srv->pAcceptAsync);
  pthread_join(srv->thread, NULL);

  free(srv->pThreadObj);
  free(srv->pAcceptAsync);
dengyihao's avatar
dengyihao 已提交
782
  free(srv->loop);
dengyihao's avatar
dengyihao 已提交
783 784 785 786

  for (int i = 0; i < srv->numOfThreads; i++) {
    free(srv->pipe[i]);
  }
dengyihao's avatar
dengyihao 已提交
787
  free(srv->pipe);
dengyihao's avatar
dengyihao 已提交
788

dengyihao's avatar
dengyihao 已提交
789
  free(srv);
dengyihao's avatar
dengyihao 已提交
790
}
dengyihao's avatar
dengyihao 已提交
791

dengyihao's avatar
dengyihao 已提交
792 793 794 795 796 797 798 799 800 801 802 803 804 805 806
void transRefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  SSrvConn* conn = handle;

  int ref = T_REF_INC((SSrvConn*)handle);
  UNUSED(ref);
}

void transUnrefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SSrvConn*)handle);
dengyihao's avatar
dengyihao 已提交
807
  tDebug("handle %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
808 809 810 811 812 813

  if (ref == 0) {
    destroyConn((SSrvConn*)handle, true);
  }
  // unref srv handle
}
dengyihao's avatar
dengyihao 已提交
814 815 816 817 818

void transReleaseSrvHandle(void* handle) {
  // do nothing currently
  //
}
U
ubuntu 已提交
819
void transSendResponse(const STransMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
820 821 822
  if (pMsg->handle == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
823
  SSrvConn*     pConn = pMsg->handle;
dengyihao's avatar
dengyihao 已提交
824 825
  SWorkThrdObj* pThrd = pConn->hostThrd;

dengyihao's avatar
dengyihao 已提交
826 827 828
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  srvMsg->pConn = pConn;
  srvMsg->msg = *pMsg;
dengyihao's avatar
dengyihao 已提交
829
  tTrace("server conn %p start to send resp", pConn);
dengyihao's avatar
dengyihao 已提交
830
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
831
}
dengyihao's avatar
formate  
dengyihao 已提交
832 833
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
  SSrvConn*          pConn = thandle;
dengyihao's avatar
dengyihao 已提交
834
  struct sockaddr_in addr = pConn->addr;
U
ubuntu 已提交
835

dengyihao's avatar
dengyihao 已提交
836 837
  pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
  pInfo->clientPort = ntohs(addr.sin_port);
dengyihao's avatar
dengyihao 已提交
838 839 840 841
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
  return 0;
}

dengyihao's avatar
dengyihao 已提交
842
#endif