transSrv.c 25.9 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20 21 22 23 24 25
// State kept for a message the app registered on a connection; the saved
// message is handed to the app callback if the connection breaks.
typedef struct {
  int       notifyCount;  // reset to 0 whenever a registration is recorded
  int       init;         // init or not (non-zero once `msg` holds a registered message)
  STransMsg msg;          // the registered message itself
} SSrvRegArg;

dengyihao's avatar
dengyihao 已提交
26
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
27
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
28 29 30
  uv_tcp_t*  pTcp;
  uv_write_t pWriter;
  uv_timer_t pTimer;
dengyihao's avatar
dengyihao 已提交
31 32 33

  queue       queue;
  int         ref;
dengyihao's avatar
dengyihao 已提交
34
  int         persist;  // persist connection or not
dengyihao's avatar
dengyihao 已提交
35
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
36 37 38
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
39
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
40
  STransQueue srvMsgs;
dengyihao's avatar
dengyihao 已提交
41

dengyihao's avatar
dengyihao 已提交
42 43
  SSrvRegArg regArg;
  bool       broken;  // conn broken;
dengyihao's avatar
dengyihao 已提交
44

dengyihao's avatar
dengyihao 已提交
45
  ConnStatus         status;
dengyihao's avatar
dengyihao 已提交
46
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
47
  struct sockaddr_in locaddr;
U
ubuntu 已提交
48

dengyihao's avatar
dengyihao 已提交
49 50 51 52 53 54
  char secured;
  int  spi;
  char info[64];
  char user[TSDB_UNI_LEN];  // user ID for the link
  char secret[TSDB_PASSWORD_LEN];
  char ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
55 56 57
} SSrvConn;

typedef struct SSrvMsg {
dengyihao's avatar
dengyihao 已提交
58 59 60 61
  SSrvConn*     pConn;
  STransMsg     msg;
  queue         q;
  STransMsgType type;
dengyihao's avatar
dengyihao 已提交
62
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
63 64

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
65 66 67 68 69 70
  TdThread      thread;
  uv_pipe_t*    pipe;
  uv_os_fd_t    fd;
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  queue         msg;
wafwerar's avatar
wafwerar 已提交
71
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
72 73 74 75

  queue conn;
  void* pTransInst;
  bool  quit;
dengyihao's avatar
dengyihao 已提交
76 77 78
} SWorkThrdObj;

typedef struct SServerObj {
dengyihao's avatar
dengyihao 已提交
79
  TdThread   thread;
dengyihao's avatar
dengyihao 已提交
80 81 82 83
  uv_tcp_t   server;
  uv_loop_t* loop;

  // work thread info
dengyihao's avatar
dengyihao 已提交
84 85 86
  int            workerIdx;
  int            numOfThreads;
  SWorkThrdObj** pThreadObj;
dengyihao's avatar
dengyihao 已提交
87 88 89 90 91

  uv_pipe_t** pipe;
  uint32_t    ip;
  uint32_t    port;
  uv_async_t* pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
92 93 94 95
} SServerObj;

// Payload written on the worker pipe together with each dispatched socket;
// the worker side asserts it received exactly this.
static const char* notify = "a";

dengyihao's avatar
dengyihao 已提交
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
// If `head` is a bare release request (release flag set and the message is
// nothing but a header), mark `conn` as ConnRelease, discard the request and
// queue a Release response (sending it at once when transQueuePush allows).
//
// WARNING: this macro expands to code that executes `return;` in the
// enclosing function whenever the request was a release request, so it may
// only be used inside functions returning void.
#define CONN_SHOULD_RELEASE(conn, head)                                       \
  do {                                                                        \
    if ((head)->release == 1 && (head)->msgLen == sizeof(*(head))) {          \
      (conn)->status = ConnRelease;                                           \
      transClearBuffer(&(conn)->readBuf);                                     \
      transFreeMsg(transContFromHead((char*)(head)));                         \
      tTrace("server conn %p received release request", (conn));              \
                                                                              \
      SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));                           \
      if (srvMsg == NULL) {                                                   \
        tError("server conn %p failed to alloc release response", (conn));    \
        return;                                                               \
      }                                                                       \
      STransMsg tmsg = {.code = 0, .handle = (void*)(conn), .ahandle = NULL}; \
      srvMsg->msg = tmsg;                                                     \
      srvMsg->type = Release;                                                 \
      srvMsg->pConn = (conn);                                                 \
      if (!transQueuePush(&(conn)->srvMsgs, srvMsg)) {                        \
        return;                                                               \
      }                                                                       \
      uvStartSendRespInternal(srvMsg);                                        \
      return;                                                                 \
    }                                                                         \
  } while (0)
dengyihao's avatar
dengyihao 已提交
116 117

static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
118
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
119
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
120
static void uvOnTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
121
static void uvOnSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
122
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
123 124 125
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
126
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
127
static void uvShutDownCb(uv_shutdown_t* req, int status);
dengyihao's avatar
dengyihao 已提交
128

dengyihao's avatar
dengyihao 已提交
129
static void uvStartSendRespInternal(SSrvMsg* smsg);
dengyihao's avatar
dengyihao 已提交
130 131
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
132

dengyihao's avatar
dengyihao 已提交
133 134
static void uvNotifyLinkBrokenToApp(SSrvConn* conn);

dengyihao's avatar
dengyihao 已提交
135
static void destroySmsg(SSrvMsg* smsg);
136
// check whether already read complete packet
dengyihao's avatar
fix bug  
dengyihao 已提交
137
static SSrvConn* createConn(void* hThrd);
dengyihao's avatar
dengyihao 已提交
138
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
139

dengyihao's avatar
dengyihao 已提交
140 141
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
142
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
143
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
144
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
dengyihao's avatar
dengyihao 已提交
145
                                                                       uvHandleRegister};
dengyihao's avatar
dengyihao 已提交
146

147
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
148

dengyihao's avatar
dengyihao 已提交
149
// server and worker thread
dengyihao's avatar
dengyihao 已提交
150 151 152
static void* workerThread(void* arg);
static void* acceptThread(void* arg);

dengyihao's avatar
dengyihao 已提交
153 154 155 156
// add handle loop
static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
157
// Allocation callback used when reading from a client socket: the memory for
// the next read is supplied by the connection's own read buffer.
// `suggested_size` is not used.
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SSrvConn* pConn = handle->data;
  transAllocBuffer(&pConn->readBuf, buf);
}

// refers specifically to query or insert timeout
// Timer callback; it only logs that the connection saw no activity.
static void uvHandleActivityTimeout(uv_timer_t* handle) {
  SSrvConn* pConn = handle->data;
  tDebug("%p timeout since no activity", pConn);
}

dengyihao's avatar
dengyihao 已提交
169 170
// Process one complete request packet held in pConn->readBuf.
//
// Steps:
//   1. for a secured packet, copy the trailing STransUserMsg (user/secret)
//      into the connection and strip it from the message length;
//   2. convert `code` and `msgLen` from network to host byte order;
//   3. answer bare release requests directly (CONN_SHOULD_RELEASE returns
//      from this function in that case);
//   4. otherwise build an STransMsg and hand it to the app callback (cfp).
//
// A reference on the connection is taken when the request makes the
// connection persistent, or when a normal connection expects a response.
static void uvHandleReq(SSrvConn* pConn) {
  SConnBuffer* pBuf = &pConn->readBuf;
  char*        msg = pBuf->buf;
  uint32_t     msgLen = pBuf->len;

  STransMsgHead* pHead = (STransMsgHead*)msg;
  if (pHead->secured == 1) {
    // The user info sits at the very end of the packet; refuse packets too
    // short to hold header + user info instead of reading out of bounds.
    if (msgLen < sizeof(*pHead) + sizeof(STransUserMsg)) {
      tError("server conn %p received malformed secured packet, msg size: %d", pConn, (int)msgLen);
      // same clean-up sequence CONN_SHOULD_RELEASE uses to drop a request
      transClearBuffer(&pConn->readBuf);
      transFreeMsg(transContFromHead((char*)pHead));
      return;
    }
    STransUserMsg* uMsg = (STransUserMsg*)((char*)msg + msgLen - sizeof(STransUserMsg));
    memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
    memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret));
  }
  // header fields arrive in network byte order
  pHead->code = ntohl(pHead->code);
  pHead->msgLen = ntohl(pHead->msgLen);
  if (pHead->secured == 1) {
    pHead->msgLen -= sizeof(STransUserMsg);
  }

  CONN_SHOULD_RELEASE(pConn, pHead);

  // zero-initialise so fields not set below never reach the app as garbage
  STransMsg transMsg = {0};
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = pHead->content;
  transMsg.msgType = pHead->msgType;
  transMsg.code = pHead->code;
  transMsg.ahandle = (void*)pHead->ahandle;
  transMsg.handle = NULL;

  transClearBuffer(&pConn->readBuf);
  pConn->inType = pHead->msgType;
  if (pConn->status == ConnNormal) {
    if (pHead->persist == 1) {
      pConn->status = ConnAcquire;
      transRefSrvHandle(pConn);
      tDebug("server conn %p acquired by server app", pConn);
    }
  }
  if (pConn->status == ConnNormal && pHead->noResp == 0) {
    // released again in uvStartSendResp when the response is sent
    transRefSrvHandle(pConn);
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
           taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
           ntohs(pConn->locaddr.sin_port), transMsg.contLen);
  } else {
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
           TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
           taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
    // no ref here
  }

  // the app only gets a handle to reply on when a response is expected
  if (pHead->noResp == 0) {
    transMsg.handle = pConn;
  }

  STrans* pTransInst = pConn->pTransInst;
  (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
  // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
}

dengyihao's avatar
dengyihao 已提交
226
// Read callback for client sockets.
//   nread > 0 : account for the bytes and process the packet once complete
//   nread == 0: nothing to do
//   nread < 0 : read error; mark the conn broken, deliver the registered
//               message (if any) to the app, and drop one reference
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
  SSrvConn*    conn = cli->data;
  SConnBuffer* rbuf = &conn->readBuf;

  if (nread == 0) {
    return;
  }

  if (nread < 0) {
    tError("server conn %p read error: %s", conn, uv_err_name(nread));
    conn->broken = true;
    if (conn->status == ConnAcquire && conn->regArg.init) {
      STrans* pTransInst = conn->pTransInst;
      (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
      memset(&conn->regArg, 0, sizeof(conn->regArg));
    }
    transUnrefSrvHandle(conn);
    return;
  }

  rbuf->len += nread;
  tTrace("server conn %p read summary, total read: %d, current read: %d", conn, rbuf->len, (int)nread);
  if (!transReadComplete(rbuf)) {
    tTrace("server %p read partial packet, continue to read", conn);
    return;
  }
  tTrace("server conn %p alread read complete packet", conn);
  uvHandleReq(conn);
}
// Allocation callback for the worker pipe: hands libuv a small zeroed heap
// buffer (2 bytes) for the notification written alongside a dispatched
// socket. The read callback is responsible for freeing it.
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->len = 2;
  buf->base = calloc(1, buf->len * sizeof(char));
}

// Timer callback; it only reports that the connection timed out.
void uvOnTimeoutCb(uv_timer_t* handle) {
  SSrvConn* conn = handle->data;
  tError("server conn %p time out", conn);
}

dengyihao's avatar
dengyihao 已提交
269
// Write-completion callback for responses.
//
// On failure the connection is marked broken and one reference is dropped.
// On success the message just written is popped from the conn's queue and
// destroyed (a Release response also returns the conn to ConnNormal and drops
// a reference). If more messages are queued, the next one is started; a
// Register message on an acquired conn is not written but recorded in
// conn->regArg (and delivered to the app right away if the conn is already
// broken), after which the message following it, if any, is started.
void uvOnSendCb(uv_write_t* req, int status) {
  SSrvConn* conn = req->data;
  transClearBuffer(&conn->readBuf);

  if (status != 0) {
    tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
    conn->broken = true;
    transUnrefSrvHandle(conn);
    return;
  }

  tTrace("server conn %p data already was written on stream", conn);
  if (transQueueEmpty(&conn->srvMsgs)) {
    return;
  }

  SSrvMsg* sent = transQueuePop(&conn->srvMsgs);
  if (sent->type == Release && conn->status != ConnNormal) {
    conn->status = ConnNormal;
    transUnrefSrvHandle(conn);
  }
  destroySmsg(sent);

  // send second data, just use for push
  if (transQueueEmpty(&conn->srvMsgs)) {
    return;
  }

  SSrvMsg* next = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
  if (!(next->type == Register && conn->status == ConnAcquire)) {
    uvStartSendRespInternal(next);
    return;
  }

  conn->regArg.notifyCount = 0;
  conn->regArg.init = 1;
  conn->regArg.msg = next->msg;
  if (conn->broken) {
    STrans* pTransInst = conn->pTransInst;
    (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
    memset(&conn->regArg, 0, sizeof(conn->regArg));
  }
  transQueuePop(&conn->srvMsgs);
  // only the wrapper is freed: its payload was copied into regArg.msg above
  tfree(next);

  next = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
  if (next != NULL) {
    uvStartSendRespInternal(next);
  }
}
dengyihao's avatar
dengyihao 已提交
310 311
// Completion callback for the pipe write that hands an accepted socket to a
// worker thread: logs the outcome and frees the write request.
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status != 0) {
    tError("fail to dispatch conn to work thread");
  } else {
    tTrace("success to dispatch conn to work thread");
  }
  free(req);
}
dengyihao's avatar
dengyihao 已提交
318

dengyihao's avatar
dengyihao 已提交
319
// Fill in the wire header in front of the response payload and point `wb`
// at the complete message (header + content).
//
// A response without content gets an empty content buffer allocated first.
// The header's `code` and `msgLen` are written in network byte order.
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
  tTrace("server conn %p prepare to send resp", smsg->pConn);

  SSrvConn*  conn = smsg->pConn;
  STransMsg* resp = &smsg->msg;
  const bool isRelease = (smsg->type == Release);

  if (resp->pCont == NULL) {
    resp->pCont = (void*)rpcMallocCont(0);
    resp->contLen = 0;
  }

  STransMsgHead* head = transHeadFromCont(resp->pCont);
  head->ahandle = (uint64_t)resp->ahandle;

  // pHead->secured = pMsg->code == 0 ? 1 : 0;  //
  // once a conn is marked secured it stays secured
  if (!conn->secured) {
    conn->secured = (resp->code == 0) ? 1 : 0;
  }
  head->secured = conn->secured;

  if (conn->status == ConnNormal) {
    // on a normal conn the response type is the request type + 1
    head->msgType = conn->inType + 1;
  } else if (isRelease) {
    head->msgType = 0;
  } else {
    head->msgType = resp->msgType;
  }
  head->release = isRelease ? 1 : 0;
  head->code = htonl(resp->code);

  int32_t totalLen = transMsgLenFromCont(resp->contLen);
  tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", conn, TMSG_INFO(head->msgType),
         taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr),
         ntohs(conn->locaddr.sin_port));
  head->msgLen = htonl(totalLen);

  wb->base = (char*)head;
  wb->len = totalLen;
}
dengyihao's avatar
dengyihao 已提交
355 356

// Serialize `smsg` and start writing it on its connection; completion is
// reported to uvOnSendCb. The connection's timer is stopped first.
//
// If libuv refuses the write, the completion callback will not run, so the
// failure is logged here instead of being dropped silently.
static void uvStartSendRespInternal(SSrvMsg* smsg) {
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
  uv_timer_stop(&pConn->pTimer);

  int err = uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
  if (err != 0) {
    tError("server conn %p failed to start writing data, %s", pConn, uv_err_name(err));
  }
}
// Queue a response from the app on its connection and start writing it when
// transQueuePush reports that it can be sent now.
//
// Takes ownership of `smsg`: it is either queued (and destroyed after being
// written) or, when the connection is already broken, destroyed here.
// NOTE(review): assumes no caller touches `smsg` after this call - confirm.
static void uvStartSendResp(SSrvMsg* smsg) {
  SSrvConn* pConn = smsg->pConn;

  if (pConn->broken == true) {
    // persist by
    // the response can never be delivered; free it instead of leaking it
    destroySmsg(smsg);
    transUnrefSrvHandle(pConn);
    return;
  }
  // drop the reference taken for the pending response when the request
  // arrived on a normal connection
  if (pConn->status == ConnNormal) {
    transUnrefSrvHandle(pConn);
  }

  if (!transQueuePush(&pConn->srvMsgs, smsg)) {
    return;
  }
  uvStartSendRespInternal(smsg);
}
dengyihao's avatar
dengyihao 已提交
383

dengyihao's avatar
dengyihao 已提交
384
// Release a server message together with its payload; NULL is accepted.
static void destroySmsg(SSrvMsg* smsg) {
  if (smsg != NULL) {
    transFreeMsg(smsg->msg.pCont);
    free(smsg);
  }
}
dengyihao's avatar
fix bug  
dengyihao 已提交
391 392 393 394 395 396 397
// Detach every connection from the worker's list and drop all of its
// references, including the last one.
static void destroyAllConn(SWorkThrdObj* pThrd) {
  while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
    queue* node = QUEUE_HEAD(&pThrd->conn);
    QUEUE_REMOVE(node);
    QUEUE_INIT(node);

    SSrvConn* conn = QUEUE_DATA(node, SSrvConn, queue);
    // keep unreferencing; the pass that starts with fewer than two
    // references left is the final one
    bool lastRef;
    do {
      lastRef = T_REF_VAL_GET(conn) < 2;
      transUnrefSrvHandle(conn);
    } while (!lastRef);
  }
}
dengyihao's avatar
dengyihao 已提交
404
// Async callback of a worker thread: drains the messages posted to this
// async item and dispatches each one through transAsyncHandle by its type.
void uvWorkerAsyncCb(uv_async_t* handle) {
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
  queue         wq;
  const size_t  numOfHandlers = sizeof(transAsyncHandle) / sizeof(transAsyncHandle[0]);

  // batch process to avoid to lock/unlock frequently
  taosThreadMutexLock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  taosThreadMutexUnlock(&item->mtx);

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* head = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(head);

    SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
    if (msg == NULL) {
      tError("unexcept occurred, continue");
      continue;
    }
    // never index the dispatch table with a type it has no entry for
    if ((size_t)msg->type >= numOfHandlers) {
      tError("unknown async msg type %d, ignored", (int)msg->type);
      continue;
    }
    (*transAsyncHandle[msg->type])(msg, pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
427 428
// Async callback used to shut the accept thread down: closes the listening
// socket and stops the accept loop.
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* pSrv = async->data;
  tDebug("close server port %d", pSrv->port);

  uv_handle_t* listener = (uv_handle_t*)&pSrv->server;
  uv_close(listener, NULL);
  uv_stop(pSrv->loop);
}
dengyihao's avatar
dengyihao 已提交
433

dengyihao's avatar
dengyihao 已提交
434
// Shutdown-completion callback: whatever the outcome, close the stream
// (uvDestroyConn runs when the close completes) and free the request.
static void uvShutDownCb(uv_shutdown_t* req, int status) {
  if (status != 0) {
    tDebug("conn failed to shut down: %s", uv_err_name(status));
  }
  uv_handle_t* stream = (uv_handle_t*)req->handle;
  uv_close(stream, uvDestroyConn);
  free(req);
}

dengyihao's avatar
dengyihao 已提交
442 443 444 445 446 447 448 449 450 451 452 453 454 455 456
void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status == -1) {
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

  uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
    uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));

    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

    pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
dengyihao's avatar
dengyihao 已提交
457

dengyihao's avatar
dengyihao 已提交
458
    tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
dengyihao's avatar
dengyihao 已提交
459
    uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
dengyihao's avatar
dengyihao 已提交
460 461
  } else {
    uv_close((uv_handle_t*)cli, NULL);
462
    free(cli);
dengyihao's avatar
dengyihao 已提交
463 464 465
  }
}
// Read callback of a worker's pipe: a notification byte arrived together
// with a pending TCP handle dispatched by the accept thread. Creates the
// connection object, accepts the pending handle into it, records peer and
// local addresses and starts reading requests.
//
// `buf->base` was allocated by uvAllocConnBufferCb and is freed here on
// every path, including read errors and empty reads.
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
  tTrace("server connection coming");
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
    // uv_close((uv_handle_t*)q, NULL);
    free(buf->base);  // the callee owns the buffer even when the read failed
    return;
  }
  if (nread == 0) {
    // nothing was read this time; not an error
    free(buf->base);
    return;
  }
  // free memory allocated by
  assert((size_t)nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
  free(buf->base);

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

  SSrvConn* pConn = createConn(pThrd);
  if (pConn == NULL) {
    tError("failed to create new connection");
    return;
  }

  pConn->pTransInst = pThrd->pTransInst;
  /* init conn timer*/
  uv_timer_init(pThrd->loop, &pConn->pTimer);
  pConn->pTimer.data = pConn;

  pConn->hostThrd = pThrd;

  // init client handle
  pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

  pConn->pWriter.data = pConn;

  transSetConnOption((uv_tcp_t*)pConn->pTcp);

  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
    tTrace("server conn %p created, fd: %d", pConn, fd);

    int addrlen = sizeof(pConn->addr);
    if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
      tError("server conn %p failed to get peer info", pConn);
      transUnrefSrvHandle(pConn);
      return;
    }

    addrlen = sizeof(pConn->locaddr);
    if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
      tError("server conn %p failed to get local info", pConn);
      transUnrefSrvHandle(pConn);
      return;
    }

    uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);

  } else {
    tDebug("failed to create new connection");
    transUnrefSrvHandle(pConn);
  }
}

void* acceptThread(void* arg) {
  // opt
dengyihao's avatar
dengyihao 已提交
538
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
539 540
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
541 542

  return NULL;
dengyihao's avatar
dengyihao 已提交
543
}
dengyihao's avatar
dengyihao 已提交
544 545
static bool addHandleToWorkloop(void* arg) {
  SWorkThrdObj* pThrd = arg;
dengyihao's avatar
dengyihao 已提交
546
  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
547 548 549
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
550

dengyihao's avatar
dengyihao 已提交
551
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
dengyihao's avatar
dengyihao 已提交
552 553 554 555
  uv_pipe_open(pThrd->pipe, pThrd->fd);

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
556
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
557
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
558

dengyihao's avatar
fix bug  
dengyihao 已提交
559 560 561
  // conn set
  QUEUE_INIT(&pThrd->conn);

dengyihao's avatar
dengyihao 已提交
562
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
dengyihao's avatar
dengyihao 已提交
563
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
564 565 566 567 568 569 570 571 572 573 574 575 576
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
577 578 579 580
  // register an async here to quit server gracefully
  srv->pAcceptAsync = calloc(1, sizeof(uv_async_t));
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
581

dengyihao's avatar
dengyihao 已提交
582
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
583 584 585 586 587
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
588
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
589 590 591 592
    tError("failed to listen: %s", uv_err_name(err));
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
593 594
}
void* workerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
595
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
596
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
597
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
598 599

  return NULL;
dengyihao's avatar
dengyihao 已提交
600 601
}

dengyihao's avatar
fix bug  
dengyihao 已提交
602 603 604
static SSrvConn* createConn(void* hThrd) {
  SWorkThrdObj* pThrd = hThrd;

dengyihao's avatar
dengyihao 已提交
605
  SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
dengyihao's avatar
fix bug  
dengyihao 已提交
606 607 608
  QUEUE_INIT(&pConn->queue);

  QUEUE_PUSH(&pThrd->conn, &pConn->queue);
dengyihao's avatar
dengyihao 已提交
609 610

  transQueueInit(&pConn->srvMsgs, NULL);
dengyihao's avatar
dengyihao 已提交
611

dengyihao's avatar
dengyihao 已提交
612
  memset(&pConn->regArg, 0, sizeof(pConn->regArg));
dengyihao's avatar
dengyihao 已提交
613
  pConn->broken = false;
dengyihao's avatar
dengyihao 已提交
614
  pConn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
615

dengyihao's avatar
dengyihao 已提交
616
  transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
617
  tTrace("server conn %p created", pConn);
dengyihao's avatar
dengyihao 已提交
618 619
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
620

dengyihao's avatar
dengyihao 已提交
621
// Tear a connection down: release its read buffer and, when `clear` is set,
// shut the socket down and close it (uvDestroyConn runs once the close has
// completed). NULL is accepted.
//
// If the shutdown request cannot be allocated or libuv rejects it (for
// example because the handle was never connected), the shutdown callback
// would never run, so the handle is closed directly instead.
static void destroyConn(SSrvConn* conn, bool clear) {
  if (conn == NULL) {
    return;
  }
  transDestroyBuffer(&conn->readBuf);
  if (!clear) {
    return;
  }

  tTrace("server conn %p to be destroyed", conn);
  uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
  if (req != NULL && uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb) == 0) {
    return;  // uvShutDownCb closes the handle and frees req
  }

  free(req);
  if (!uv_is_closing((uv_handle_t*)conn->pTcp)) {
    uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
  }
}
// libuv close callback for a server connection's handle.
//
// handle->data carries the SSrvConn; nothing happens when it is NULL.
// Otherwise the connection's timer is stopped, its pending-message queue is
// destroyed, it is unlinked from the worker's connection list and its
// uv_tcp_t is freed. When the owning worker is quitting and this was its last
// connection, uv_loop_close() and uv_stop() are called on the worker loop.
static void uvDestroyConn(uv_handle_t* handle) {
  SSrvConn* pConn = handle->data;
  if (pConn != NULL) {
    SWorkThrdObj* pThrd = pConn->hostThrd;

    tDebug("server conn %p destroy", pConn);
    uv_timer_stop(&pConn->pTimer);
    transQueueDestroy(&pConn->srvMsgs);
    QUEUE_REMOVE(&pConn->queue);
    free(pConn->pTcp);
    // NOTE(review): the SSrvConn itself is not freed here (the original kept
    // free(conn) commented out) -- confirm who releases it.

    bool lastConnGone = pThrd->quit && QUEUE_IS_EMPTY(&pThrd->conn);
    if (lastConnGone) {
      // NOTE(review): uv_loop_close() is documented for loops that have
      // finished running; here it is called before uv_stop() -- confirm.
      uv_loop_close(pThrd->loop);
      uv_stop(pThrd->loop);
    }
  }
}

U
ubuntu 已提交
652
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
dengyihao's avatar
dengyihao 已提交
653 654 655 656 657 658 659 660 661 662 663 664
  SServerObj* srv = calloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
  srv->pThreadObj = (SWorkThrdObj**)calloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*));
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

  for (int i = 0; i < srv->numOfThreads; i++) {
    SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
665
    thrd->quit = false;
dengyihao's avatar
dengyihao 已提交
666
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
667

dengyihao's avatar
dengyihao 已提交
668 669 670
    srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
    int fds[2];
    if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
dengyihao's avatar
dengyihao 已提交
671
      goto End;
dengyihao's avatar
dengyihao 已提交
672 673 674 675
    }
    uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write

dengyihao's avatar
dengyihao 已提交
676
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
677 678
    thrd->fd = fds[0];
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
679

dengyihao's avatar
dengyihao 已提交
680 681 682
    if (false == addHandleToWorkloop(thrd)) {
      goto End;
    }
wafwerar's avatar
wafwerar 已提交
683
    int err = taosThreadCreate(&(thrd->thread), NULL, workerThread, (void*)(thrd));
dengyihao's avatar
dengyihao 已提交
684 685 686 687 688 689 690 691
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
    }
  }
dengyihao's avatar
dengyihao 已提交
692 693 694
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
wafwerar's avatar
wafwerar 已提交
695
  int err = taosThreadCreate(&srv->thread, NULL, acceptThread, (void*)srv);
dengyihao's avatar
dengyihao 已提交
696 697 698 699 700 701 702
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
    // clear all resource later
  }

  return srv;
dengyihao's avatar
dengyihao 已提交
703
End:
U
ubuntu 已提交
704
  transCloseServer(srv);
dengyihao's avatar
dengyihao 已提交
705 706
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
707 708 709 710 711 712 713 714 715 716 717 718 719 720
// Handle a Quit message on a worker thread.
//
// With no connections left, uv_loop_close() and uv_stop() are called on the
// worker loop right away. Otherwise all connections are destroyed through
// destroyAllConn() and thrd->quit is set. The message is freed in both cases.
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
  bool noConnLeft = QUEUE_IS_EMPTY(&thrd->conn);

  if (!noConnLeft) {
    destroyAllConn(thrd);
    thrd->quit = true;
  } else {
    uv_loop_close(thrd->loop);
    uv_stop(thrd->loop);
  }
  free(msg);
}
// Handle a Release message for msg->pConn.
//
// In the ConnAcquire state the message is pushed onto the connection's
// srvMsgs queue and, when transQueuePush() returns true, handed to
// uvStartSendRespInternal(); the message is not destroyed on this path.
// In any other state the message is destroyed; for ConnRelease and
// ConnNormal a debug line notes that the release is ignored.
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* pConn = msg->pConn;

  if (pConn->status == ConnAcquire) {
    if (transQueuePush(&pConn->srvMsgs, msg)) {
      uvStartSendRespInternal(msg);
    }
    return;
  }

  if (pConn->status == ConnRelease || pConn->status == ConnNormal) {
    tDebug("server conn %p already released, ignore release-msg", pConn);
  }
  destroySmsg(msg);
}
dengyihao's avatar
dengyihao 已提交
731
// Handle a response message on the worker thread: log the second stage of
// the send and pass the message on to uvStartSendResp().
void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* pConn = msg->pConn;

  tDebug("server conn %p start to send resp (2/2)", pConn);
  uvStartSendResp(msg);
}
dengyihao's avatar
dengyihao 已提交
736 737
// Handle a Register message: remember msg->msg as the connection's
// broken-link notification (conn->regArg).
//
// Registration only happens while the connection is in the ConnAcquire
// state. The message is pushed on and popped off the srvMsgs queue, regArg is
// filled in, and if the connection is already broken the transport callback
// (pTransInst->cfp) is invoked immediately and regArg is cleared again.
//
// The SSrvMsg wrapper is freed on every path except the one where
// transQueuePush() returns false.
void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* conn = msg->pConn;
  tDebug("server conn %p register brokenlink callback", conn);

  if (conn->status != ConnAcquire) {
    // Nothing else refers to the wrapper on this path; without this free it
    // was leaked for every register request on a non-acquired connection.
    // NOTE(review): msg->msg may own a payload that also needs releasing --
    // confirm whether destroySmsg() is the right call here.
    tDebug("server conn %p not acquired, ignore register-msg", conn);
    free(msg);
    return;
  }

  if (!transQueuePush(&conn->srvMsgs, msg)) {
    return;
  }
  transQueuePop(&conn->srvMsgs);
  conn->regArg.notifyCount = 0;
  conn->regArg.init = 1;
  conn->regArg.msg = msg->msg;
  tDebug("server conn %p register brokenlink callback succ", conn);

  if (conn->broken) {
    STrans* pTransInst = conn->pTransInst;
    (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
    memset(&conn->regArg, 0, sizeof(conn->regArg));
  }
  free(msg);
}
dengyihao's avatar
dengyihao 已提交
757 758 759 760
// Wait for a worker thread to finish and release the worker object.
//
// Joins pThrd->thread, then frees the loop storage, destroys the async pool
// and frees the worker itself. A NULL worker is ignored.
void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd != NULL) {
    taosThreadJoin(pThrd->thread, NULL);
    free(pThrd->loop);
    transDestroyAsyncPool(pThrd->asyncPool);
    free(pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
766
// Ask a worker thread to quit by posting a Quit message to its async pool.
//
// A NULL worker is ignored, matching destroyWorkThrd(). If the message cannot
// be allocated the failure is logged and nothing is sent.
// NOTE(review): in that case the worker is never told to quit, so a later
// join on it would presumably block -- confirm the desired policy.
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd == NULL) {
    return;
  }

  SSrvMsg* msg = calloc(1, sizeof(SSrvMsg));
  if (msg == NULL) {
    tError("server failed to alloc quit msg for work thread");
    return;
  }
  msg->type = Quit;
  tDebug("server send quit msg to work thread");
  transSendAsync(pThrd->asyncPool, &msg->q);
}

U
ubuntu 已提交
773
void transCloseServer(void* arg) {
dengyihao's avatar
dengyihao 已提交
774 775
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
776
  for (int i = 0; i < srv->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
777
    sendQuitToWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
778
    destroyWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
779
  }
dengyihao's avatar
dengyihao 已提交
780 781 782

  tDebug("send quit msg to accept thread");
  uv_async_send(srv->pAcceptAsync);
wafwerar's avatar
wafwerar 已提交
783
  taosThreadJoin(srv->thread, NULL);
dengyihao's avatar
dengyihao 已提交
784 785 786

  free(srv->pThreadObj);
  free(srv->pAcceptAsync);
dengyihao's avatar
dengyihao 已提交
787
  free(srv->loop);
dengyihao's avatar
dengyihao 已提交
788 789 790 791

  for (int i = 0; i < srv->numOfThreads; i++) {
    free(srv->pipe[i]);
  }
dengyihao's avatar
dengyihao 已提交
792
  free(srv->pipe);
dengyihao's avatar
dengyihao 已提交
793

dengyihao's avatar
dengyihao 已提交
794
  free(srv);
dengyihao's avatar
dengyihao 已提交
795
}
dengyihao's avatar
dengyihao 已提交
796

dengyihao's avatar
dengyihao 已提交
797 798 799 800 801 802 803 804 805 806 807 808 809 810 811
// Take one reference on a server connection handle via T_REF_INC.
// A NULL handle is ignored.
void transRefSrvHandle(void* handle) {
  if (handle != NULL) {
    SSrvConn* pConn = handle;
    int       cnt = T_REF_INC(pConn);
    UNUSED(cnt);
  }
}

void transUnrefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SSrvConn*)handle);
dengyihao's avatar
dengyihao 已提交
812
  tDebug("server conn %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
813 814 815 816
  if (ref == 0) {
    destroyConn((SSrvConn*)handle, true);
  }
}
dengyihao's avatar
dengyihao 已提交
817 818

void transReleaseSrvHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
819 820 821 822 823 824
  if (handle == NULL) {
    return;
  }
  SSrvConn*     pConn = handle;
  SWorkThrdObj* pThrd = pConn->hostThrd;

dengyihao's avatar
dengyihao 已提交
825
  STransMsg tmsg = {.code = 0, .handle = handle, .ahandle = NULL};
dengyihao's avatar
dengyihao 已提交
826 827 828 829 830 831 832 833

  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  srvMsg->msg = tmsg;
  srvMsg->type = Release;
  srvMsg->pConn = pConn;

  tTrace("server conn %p start to release", pConn);
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
834
}
U
ubuntu 已提交
835
void transSendResponse(const STransMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
836 837 838
  if (pMsg->handle == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
839
  SSrvConn*     pConn = pMsg->handle;
dengyihao's avatar
dengyihao 已提交
840
  SWorkThrdObj* pThrd = pConn->hostThrd;
dengyihao's avatar
dengyihao 已提交
841 842 843
  if (pThrd->quit) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
844

dengyihao's avatar
dengyihao 已提交
845 846 847
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  srvMsg->pConn = pConn;
  srvMsg->msg = *pMsg;
dengyihao's avatar
dengyihao 已提交
848
  srvMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
849
  tTrace("server conn %p start to send resp (1/2)", pConn);
dengyihao's avatar
dengyihao 已提交
850
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
851
}
dengyihao's avatar
dengyihao 已提交
852 853 854 855 856 857 858 859 860 861 862
void transRegisterMsg(const STransMsg* msg) {
  if (msg->handle == NULL) {
    return;
  }
  SSrvConn*     pConn = msg->handle;
  SWorkThrdObj* pThrd = pConn->hostThrd;

  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  srvMsg->pConn = pConn;
  srvMsg->msg = *msg;
  srvMsg->type = Register;
dengyihao's avatar
dengyihao 已提交
863
  tTrace("server conn %p start to register brokenlink callback", pConn);
dengyihao's avatar
dengyihao 已提交
864 865
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
}
dengyihao's avatar
formate  
dengyihao 已提交
866 867
// Copy the peer information of a server connection into *pInfo.
//
// clientIp receives addr.sin_addr.s_addr unchanged, clientPort receives the
// port converted with ntohs(), and user is copied with tstrncpy() bounded by
// sizeof(pInfo->user).
//
// Returns 0 on success, -1 when thandle or pInfo is NULL.
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
  if (thandle == NULL || pInfo == NULL) {
    return -1;
  }
  SSrvConn*          pConn = thandle;
  struct sockaddr_in addr = pConn->addr;

  pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
  pInfo->clientPort = ntohs(addr.sin_port);
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
  return 0;
}

dengyihao's avatar
dengyihao 已提交
876
#endif