transSrv.c 27.7 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20 21 22 23 24 25
// Registration state kept per connection (see SSrvConn.regArg).
typedef struct {
  int       notifyCount;  // reset to 0 when a Register message is recorded
  int       init;         // init or not; non-zero while `msg` holds a registered message
  STransMsg msg;          // message passed to the app callback when the conn is found broken
} SSrvRegArg;

dengyihao's avatar
dengyihao 已提交
26
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
27
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
28 29 30
  uv_tcp_t*  pTcp;
  uv_write_t pWriter;
  uv_timer_t pTimer;
dengyihao's avatar
dengyihao 已提交
31 32

  queue       queue;
dengyihao's avatar
dengyihao 已提交
33
  int         persist;  // persist connection or not
dengyihao's avatar
dengyihao 已提交
34
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
35 36 37
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
38
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
39
  STransQueue srvMsgs;
dengyihao's avatar
dengyihao 已提交
40

dengyihao's avatar
dengyihao 已提交
41 42
  SSrvRegArg regArg;
  bool       broken;  // conn broken;
dengyihao's avatar
dengyihao 已提交
43

dengyihao's avatar
dengyihao 已提交
44
  ConnStatus         status;
dengyihao's avatar
dengyihao 已提交
45
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
46
  struct sockaddr_in locaddr;
U
ubuntu 已提交
47

dengyihao's avatar
dengyihao 已提交
48 49 50 51 52
  int  spi;
  char info[64];
  char user[TSDB_UNI_LEN];  // user ID for the link
  char secret[TSDB_PASSWORD_LEN];
  char ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
53 54 55
} SSrvConn;

typedef struct SSrvMsg {
dengyihao's avatar
dengyihao 已提交
56 57 58 59
  SSrvConn*     pConn;
  STransMsg     msg;
  queue         q;
  STransMsgType type;
dengyihao's avatar
dengyihao 已提交
60
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
61 62

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
63 64 65 66 67 68
  TdThread      thread;
  uv_pipe_t*    pipe;
  uv_os_fd_t    fd;
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  queue         msg;
wafwerar's avatar
wafwerar 已提交
69
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
70 71 72 73

  queue conn;
  void* pTransInst;
  bool  quit;
dengyihao's avatar
dengyihao 已提交
74 75 76
} SWorkThrdObj;

typedef struct SServerObj {
dengyihao's avatar
dengyihao 已提交
77
  TdThread   thread;
dengyihao's avatar
dengyihao 已提交
78 79 80 81
  uv_tcp_t   server;
  uv_loop_t* loop;

  // work thread info
dengyihao's avatar
dengyihao 已提交
82 83 84
  int            workerIdx;
  int            numOfThreads;
  SWorkThrdObj** pThreadObj;
dengyihao's avatar
dengyihao 已提交
85 86 87 88 89

  uv_pipe_t** pipe;
  uint32_t    ip;
  uint32_t    port;
  uv_async_t* pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
90 91 92 93 94
} SServerObj;

static const char* notify = "a";

static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
95
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
96
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
97
static void uvOnTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
98
static void uvOnSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
99
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
100 101 102
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
103
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
104
static void uvShutDownCb(uv_shutdown_t* req, int status);
dengyihao's avatar
dengyihao 已提交
105 106 107 108 109 110 111

/*
 * time-consuming task throwed into BG work thread
 */
static void uvWorkDoTask(uv_work_t* req);
static void uvWorkAfterTask(uv_work_t* req, int status);

dengyihao's avatar
dengyihao 已提交
112 113
static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
114

dengyihao's avatar
dengyihao 已提交
115
static void uvStartSendRespInternal(SSrvMsg* smsg);
dengyihao's avatar
dengyihao 已提交
116 117
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
118

dengyihao's avatar
dengyihao 已提交
119 120
static void uvNotifyLinkBrokenToApp(SSrvConn* conn);

dengyihao's avatar
dengyihao 已提交
121
static void destroySmsg(SSrvMsg* smsg);
122
// check whether already read complete packet
dengyihao's avatar
fix bug  
dengyihao 已提交
123
static SSrvConn* createConn(void* hThrd);
dengyihao's avatar
dengyihao 已提交
124
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
125

dengyihao's avatar
dengyihao 已提交
126 127
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
128
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
129
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
130 131
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
                                                                       uvHandleRegister};
dengyihao's avatar
dengyihao 已提交
132

133
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
134

dengyihao's avatar
dengyihao 已提交
135
// server and worker thread
136 137
static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg);
dengyihao's avatar
dengyihao 已提交
138

dengyihao's avatar
dengyihao 已提交
139 140 141 142
// add handle loop
static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
/*
 * If `head` is a bare release request (release flag set and msgLen equal to
 * the header size), mark `conn` as ConnRelease, drop the received buffer and
 * queue a Release response on the conn.
 *
 * NOTE: when the request matches, this macro executes `return;` in the
 * ENCLOSING function, so it may only be used inside functions returning void.
 * Both arguments may be evaluated more than once; pass side-effect-free
 * expressions.
 */
#define CONN_SHOULD_RELEASE(conn, head)                                         \
  do {                                                                          \
    if ((head)->release == 1 && (head)->msgLen == sizeof(*(head))) {            \
      (conn)->status = ConnRelease;                                             \
      transClearBuffer(&(conn)->readBuf);                                       \
      transFreeMsg(transContFromHead((char*)(head)));                           \
      tTrace("server conn %p received release request", (conn));                \
                                                                                \
      SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));                   \
      if (srvMsg == NULL) {                                                     \
        tError("server conn %p failed to alloc release response", (conn));      \
        return;                                                                 \
      }                                                                         \
      STransMsg tmsg = {.code = 0, .handle = (void*)(conn), .ahandle = NULL};   \
      srvMsg->msg = tmsg;                                                       \
      srvMsg->type = Release;                                                   \
      srvMsg->pConn = (conn);                                                   \
      /* only start sending when transQueuePush reports true */                 \
      if (!transQueuePush(&(conn)->srvMsgs, srvMsg)) {                          \
        return;                                                                 \
      }                                                                         \
      uvStartSendRespInternal(srvMsg);                                          \
      return;                                                                   \
    }                                                                           \
  } while (0)

/*
 * Tear down an event loop: request close of every handle still open on it
 * (uvWalkCb), run the loop so the close requests are processed, then close
 * the loop itself.
 *
 * Expands to a single statement without a trailing semicolon, so it must be
 * written as `SRV_RELEASE_UV(loop);` and is safe inside if/else.
 */
#define SRV_RELEASE_UV(loop)         \
  do {                               \
    uv_walk((loop), uvWalkCb, NULL); \
    uv_run((loop), UV_RUN_DEFAULT);  \
    uv_loop_close((loop));           \
  } while (0)

dengyihao's avatar
dengyihao 已提交
171
// Allocation callback for client streams: hands out space from the conn's own read buffer.
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SSrvConn* pConn = handle->data;
  transAllocBuffer(&pConn->readBuf, buf);
}

// Timer callback for query/insert inactivity; currently it only logs the event.
static void uvHandleActivityTimeout(uv_timer_t* handle) {
  SSrvConn* pConn = handle->data;  // the timer's data pointer carries the owning conn
  tDebug("%p timeout since no activity", pConn);
}

dengyihao's avatar
dengyihao 已提交
183 184
/*
 * Process one complete request sitting in pConn->readBuf.
 *
 * Converts the header's code/msgLen to host byte order, records the user
 * name on the conn, answers bare release requests (CONN_SHOULD_RELEASE
 * returns from this function in that case), and otherwise passes the message
 * to the application callback of the transport instance.
 */
static void uvHandleReq(SSrvConn* pConn) {
  SConnBuffer*   pBuf = &pConn->readBuf;
  STransMsgHead* pHead = (STransMsgHead*)pBuf->buf;

  // header fields arrive in network byte order
  pHead->code = ntohl(pHead->code);
  pHead->msgLen = ntohl(pHead->msgLen);

  // The user name comes from the peer: bound the copy by the destination size
  // and always NUL-terminate, so a missing terminator cannot overflow
  // pConn->user and a shorter name cannot leave stale bytes behind it.
  const size_t userCap = sizeof(pConn->user) - 1;
  const char*  userEnd = memchr(pHead->user, '\0', userCap);
  size_t       userLen = (userEnd != NULL) ? (size_t)(userEnd - pHead->user) : userCap;
  memcpy(pConn->user, pHead->user, userLen);
  pConn->user[userLen] = '\0';

  // TODO(dengyihao): time-consuming task thrown into BG Thread
  //  uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
  //  wreq->data = pConn;
  //  uv_read_stop((uv_stream_t*)pConn->pTcp);
  //  transRefSrvHandle(pConn);
  //  uv_queue_work(((SWorkThrdObj*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);

  CONN_SHOULD_RELEASE(pConn, pHead);

  // zero-init so fields not set below never reach the app as garbage
  STransMsg transMsg = {0};
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = pHead->content;
  transMsg.msgType = pHead->msgType;
  transMsg.code = pHead->code;
  transMsg.ahandle = (void*)pHead->ahandle;

  // transDestroyBuffer(&pConn->readBuf);
  // NOTE(review): pHead is still read after this call, so transClearBuffer is
  // presumably only detaching the buffer from readBuf — confirm.
  transClearBuffer(&pConn->readBuf);
  pConn->inType = pHead->msgType;

  if (pConn->status == ConnNormal && pHead->persist == 1) {
    pConn->status = ConnAcquire;
    transRefSrvHandle(pConn);
    tDebug("server conn %p acquired by server app", pConn);
  }

  if (pConn->status == ConnNormal && pHead->noResp == 0) {
    // a response is expected: take a ref that uvStartSendResp drops again
    transRefSrvHandle(pConn);
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
           taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
           ntohs(pConn->locaddr.sin_port), transMsg.contLen);
  } else {
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
           TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
           taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
    // no ref here
  }

  // if pHead->noResp = 1,
  // 1. server application should not send resp on handle
  // 2. once send out data, cli conn released to conn pool immediately
  // 3. not mixed with persist
  transMsg.handle = pConn;

  STrans* pTransInst = pConn->pTransInst;
  (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
  // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
}

dengyihao's avatar
dengyihao 已提交
243
/*
 * Read callback for client streams.
 *   nread > 0 : account for the new bytes and handle the request once a full packet is buffered
 *   nread == 0: nothing to do
 *   nread < 0 : mark the conn broken, notify the app if a message was registered, drop a ref
 */
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
  // opt
  SSrvConn*    pConn = cli->data;
  SConnBuffer* pReadBuf = &pConn->readBuf;

  if (nread == 0) {
    return;
  }

  if (nread > 0) {
    pReadBuf->len += nread;
    tTrace("server conn %p read summary, total read: %d, current read: %d", pConn, pReadBuf->len, (int)nread);
    if (!transReadComplete(pReadBuf)) {
      tTrace("server %p read partial packet, continue to read", pConn);
      return;
    }
    tTrace("server conn %p alread read complete packet", pConn);
    uvHandleReq(pConn);
    return;
  }

  // only read errors (nread < 0) reach this point
  tError("server conn %p read error: %s", pConn, uv_err_name(nread));
  pConn->broken = true;
  if (pConn->status == ConnAcquire && pConn->regArg.init) {
    tTrace("server conn %p broken, notify server app", pConn);
    STrans* pInst = pConn->pTransInst;
    (*pInst->cfp)(pInst->parent, &(pConn->regArg.msg), NULL);
    memset(&pConn->regArg, 0, sizeof(pConn->regArg));
  }
  transUnrefSrvHandle(pConn);
}
/*
 * Allocation callback for the worker pipe: provides a small zeroed buffer
 * for the notify payload that accompanies each dispatched connection.
 * The buffer is released by the read callback (uvOnConnectionCb).
 *
 * On allocation failure the length is reported as 0, so libuv delivers
 * UV_ENOBUFS to the read callback instead of being handed a NULL base with
 * a non-zero length.
 */
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  const size_t len = 2;
  buf->base = taosMemoryCalloc(1, sizeof(char) * len);
  buf->len = (buf->base != NULL) ? len : 0;
}

// Timer callback for a conn; at present it only reports the timeout.
void uvOnTimeoutCb(uv_timer_t* handle) {
  // opt
  SSrvConn* conn = handle->data;
  tError("server conn %p time out", conn);
}

dengyihao's avatar
dengyihao 已提交
287
/*
 * Write-completion callback for responses.
 *
 * On failure the conn is marked broken and one ref is dropped. On success
 * the message just sent is popped and destroyed (a sent Release puts the
 * conn back to ConnNormal), then the next queued message, if any, is
 * processed: a Register message on an acquired conn is recorded in regArg
 * rather than sent, anything else is written to the stream.
 */
void uvOnSendCb(uv_write_t* req, int status) {
  SSrvConn* pConn = req->data;
  // transClearBuffer(&conn->readBuf);

  if (status != 0) {
    tError("server conn %p failed to write data, %s", pConn, uv_err_name(status));
    pConn->broken = true;
    transUnrefSrvHandle(pConn);
    return;
  }

  tTrace("server conn %p data already was written on stream", pConn);
  if (transQueueEmpty(&pConn->srvMsgs)) {
    return;
  }

  SSrvMsg* sent = transQueuePop(&pConn->srvMsgs);
  if (sent->type == Release && pConn->status != ConnNormal) {
    pConn->status = ConnNormal;
    transUnrefSrvHandle(pConn);
  }
  destroySmsg(sent);

  // send second data, just use for push
  if (transQueueEmpty(&pConn->srvMsgs)) {
    return;
  }

  SSrvMsg* next = (SSrvMsg*)transQueueGet(&pConn->srvMsgs, 0);
  if (!(next->type == Register && pConn->status == ConnAcquire)) {
    uvStartSendRespInternal(next);
    return;
  }

  // Register on an acquired conn: remember the message instead of sending it
  pConn->regArg.notifyCount = 0;
  pConn->regArg.init = 1;
  pConn->regArg.msg = next->msg;
  if (pConn->broken) {
    // the conn already failed, so deliver the registered message right away
    STrans* pInst = pConn->pTransInst;
    (pInst->cfp)(pInst->parent, &(pConn->regArg.msg), NULL);
    memset(&pConn->regArg, 0, sizeof(pConn->regArg));
  }
  transQueuePop(&pConn->srvMsgs);
  // only the wrapper is freed; its payload was copied into regArg.msg above
  taosMemoryFree(next);

  next = (SSrvMsg*)transQueueGet(&pConn->srvMsgs, 0);
  if (next != NULL) {
    uvStartSendRespInternal(next);
  }
}
dengyihao's avatar
dengyihao 已提交
329 330
/*
 * Completion callback for the uv_write2 that hands a client handle to a
 * worker. Whatever the outcome, the accept thread's copy of the handle
 * (req->data) is closed and freed via uvFreeCb, and the request is freed.
 */
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status != 0) {
    tError("fail to dispatch conn to work thread");
  } else {
    tTrace("success to dispatch conn to work thread");
  }

  uv_handle_t* dispatched = (uv_handle_t*)req->data;
  uv_close(dispatched, uvFreeCb);
  // taosMemoryFree(req->data);
  taosMemoryFree(req);
}
dengyihao's avatar
dengyihao 已提交
339

dengyihao's avatar
dengyihao 已提交
340
/*
 * Fill in the transport header in front of the response content and point
 * `wb` at the bytes to write (header + content).
 * A response without content gets an empty content buffer allocated first.
 */
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
  tTrace("server conn %p prepare to send resp", smsg->pConn);

  SSrvConn*  conn = smsg->pConn;
  STransMsg* resp = &smsg->msg;
  if (resp->pCont == NULL) {
    resp->pCont = (void*)rpcMallocCont(0);
    resp->contLen = 0;
  }

  const bool     isRelease = (smsg->type == Release);
  STransMsgHead* head = transHeadFromCont(resp->pCont);
  head->ahandle = (uint64_t)resp->ahandle;

  if (conn->status == ConnNormal) {
    // on a normal conn the response type is the request type + 1
    head->msgType = conn->inType + 1;
  } else if (isRelease) {
    head->msgType = 0;
  } else {
    head->msgType = resp->msgType;
  }
  head->release = isRelease ? 1 : 0;
  head->code = htonl(resp->code);

  int32_t total = transMsgLenFromCont(resp->contLen);
  tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", conn, TMSG_INFO(head->msgType),
         taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr),
         ntohs(conn->locaddr.sin_port));
  head->msgLen = htonl(total);

  wb->base = (char*)head;
  wb->len = total;
}
dengyihao's avatar
dengyihao 已提交
370 371

/*
 * Build the wire form of `smsg` and start writing it on its conn's stream.
 * Completion is reported through uvOnSendCb.
 *
 * If uv_write() rejects the request synchronously, uvOnSendCb is never
 * invoked for it, so the failure is logged here instead of being dropped
 * silently.
 */
static void uvStartSendRespInternal(SSrvMsg* smsg) {
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
  // uv_timer_stop(&pConn->pTimer);
  int ret = uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
  if (ret != 0) {
    tError("server conn %p failed to start write, %s", pConn, uv_err_name(ret));
  }
}
/*
 * Entry point for sending an application response on its conn.
 *
 * On a broken conn nothing can be sent: the message and its payload are
 * released here (previously they leaked) and one conn ref is dropped.
 * Otherwise the message is queued on the conn, and writing starts only when
 * transQueuePush() reports true.
 */
static void uvStartSendResp(SSrvMsg* smsg) {
  // impl
  SSrvConn* pConn = smsg->pConn;

  if (pConn->broken == true) {
    // persist by
    destroySmsg(smsg);
    transUnrefSrvHandle(pConn);
    return;
  }
  if (pConn->status == ConnNormal) {
    // drops the ref taken in uvHandleReq for requests expecting a response
    transUnrefSrvHandle(pConn);
  }

  if (!transQueuePush(&pConn->srvMsgs, smsg)) {
    return;
  }
  uvStartSendRespInternal(smsg);
  return;
}
dengyihao's avatar
dengyihao 已提交
398

dengyihao's avatar
dengyihao 已提交
399
// Release a server message together with its payload; NULL is accepted and ignored.
static void destroySmsg(SSrvMsg* smsg) {
  if (smsg != NULL) {
    transFreeMsg(smsg->msg.pCont);
    taosMemoryFree(smsg);
  }
}
dengyihao's avatar
fix bug  
dengyihao 已提交
406 407 408 409 410 411 412
/*
 * Detach every conn from the worker's list and drop all references held on
 * it: refs are released while the count is 2 or more, followed by one final
 * unref.
 */
static void destroyAllConn(SWorkThrdObj* pThrd) {
  queue* conns = &pThrd->conn;
  while (!QUEUE_IS_EMPTY(conns)) {
    queue* node = QUEUE_HEAD(conns);
    QUEUE_REMOVE(node);
    QUEUE_INIT(node);

    SSrvConn* pConn = QUEUE_DATA(node, SSrvConn, queue);
    for (; T_REF_VAL_GET(pConn) >= 2;) {
      transUnrefSrvHandle(pConn);
    }
    // the last unref
    transUnrefSrvHandle(pConn);
  }
}
dengyihao's avatar
dengyihao 已提交
419
/*
 * Async callback of a worker thread: drains the messages queued on the
 * async item and dispatches each to its handler in transAsyncHandle,
 * selected by the message type.
 *
 * A message whose type lies outside the handler table is logged and skipped
 * rather than used as an out-of-range index into the table.
 */
void uvWorkerAsyncCb(uv_async_t* handle) {
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
  queue         wq;

  // batch process to avoid to lock/unlock frequently
  taosThreadMutexLock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  taosThreadMutexUnlock(&item->mtx);

  const size_t numOfHandlers = sizeof(transAsyncHandle) / sizeof(transAsyncHandle[0]);

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* head = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(head);

    SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
    if (msg == NULL) {
      tError("unexcept occurred, continue");
      continue;
    }
    if ((size_t)msg->type >= numOfHandlers) {
      tError("server received async msg %p with unknown type %d, ignored", msg, (int)msg->type);
      continue;
    }
    (*transAsyncHandle[msg->type])(msg, pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
442 443 444 445 446
// uv_walk visitor: request close of every handle that is not already closing. `arg` is unused.
static void uvWalkCb(uv_handle_t* handle, void* arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}
dengyihao's avatar
dengyihao 已提交
447 448 449 450
// Close callback for heap-allocated handles: releases the handle's memory.
static void uvFreeCb(uv_handle_t* handle) {
  taosMemoryFree(handle);
}
dengyihao's avatar
dengyihao 已提交
451

dengyihao's avatar
dengyihao 已提交
452 453
// Async callback on the accept loop: closes every handle on that loop so the server can quit.
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* pSrv = async->data;
  tDebug("close server port %d", pSrv->port);
  uv_walk(pSrv->loop, uvWalkCb, NULL);
}
dengyihao's avatar
dengyihao 已提交
457

dengyihao's avatar
dengyihao 已提交
458
// Shutdown completion: close the stream (uvDestroyConn runs as its close callback) and free the request.
static void uvShutDownCb(uv_shutdown_t* req, int status) {
  if (status != 0) {
    tDebug("conn failed to shut down: %s", uv_err_name(status));
  }
  uv_handle_t* stream = (uv_handle_t*)req->handle;
  uv_close(stream, uvDestroyConn);
  taosMemoryFree(req);
}

dengyihao's avatar
dengyihao 已提交
466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483
// Work callback for background tasks. It is a placeholder for now and only traces the call.
static void uvWorkDoTask(uv_work_t* req) {
  // doing time-consuming task
  // only auth conn currently, add more func later
  void* conn = req->data;
  tTrace("server conn %p start to be processed in BG Thread", conn);
}

// After-work callback for background tasks: traces the outcome and frees the work request.
static void uvWorkAfterTask(uv_work_t* req, int status) {
  void* conn = req->data;
  if (status != 0) {
    tTrace("server conn %p failed to processed ", conn);
  }
  // Done time-consuming task
  // add more func later
  // this func called in main loop
  tTrace("server conn %p already processed ", conn);
  taosMemoryFree(req);
}

dengyihao's avatar
dengyihao 已提交
484 485 486 487 488 489
/*
 * Connection callback of the listening socket (accept thread).
 *
 * Accepts the pending client into a temporary handle and passes it to the
 * next worker (round robin) by writing the notify payload plus the handle on
 * that worker's pipe. The temporary handle and the write request are
 * released in uvOnPipeWriteCb once the write completes.
 *
 * Every failure path releases what was allocated so far: any negative status
 * is treated as an error (libuv error codes are not limited to -1), failed
 * allocations are detected, and the temporary handle is closed with uvFreeCb
 * so its memory is freed.
 */
void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status < 0) {
    tError("failed to accept new connection: %s", uv_err_name(status));
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

  uv_tcp_t* cli = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
  if (cli == NULL) {
    tError("failed to alloc handle for new connection");
    return;
  }
  int ret = uv_tcp_init(pObj->loop, cli);
  if (ret != 0) {
    tError("failed to init handle for new connection: %s", uv_err_name(ret));
    taosMemoryFree(cli);
    return;
  }

  if (uv_accept(stream, (uv_stream_t*)cli) != 0) {
    uv_close((uv_handle_t*)cli, uvFreeCb);
    return;
  }

  uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
  if (wr == NULL) {
    tError("failed to alloc write req for new connection");
    uv_close((uv_handle_t*)cli, uvFreeCb);
    return;
  }
  wr->data = cli;
  uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

  pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;

  tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);

  ret = uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
  if (ret != 0) {
    // the write callback will not run, so release both objects here
    tError("failed to dispatch conn to %dth worker-thread: %s", pObj->workerIdx, uv_err_name(ret));
    uv_close((uv_handle_t*)cli, uvFreeCb);
    taosMemoryFree(wr);
  }
}
/*
 * Read callback on a worker's pipe: a client handle dispatched by the accept
 * thread has arrived together with the notify payload.
 *
 * Creates the SSrvConn, accepts the pending handle into it, records the peer
 * and local addresses and starts reading requests. If accepting or either
 * address lookup fails, a ref on the new conn is dropped.
 */
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
  tTrace("server connection coming");
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
    tError("failed to create connect: %p", q);
    taosMemoryFree(buf->base);
    uv_close((uv_handle_t*)q, NULL);
    // taosMemoryFree(q);
    return;
  }

  // the payload only signals an arrival; check it and release the buffer
  assert(nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
  taosMemoryFree(buf->base);

  SWorkThrdObj* pWorker = q->data;
  uv_pipe_t*    pPipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pPipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pPipe);
  assert(pending == UV_TCP);

  SSrvConn* pConn = createConn(pWorker);
  pConn->pTransInst = pWorker->pTransInst;
  /* init conn timer*/
  // uv_timer_init(pThrd->loop, &pConn->pTimer);
  // pConn->pTimer.data = pConn;
  pConn->hostThrd = pWorker;

  // init client handle
  pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
  uv_tcp_init(pWorker->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;
  pConn->pWriter.data = pConn;

  transSetConnOption((uv_tcp_t*)pConn->pTcp);

  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) != 0) {
    tDebug("failed to create new connection");
    transUnrefSrvHandle(pConn);
    return;
  }

  uv_os_fd_t fd;
  uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
  tTrace("server conn %p created, fd: %d", pConn, fd);

  int nameLen = sizeof(pConn->addr);
  if (uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &nameLen) != 0) {
    tError("server conn %p failed to get peer info", pConn);
    transUnrefSrvHandle(pConn);
    return;
  }

  nameLen = sizeof(pConn->locaddr);
  if (uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &nameLen) != 0) {
    tError("server conn %p failed to get local info", pConn);
    transUnrefSrvHandle(pConn);
    return;
  }

  uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
}

581
void* transAcceptThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
582
  // opt
dengyihao's avatar
dengyihao 已提交
583
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
584 585
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
586 587

  return NULL;
dengyihao's avatar
dengyihao 已提交
588
}
dengyihao's avatar
dengyihao 已提交
589 590
static bool addHandleToWorkloop(void* arg) {
  SWorkThrdObj* pThrd = arg;
wafwerar's avatar
wafwerar 已提交
591
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
592 593 594
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
595

dengyihao's avatar
dengyihao 已提交
596
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
dengyihao's avatar
dengyihao 已提交
597 598 599 600
  uv_pipe_open(pThrd->pipe, pThrd->fd);

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
601
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
602
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
603

dengyihao's avatar
fix bug  
dengyihao 已提交
604 605 606
  // conn set
  QUEUE_INIT(&pThrd->conn);

dengyihao's avatar
dengyihao 已提交
607
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
dengyihao's avatar
dengyihao 已提交
608
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
609 610 611 612 613 614 615 616 617 618 619 620 621
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
622
  // register an async here to quit server gracefully
wafwerar's avatar
wafwerar 已提交
623
  srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t));
dengyihao's avatar
dengyihao 已提交
624 625
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
626

dengyihao's avatar
dengyihao 已提交
627
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
628 629 630 631 632
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
633
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
634 635 636 637
    tError("failed to listen: %s", uv_err_name(err));
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
638
}
639
void* transWorkerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
640
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
641
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
642
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
643 644

  return NULL;
dengyihao's avatar
dengyihao 已提交
645 646
}

dengyihao's avatar
fix bug  
dengyihao 已提交
647 648 649
static SSrvConn* createConn(void* hThrd) {
  SWorkThrdObj* pThrd = hThrd;

wafwerar's avatar
wafwerar 已提交
650
  SSrvConn* pConn = (SSrvConn*)taosMemoryCalloc(1, sizeof(SSrvConn));
dengyihao's avatar
fix bug  
dengyihao 已提交
651 652 653
  QUEUE_INIT(&pConn->queue);

  QUEUE_PUSH(&pThrd->conn, &pConn->queue);
dengyihao's avatar
dengyihao 已提交
654 655

  transQueueInit(&pConn->srvMsgs, NULL);
dengyihao's avatar
dengyihao 已提交
656

dengyihao's avatar
dengyihao 已提交
657
  memset(&pConn->regArg, 0, sizeof(pConn->regArg));
dengyihao's avatar
dengyihao 已提交
658
  pConn->broken = false;
dengyihao's avatar
dengyihao 已提交
659
  pConn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
660

dengyihao's avatar
dengyihao 已提交
661
  transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
662
  tTrace("server conn %p created", pConn);
dengyihao's avatar
dengyihao 已提交
663 664
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
665

dengyihao's avatar
dengyihao 已提交
666
// Release a connection's read buffer and, when `clear` is set, start shutting
// down its TCP stream.
//
// conn may be NULL (no-op). With clear == true a uv_shutdown request is queued
// on conn->pTcp with uvShutDownCb as completion callback; the request object
// is then owned by that callback. If the request cannot be allocated or libuv
// refuses to queue it, the callback will never run, so the request is freed
// here and the failure is logged.
static void destroyConn(SSrvConn* conn, bool clear) {
  if (conn == NULL) {
    return;
  }
  transDestroyBuffer(&conn->readBuf);
  if (!clear) {
    return;
  }

  tTrace("server conn %p to be destroyed", conn);
  uv_shutdown_t* req = taosMemoryMalloc(sizeof(uv_shutdown_t));
  if (req == NULL) {
    tError("server conn %p failed to alloc shutdown req", conn);
    return;
  }

  int err = uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
  if (err != 0) {
    // NOTE(review): the handle itself is not closed on this path; confirm
    // whether it should be closed directly when shutdown cannot be queued.
    tError("server conn %p failed to shutdown: %s", conn, uv_err_name(err));
    taosMemoryFree(req);
  }
}
// Handle callback that finally frees a server connection.
//
// The connection is taken from handle->data; a NULL value makes this a no-op.
// Its message queue is destroyed, it is unlinked from the worker's connection
// set, and both the TCP handle memory and the connection are freed. If the
// owning worker has been told to quit and this was its last connection, every
// handle on the worker loop is walked with uvWalkCb.
static void uvDestroyConn(uv_handle_t* handle) {
  SSrvConn* pConn = handle->data;
  if (pConn != NULL) {
    // Remember the owner first: pConn is freed before the quit check below.
    SWorkThrdObj* pOwner = pConn->hostThrd;

    tDebug("server conn %p destroy", pConn);
    transQueueDestroy(&pConn->srvMsgs);
    QUEUE_REMOVE(&pConn->queue);
    taosMemoryFree(pConn->pTcp);
    taosMemoryFree(pConn);

    if (pOwner->quit && QUEUE_IS_EMPTY(&pOwner->conn)) {
      tTrace("work thread quit");
      uv_walk(pOwner->loop, uvWalkCb, NULL);
    }
  }
}

U
ubuntu 已提交
697
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
698 699
  SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
700 701
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
wafwerar's avatar
wafwerar 已提交
702 703
  srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
dengyihao's avatar
dengyihao 已提交
704 705 706 707 708
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
709
    SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
710
    thrd->quit = false;
dengyihao's avatar
dengyihao 已提交
711
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
712

wafwerar's avatar
wafwerar 已提交
713
    srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
dengyihao's avatar
dengyihao 已提交
714 715
    int fds[2];
    if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
dengyihao's avatar
dengyihao 已提交
716
      goto End;
dengyihao's avatar
dengyihao 已提交
717 718 719 720
    }
    uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write

dengyihao's avatar
dengyihao 已提交
721
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
722 723
    thrd->fd = fds[0];
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
724

dengyihao's avatar
dengyihao 已提交
725 726 727
    if (false == addHandleToWorkloop(thrd)) {
      goto End;
    }
728
    int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
dengyihao's avatar
dengyihao 已提交
729 730 731 732 733 734 735 736
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
    }
  }
dengyihao's avatar
dengyihao 已提交
737 738 739
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
740
  int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
dengyihao's avatar
dengyihao 已提交
741 742 743 744 745 746 747
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
    // clear all resource later
  }

  return srv;
dengyihao's avatar
dengyihao 已提交
748
End:
U
ubuntu 已提交
749
  transCloseServer(srv);
dengyihao's avatar
dengyihao 已提交
750 751
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
752
// Handle a Quit message on a worker.
//
// Marks the worker as quitting. If it still tracks connections they are all
// destroyed via destroyAllConn(); otherwise the loop's handles are walked with
// uvWalkCb straight away. The message is freed in both cases.
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
  thrd->quit = true;

  if (!QUEUE_IS_EMPTY(&thrd->conn)) {
    destroyAllConn(thrd);
  } else {
    uv_walk(thrd->loop, uvWalkCb, NULL);
  }

  taosMemoryFree(msg);
}
// Handle a Release message for a connection.
//
// For a connection in ConnAcquire state the message is pushed onto the
// connection's srvMsgs queue and, when transQueuePush() reports true, sent
// with uvStartSendRespInternal(); the message is not destroyed on this path.
// For any other state the message is destroyed, with a debug log when the
// connection is already in ConnRelease or ConnNormal state.
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* pConn = msg->pConn;

  if (pConn->status == ConnAcquire) {
    if (transQueuePush(&pConn->srvMsgs, msg)) {
      uvStartSendRespInternal(msg);
    }
    return;
  }

  if (pConn->status == ConnRelease || pConn->status == ConnNormal) {
    tDebug("server conn %p already released, ignore release-msg", pConn);
  }
  destroySmsg(msg);
}
dengyihao's avatar
dengyihao 已提交
775
// Handle a response message on the worker: log the second stage of the send
// and pass the message to uvStartSendResp().
void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* pConn = msg->pConn;

  tDebug("server conn %p start to send resp (2/2)", pConn);
  uvStartSendResp(msg);
}
dengyihao's avatar
dengyihao 已提交
780 781
// Handle a Register message: remember the message that should be delivered
// when the connection breaks.
//
// Only connections in ConnAcquire state accept the registration. The message
// body is copied into conn->regArg; if the connection is already marked
// broken the transport's callback is invoked immediately with that message and
// regArg is cleared again. The SSrvMsg wrapper is freed here, except when
// transQueuePush() reports false, in which case the function returns right
// after the push and leaves the message alone.
void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* conn = msg->pConn;
  tDebug("server conn %p register brokenlink callback", conn);

  if (conn->status != ConnAcquire) {
    // Registration is ignored; the wrapper must still be released, otherwise
    // it leaks.
    // NOTE(review): only the wrapper is freed, mirroring the success path
    // below; confirm who owns msg->msg's payload on this path.
    tDebug("server conn %p not acquired, ignore register-msg", conn);
    taosMemoryFree(msg);
    return;
  }

  if (!transQueuePush(&conn->srvMsgs, msg)) {
    return;
  }
  transQueuePop(&conn->srvMsgs);

  conn->regArg.notifyCount = 0;
  conn->regArg.init = 1;
  conn->regArg.msg = msg->msg;
  tDebug("server conn %p register brokenlink callback succ", conn);

  if (conn->broken) {
    // The link is already down: notify right away and forget the registration.
    STrans* pTransInst = conn->pTransInst;
    (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
    memset(&conn->regArg, 0, sizeof(conn->regArg));
  }
  taosMemoryFree(msg);
}
dengyihao's avatar
dengyihao 已提交
801 802 803 804
// Wait for a worker thread to finish and release what it owns.
//
// A NULL worker is ignored. Otherwise the thread is joined first, then its
// loop is released with SRV_RELEASE_UV, its async pool is destroyed, and the
// loop memory and the worker object are freed.
void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd != NULL) {
    taosThreadJoin(pThrd->thread, NULL);
    SRV_RELEASE_UV(pThrd->loop);
    transDestroyAsyncPool(pThrd->asyncPool);
    taosMemoryFree(pThrd->loop);
    taosMemoryFree(pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
811
// Post a Quit message to a worker through its async pool. The message is
// allocated here and handed over with transSendAsync().
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
  SSrvMsg* pQuit = taosMemoryCalloc(1, sizeof(SSrvMsg));
  pQuit->type = Quit;

  tDebug("server send quit msg to work thread");
  transSendAsync(pThrd->asyncPool, &pQuit->q);
}

U
ubuntu 已提交
818
void transCloseServer(void* arg) {
dengyihao's avatar
dengyihao 已提交
819 820
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
821 822 823

  tDebug("send quit msg to accept thread");
  uv_async_send(srv->pAcceptAsync);
wafwerar's avatar
wafwerar 已提交
824
  taosThreadJoin(srv->thread, NULL);
dengyihao's avatar
dengyihao 已提交
825

dengyihao's avatar
dengyihao 已提交
826
  SRV_RELEASE_UV(srv->loop);
dengyihao's avatar
dengyihao 已提交
827

dengyihao's avatar
dengyihao 已提交
828 829 830 831 832
  for (int i = 0; i < srv->numOfThreads; i++) {
    sendQuitToWorkThrd(srv->pThreadObj[i]);
    destroyWorkThrd(srv->pThreadObj[i]);
  }

wafwerar's avatar
wafwerar 已提交
833 834 835
  taosMemoryFree(srv->pThreadObj);
  taosMemoryFree(srv->pAcceptAsync);
  taosMemoryFree(srv->loop);
dengyihao's avatar
dengyihao 已提交
836 837

  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
838
    taosMemoryFree(srv->pipe[i]);
dengyihao's avatar
dengyihao 已提交
839
  }
wafwerar's avatar
wafwerar 已提交
840
  taosMemoryFree(srv->pipe);
dengyihao's avatar
dengyihao 已提交
841

wafwerar's avatar
wafwerar 已提交
842
  taosMemoryFree(srv);
dengyihao's avatar
dengyihao 已提交
843
}
dengyihao's avatar
dengyihao 已提交
844

dengyihao's avatar
dengyihao 已提交
845 846 847 848 849
// Take one reference on a server connection handle and log the new count.
// A NULL handle is ignored.
void transRefSrvHandle(void* handle) {
  if (handle != NULL) {
    int count = T_REF_INC((SSrvConn*)handle);
    tDebug("server conn %p ref count: %d", handle, count);
  }
}

// Drop one reference on a server connection handle and log the new count.
// When the count reaches zero the connection is torn down through
// destroyConn(handle, true). A NULL handle is ignored.
void transUnrefSrvHandle(void* handle) {
  if (handle != NULL) {
    int count = T_REF_DEC((SSrvConn*)handle);
    tDebug("server conn %p ref count: %d", handle, count);
    if (count == 0) {
      destroyConn((SSrvConn*)handle, true);
    }
  }
}
dengyihao's avatar
dengyihao 已提交
863 864

void transReleaseSrvHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
865 866 867 868 869 870
  if (handle == NULL) {
    return;
  }
  SSrvConn*     pConn = handle;
  SWorkThrdObj* pThrd = pConn->hostThrd;

dengyihao's avatar
dengyihao 已提交
871
  STransMsg tmsg = {.code = 0, .handle = handle, .ahandle = NULL};
dengyihao's avatar
dengyihao 已提交
872

wafwerar's avatar
wafwerar 已提交
873
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
874 875 876 877 878 879
  srvMsg->msg = tmsg;
  srvMsg->type = Release;
  srvMsg->pConn = pConn;

  tTrace("server conn %p start to release", pConn);
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
880
}
U
ubuntu 已提交
881
void transSendResponse(const STransMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
882 883 884
  if (pMsg->handle == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
885
  SSrvConn*     pConn = pMsg->handle;
dengyihao's avatar
dengyihao 已提交
886
  SWorkThrdObj* pThrd = pConn->hostThrd;
dengyihao's avatar
dengyihao 已提交
887 888 889
  if (pThrd->quit) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
890

wafwerar's avatar
wafwerar 已提交
891
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
892 893
  srvMsg->pConn = pConn;
  srvMsg->msg = *pMsg;
dengyihao's avatar
dengyihao 已提交
894
  srvMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
895
  tTrace("server conn %p start to send resp (1/2)", pConn);
dengyihao's avatar
dengyihao 已提交
896
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
897
}
dengyihao's avatar
dengyihao 已提交
898 899 900 901 902 903 904
void transRegisterMsg(const STransMsg* msg) {
  if (msg->handle == NULL) {
    return;
  }
  SSrvConn*     pConn = msg->handle;
  SWorkThrdObj* pThrd = pConn->hostThrd;

wafwerar's avatar
wafwerar 已提交
905
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
906 907 908
  srvMsg->pConn = pConn;
  srvMsg->msg = *msg;
  srvMsg->type = Register;
dengyihao's avatar
dengyihao 已提交
909
  tTrace("server conn %p start to register brokenlink callback", pConn);
dengyihao's avatar
dengyihao 已提交
910 911
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
}
dengyihao's avatar
formate  
dengyihao 已提交
912 913
// Copy the peer information of a server connection into *pInfo.
//
// thandle  connection handle (SSrvConn*).
// pInfo    output; receives clientIp, clientPort and user.
//
// clientIp is the raw sin_addr.s_addr value (no byte-order conversion is
// applied), clientPort is converted with ntohs(), and user is copied with
// tstrncpy() bounded by sizeof(pInfo->user).
//
// Returns 0 on success, -1 when either argument is NULL.
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
  if (thandle == NULL || pInfo == NULL) {
    return -1;
  }
  SSrvConn*          pConn = thandle;
  struct sockaddr_in addr = pConn->addr;

  pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
  pInfo->clientPort = ntohs(addr.sin_port);
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
  return 0;
}

dengyihao's avatar
dengyihao 已提交
922
#endif