transSrv.c 27.3 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20 21 22 23 24 25
typedef struct {
  int       notifyCount;  //
  int       init;         // init or not
  STransMsg msg;
} SSrvRegArg;

dengyihao's avatar
dengyihao 已提交
26
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
27
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
28 29 30
  uv_tcp_t*  pTcp;
  uv_write_t pWriter;
  uv_timer_t pTimer;
dengyihao's avatar
dengyihao 已提交
31 32 33

  queue       queue;
  int         ref;
dengyihao's avatar
dengyihao 已提交
34
  int         persist;  // persist connection or not
dengyihao's avatar
dengyihao 已提交
35
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
36 37 38
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
39
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
40
  STransQueue srvMsgs;
dengyihao's avatar
dengyihao 已提交
41

dengyihao's avatar
dengyihao 已提交
42 43
  SSrvRegArg regArg;
  bool       broken;  // conn broken;
dengyihao's avatar
dengyihao 已提交
44

dengyihao's avatar
dengyihao 已提交
45
  ConnStatus         status;
dengyihao's avatar
dengyihao 已提交
46
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
47
  struct sockaddr_in locaddr;
U
ubuntu 已提交
48

dengyihao's avatar
dengyihao 已提交
49 50 51 52 53 54
  char secured;
  int  spi;
  char info[64];
  char user[TSDB_UNI_LEN];  // user ID for the link
  char secret[TSDB_PASSWORD_LEN];
  char ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
55 56 57
} SSrvConn;

typedef struct SSrvMsg {
dengyihao's avatar
dengyihao 已提交
58 59 60 61
  SSrvConn*     pConn;
  STransMsg     msg;
  queue         q;
  STransMsgType type;
dengyihao's avatar
dengyihao 已提交
62
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
63 64

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
65 66 67 68 69 70
  TdThread      thread;
  uv_pipe_t*    pipe;
  uv_os_fd_t    fd;
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  queue         msg;
wafwerar's avatar
wafwerar 已提交
71
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
72 73 74 75

  queue conn;
  void* pTransInst;
  bool  quit;
dengyihao's avatar
dengyihao 已提交
76 77 78
} SWorkThrdObj;

typedef struct SServerObj {
dengyihao's avatar
dengyihao 已提交
79
  TdThread   thread;
dengyihao's avatar
dengyihao 已提交
80 81 82 83
  uv_tcp_t   server;
  uv_loop_t* loop;

  // work thread info
dengyihao's avatar
dengyihao 已提交
84 85 86
  int            workerIdx;
  int            numOfThreads;
  SWorkThrdObj** pThreadObj;
dengyihao's avatar
dengyihao 已提交
87 88 89 90 91

  uv_pipe_t** pipe;
  uint32_t    ip;
  uint32_t    port;
  uv_async_t* pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
92 93 94 95 96
} SServerObj;

static const char* notify = "a";

static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
97
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
98
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
99
static void uvOnTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
100
static void uvOnSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
101
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
102 103 104
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
105
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
106
static void uvShutDownCb(uv_shutdown_t* req, int status);
dengyihao's avatar
dengyihao 已提交
107 108
static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
109

dengyihao's avatar
dengyihao 已提交
110
static void uvStartSendRespInternal(SSrvMsg* smsg);
dengyihao's avatar
dengyihao 已提交
111 112
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
113

dengyihao's avatar
dengyihao 已提交
114 115
static void uvNotifyLinkBrokenToApp(SSrvConn* conn);

dengyihao's avatar
dengyihao 已提交
116
static void destroySmsg(SSrvMsg* smsg);
117
// check whether already read complete packet
dengyihao's avatar
fix bug  
dengyihao 已提交
118
static SSrvConn* createConn(void* hThrd);
dengyihao's avatar
dengyihao 已提交
119
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
120

dengyihao's avatar
dengyihao 已提交
121 122
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
123
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
124
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
125 126
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
                                                                       uvHandleRegister};
dengyihao's avatar
dengyihao 已提交
127

128
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
129

dengyihao's avatar
dengyihao 已提交
130
// server and worker thread
131 132
static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg);
dengyihao's avatar
dengyihao 已提交
133

dengyihao's avatar
dengyihao 已提交
134 135 136 137
// add handle loop
static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
#define CONN_SHOULD_RELEASE(conn, head)                                     \
  do {                                                                      \
    if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) {          \
      conn->status = ConnRelease;                                           \
      transClearBuffer(&conn->readBuf);                                     \
      transFreeMsg(transContFromHead((char*)head));                         \
      tTrace("server conn %p received release request", conn);              \
                                                                            \
      STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \
      SSrvMsg*  srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));              \
      srvMsg->msg = tmsg;                                                   \
      srvMsg->type = Release;                                               \
      srvMsg->pConn = conn;                                                 \
      if (!transQueuePush(&conn->srvMsgs, srvMsg)) {                        \
        return;                                                             \
      }                                                                     \
      uvStartSendRespInternal(srvMsg);                                      \
      return;                                                               \
    }                                                                       \
  } while (0)

#define SRV_RELEASE_UV(loop)       \
  do {                             \
    uv_walk(loop, uvWalkCb, NULL); \
    uv_run(loop, UV_RUN_DEFAULT);  \
    uv_loop_close(loop);           \
  } while (0);

dengyihao's avatar
dengyihao 已提交
166
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
167 168 169
  SSrvConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
170 171 172 173
}

// refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) {
dengyihao's avatar
dengyihao 已提交
174
  SSrvConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
175
  tDebug("%p timeout since no activity", conn);
dengyihao's avatar
dengyihao 已提交
176 177
}

dengyihao's avatar
dengyihao 已提交
178 179
static void uvHandleReq(SSrvConn* pConn) {
  SConnBuffer* pBuf = &pConn->readBuf;
dengyihao's avatar
dengyihao 已提交
180 181 182 183
  char*        msg = pBuf->buf;
  uint32_t     msgLen = pBuf->len;

  STransMsgHead* pHead = (STransMsgHead*)msg;
dengyihao's avatar
dengyihao 已提交
184
  if (pHead->secured == 1) {
dengyihao's avatar
dengyihao 已提交
185
    STransUserMsg* uMsg = (STransUserMsg*)((char*)msg + msgLen - sizeof(STransUserMsg));
dengyihao's avatar
dengyihao 已提交
186
    memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
dengyihao's avatar
dengyihao 已提交
187
    memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret));
dengyihao's avatar
dengyihao 已提交
188
  }
dengyihao's avatar
dengyihao 已提交
189
  pHead->code = htonl(pHead->code);
dengyihao's avatar
dengyihao 已提交
190 191 192
  pHead->msgLen = htonl(pHead->msgLen);
  if (pHead->secured == 1) {
    pHead->msgLen -= sizeof(STransUserMsg);
dengyihao's avatar
dengyihao 已提交
193
  }
dengyihao's avatar
dengyihao 已提交
194

dengyihao's avatar
dengyihao 已提交
195
  CONN_SHOULD_RELEASE(pConn, pHead);
dengyihao's avatar
dengyihao 已提交
196

dengyihao's avatar
dengyihao 已提交
197 198 199 200 201
  STransMsg transMsg;
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = pHead->content;
  transMsg.msgType = pHead->msgType;
  transMsg.code = pHead->code;
dengyihao's avatar
dengyihao 已提交
202
  transMsg.ahandle = (void*)pHead->ahandle;
dengyihao's avatar
dengyihao 已提交
203
  transMsg.handle = NULL;
dengyihao's avatar
dengyihao 已提交
204

dengyihao's avatar
dengyihao 已提交
205
  // transDestroyBuffer(&pConn->readBuf);
dengyihao's avatar
dengyihao 已提交
206
  transClearBuffer(&pConn->readBuf);
dengyihao's avatar
dengyihao 已提交
207
  pConn->inType = pHead->msgType;
dengyihao's avatar
dengyihao 已提交
208 209 210 211
  if (pConn->status == ConnNormal) {
    if (pHead->persist == 1) {
      pConn->status = ConnAcquire;
      transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
212
      tDebug("server conn %p acquired by server app", pConn);
dengyihao's avatar
dengyihao 已提交
213 214 215
    }
  }
  if (pConn->status == ConnNormal && pHead->noResp == 0) {
dengyihao's avatar
dengyihao 已提交
216
    transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
217 218 219
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
           taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
           ntohs(pConn->locaddr.sin_port), transMsg.contLen);
dengyihao's avatar
dengyihao 已提交
220
  } else {
dengyihao's avatar
dengyihao 已提交
221 222 223
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
           TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
           taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
dengyihao's avatar
dengyihao 已提交
224 225 226
    // no ref here
  }

dengyihao's avatar
dengyihao 已提交
227 228 229 230 231
  // if pHead->noResp = 1,
  // 1. server application should not send resp on handle
  // 2. once send out data, cli conn released to conn pool immediately
  // 3. not mixed with persist
  transMsg.handle = pConn;
dengyihao's avatar
dengyihao 已提交
232

dengyihao's avatar
dengyihao 已提交
233
  STrans* pTransInst = pConn->pTransInst;
dengyihao's avatar
dengyihao 已提交
234
  (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
dengyihao's avatar
dengyihao 已提交
235
  // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
dengyihao's avatar
dengyihao 已提交
236 237
}

dengyihao's avatar
dengyihao 已提交
238
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
239
  // opt
dengyihao's avatar
dengyihao 已提交
240 241
  SSrvConn*    conn = cli->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
242 243
  if (nread > 0) {
    pBuf->len += nread;
dengyihao's avatar
dengyihao 已提交
244
    tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
dengyihao's avatar
dengyihao 已提交
245
    if (transReadComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
246
      tTrace("server conn %p alread read complete packet", conn);
dengyihao's avatar
dengyihao 已提交
247
      uvHandleReq(conn);
dengyihao's avatar
dengyihao 已提交
248
    } else {
dengyihao's avatar
dengyihao 已提交
249
      tTrace("server %p read partial packet, continue to read", conn);
dengyihao's avatar
dengyihao 已提交
250 251 252
    }
    return;
  }
253 254 255
  if (nread == 0) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
256 257

  tError("server conn %p read error: %s", conn, uv_err_name(nread));
U
ubuntu 已提交
258
  if (nread < 0) {
dengyihao's avatar
dengyihao 已提交
259
    conn->broken = true;
dengyihao's avatar
dengyihao 已提交
260 261
    if (conn->status == ConnAcquire) {
      if (conn->regArg.init) {
dengyihao's avatar
dengyihao 已提交
262
        tTrace("server conn %p broken, notify server app", conn);
dengyihao's avatar
dengyihao 已提交
263 264 265 266 267
        STrans* pTransInst = conn->pTransInst;
        (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
        memset(&conn->regArg, 0, sizeof(conn->regArg));
      }
    }
dengyihao's avatar
dengyihao 已提交
268
    transUnrefSrvHandle(conn);
dengyihao's avatar
dengyihao 已提交
269 270 271 272
  }
}
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->len = 2;
wafwerar's avatar
wafwerar 已提交
273
  buf->base = taosMemoryCalloc(1, sizeof(char) * buf->len);
dengyihao's avatar
dengyihao 已提交
274 275 276 277
}

void uvOnTimeoutCb(uv_timer_t* handle) {
  // opt
dengyihao's avatar
dengyihao 已提交
278
  SSrvConn* pConn = handle->data;
dengyihao's avatar
dengyihao 已提交
279
  tError("server conn %p time out", pConn);
dengyihao's avatar
dengyihao 已提交
280 281
}

dengyihao's avatar
dengyihao 已提交
282
void uvOnSendCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
283
  SSrvConn* conn = req->data;
dengyihao's avatar
dengyihao 已提交
284
  // transClearBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
285
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
286
    tTrace("server conn %p data already was written on stream", conn);
dengyihao's avatar
dengyihao 已提交
287 288
    if (!transQueueEmpty(&conn->srvMsgs)) {
      SSrvMsg* msg = transQueuePop(&conn->srvMsgs);
dengyihao's avatar
dengyihao 已提交
289 290 291 292
      if (msg->type == Release && conn->status != ConnNormal) {
        conn->status = ConnNormal;
        transUnrefSrvHandle(conn);
      }
dengyihao's avatar
add UT  
dengyihao 已提交
293 294
      destroySmsg(msg);
      // send second data, just use for push
dengyihao's avatar
dengyihao 已提交
295
      if (!transQueueEmpty(&conn->srvMsgs)) {
dengyihao's avatar
dengyihao 已提交
296
        msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
dengyihao's avatar
dengyihao 已提交
297 298 299 300 301 302 303 304 305
        if (msg->type == Register && conn->status == ConnAcquire) {
          conn->regArg.notifyCount = 0;
          conn->regArg.init = 1;
          conn->regArg.msg = msg->msg;
          if (conn->broken) {
            STrans* pTransInst = conn->pTransInst;
            (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
            memset(&conn->regArg, 0, sizeof(conn->regArg));
          }
dengyihao's avatar
dengyihao 已提交
306
          transQueuePop(&conn->srvMsgs);
wafwerar's avatar
wafwerar 已提交
307
          taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
308 309

          msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
dengyihao's avatar
dengyihao 已提交
310 311 312
          if (msg != NULL) {
            uvStartSendRespInternal(msg);
          }
dengyihao's avatar
dengyihao 已提交
313 314 315
        } else {
          uvStartSendRespInternal(msg);
        }
dengyihao's avatar
add UT  
dengyihao 已提交
316
      }
dengyihao's avatar
dengyihao 已提交
317
    }
dengyihao's avatar
dengyihao 已提交
318
  } else {
dengyihao's avatar
dengyihao 已提交
319
    tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
320
    conn->broken = true;
dengyihao's avatar
dengyihao 已提交
321
    transUnrefSrvHandle(conn);
dengyihao's avatar
dengyihao 已提交
322 323
  }
}
dengyihao's avatar
dengyihao 已提交
324 325
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
326
    tTrace("success to dispatch conn to work thread");
dengyihao's avatar
dengyihao 已提交
327 328 329
  } else {
    tError("fail to dispatch conn to work thread");
  }
dengyihao's avatar
dengyihao 已提交
330 331
  uv_close((uv_handle_t*)req->data, uvFreeCb);
  // taosMemoryFree(req->data);
wafwerar's avatar
wafwerar 已提交
332
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
333
}
dengyihao's avatar
dengyihao 已提交
334

dengyihao's avatar
dengyihao 已提交
335
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
dengyihao's avatar
dengyihao 已提交
336
  tTrace("server conn %p prepare to send resp", smsg->pConn);
dengyihao's avatar
dengyihao 已提交
337

dengyihao's avatar
formate  
dengyihao 已提交
338 339
  SSrvConn*  pConn = smsg->pConn;
  STransMsg* pMsg = &smsg->msg;
dengyihao's avatar
dengyihao 已提交
340 341 342 343 344
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
dengyihao's avatar
dengyihao 已提交
345
  pHead->ahandle = (uint64_t)pMsg->ahandle;
dengyihao's avatar
dengyihao 已提交
346

dengyihao's avatar
dengyihao 已提交
347 348 349 350 351 352 353 354 355 356 357
  // pHead->secured = pMsg->code == 0 ? 1 : 0;  //
  if (!pConn->secured) {
    pConn->secured = pMsg->code == 0 ? 1 : 0;
  }
  pHead->secured = pConn->secured;

  if (pConn->status == ConnNormal) {
    pHead->msgType = pConn->inType + 1;
  } else {
    pHead->msgType = smsg->type == Release ? 0 : pMsg->msgType;
  }
dengyihao's avatar
dengyihao 已提交
358
  pHead->release = smsg->type == Release ? 1 : 0;
dengyihao's avatar
dengyihao 已提交
359
  pHead->code = htonl(pMsg->code);
dengyihao's avatar
dengyihao 已提交
360

dengyihao's avatar
dengyihao 已提交
361 362
  char*   msg = (char*)pHead;
  int32_t len = transMsgLenFromCont(pMsg->contLen);
dengyihao's avatar
dengyihao 已提交
363 364 365
  tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
         taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
         ntohs(pConn->locaddr.sin_port));
dengyihao's avatar
dengyihao 已提交
366
  pHead->msgLen = htonl(len);
dengyihao's avatar
dengyihao 已提交
367

dengyihao's avatar
dengyihao 已提交
368 369 370
  wb->base = msg;
  wb->len = len;
}
dengyihao's avatar
dengyihao 已提交
371 372

static void uvStartSendRespInternal(SSrvMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
373 374 375 376
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
377
  // uv_timer_stop(&pConn->pTimer);
dengyihao's avatar
dengyihao 已提交
378
  uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
dengyihao's avatar
dengyihao 已提交
379 380 381 382
}
static void uvStartSendResp(SSrvMsg* smsg) {
  // impl
  SSrvConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
383 384

  if (pConn->broken == true) {
dengyihao's avatar
dengyihao 已提交
385
    // persist by
dengyihao's avatar
dengyihao 已提交
386 387 388
    transUnrefSrvHandle(pConn);
    return;
  }
dengyihao's avatar
dengyihao 已提交
389 390 391
  if (pConn->status == ConnNormal) {
    transUnrefSrvHandle(pConn);
  }
dengyihao's avatar
dengyihao 已提交
392

dengyihao's avatar
dengyihao 已提交
393
  if (!transQueuePush(&pConn->srvMsgs, smsg)) {
dengyihao's avatar
dengyihao 已提交
394 395 396
    return;
  }
  uvStartSendRespInternal(smsg);
dengyihao's avatar
dengyihao 已提交
397 398
  return;
}
dengyihao's avatar
dengyihao 已提交
399

dengyihao's avatar
dengyihao 已提交
400
static void destroySmsg(SSrvMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
401 402 403 404
  if (smsg == NULL) {
    return;
  }
  transFreeMsg(smsg->msg.pCont);
wafwerar's avatar
wafwerar 已提交
405
  taosMemoryFree(smsg);
dengyihao's avatar
dengyihao 已提交
406
}
dengyihao's avatar
fix bug  
dengyihao 已提交
407 408 409 410 411 412 413
static void destroyAllConn(SWorkThrdObj* pThrd) {
  while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
    queue* h = QUEUE_HEAD(&pThrd->conn);
    QUEUE_REMOVE(h);
    QUEUE_INIT(h);

    SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue);
dengyihao's avatar
dengyihao 已提交
414 415 416
    while (T_REF_VAL_GET(c) >= 2) {
      transUnrefSrvHandle(c);
    }
dengyihao's avatar
dengyihao 已提交
417
    transUnrefSrvHandle(c);
dengyihao's avatar
fix bug  
dengyihao 已提交
418 419
  }
}
dengyihao's avatar
dengyihao 已提交
420
void uvWorkerAsyncCb(uv_async_t* handle) {
dengyihao's avatar
dengyihao 已提交
421 422
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
dengyihao's avatar
dengyihao 已提交
423
  SSrvConn*     conn = NULL;
dengyihao's avatar
dengyihao 已提交
424
  queue         wq;
dengyihao's avatar
dengyihao 已提交
425

dengyihao's avatar
dengyihao 已提交
426
  // batch process to avoid to lock/unlock frequently
wafwerar's avatar
wafwerar 已提交
427
  taosThreadMutexLock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
428
  QUEUE_MOVE(&item->qmsg, &wq);
wafwerar's avatar
wafwerar 已提交
429
  taosThreadMutexUnlock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
430 431 432 433

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* head = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(head);
dengyihao's avatar
dengyihao 已提交
434 435 436

    SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
    if (msg == NULL) {
dengyihao's avatar
dengyihao 已提交
437
      tError("unexcept occurred, continue");
dengyihao's avatar
dengyihao 已提交
438
      continue;
dengyihao's avatar
dengyihao 已提交
439
    }
dengyihao's avatar
dengyihao 已提交
440
    (*transAsyncHandle[msg->type])(msg, pThrd);
dengyihao's avatar
dengyihao 已提交
441 442
  }
}
dengyihao's avatar
dengyihao 已提交
443 444 445 446 447
static void uvWalkCb(uv_handle_t* handle, void* arg) {
  if (!uv_is_closing(handle)) {
    uv_close(handle, NULL);
  }
}
dengyihao's avatar
dengyihao 已提交
448 449 450 451
static void uvFreeCb(uv_handle_t* handle) {
  //
  taosMemoryFree(handle);
}
dengyihao's avatar
dengyihao 已提交
452

dengyihao's avatar
dengyihao 已提交
453 454
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* srv = async->data;
dengyihao's avatar
dengyihao 已提交
455
  tDebug("close server port %d", srv->port);
dengyihao's avatar
dengyihao 已提交
456
  uv_walk(srv->loop, uvWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
457
}
dengyihao's avatar
dengyihao 已提交
458

dengyihao's avatar
dengyihao 已提交
459
static void uvShutDownCb(uv_shutdown_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
460 461 462
  if (status != 0) {
    tDebug("conn failed to shut down: %s", uv_err_name(status));
  }
dengyihao's avatar
dengyihao 已提交
463
  uv_close((uv_handle_t*)req->handle, uvDestroyConn);
wafwerar's avatar
wafwerar 已提交
464
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
465 466
}

dengyihao's avatar
dengyihao 已提交
467 468 469 470 471 472
void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status == -1) {
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

wafwerar's avatar
wafwerar 已提交
473
  uv_tcp_t* cli = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
dengyihao's avatar
dengyihao 已提交
474 475 476
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
wafwerar's avatar
wafwerar 已提交
477
    uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
dengyihao's avatar
dengyihao 已提交
478
    wr->data = cli;
dengyihao's avatar
dengyihao 已提交
479 480 481
    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

    pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
dengyihao's avatar
dengyihao 已提交
482

dengyihao's avatar
dengyihao 已提交
483
    tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
dengyihao's avatar
dengyihao 已提交
484

dengyihao's avatar
dengyihao 已提交
485
    uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
dengyihao's avatar
dengyihao 已提交
486 487 488 489 490
  } else {
    uv_close((uv_handle_t*)cli, NULL);
  }
}
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
491
  tTrace("server connection coming");
dengyihao's avatar
dengyihao 已提交
492 493 494 495 496
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
dengyihao's avatar
dengyihao 已提交
497 498 499 500
    tError("failed to create connect: %p", q);
    taosMemoryFree(buf->base);
    uv_close((uv_handle_t*)q, NULL);
    // taosMemoryFree(q);
dengyihao's avatar
dengyihao 已提交
501 502 503 504 505
    return;
  }
  // free memory allocated by
  assert(nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
wafwerar's avatar
wafwerar 已提交
506
  taosMemoryFree(buf->base);
dengyihao's avatar
dengyihao 已提交
507 508 509 510 511 512 513 514 515 516 517 518

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

dengyihao's avatar
fix bug  
dengyihao 已提交
519
  SSrvConn* pConn = createConn(pThrd);
520

dengyihao's avatar
dengyihao 已提交
521
  pConn->pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
522
  /* init conn timer*/
dengyihao's avatar
dengyihao 已提交
523 524
  // uv_timer_init(pThrd->loop, &pConn->pTimer);
  // pConn->pTimer.data = pConn;
dengyihao's avatar
dengyihao 已提交
525 526 527 528

  pConn->hostThrd = pThrd;

  // init client handle
wafwerar's avatar
wafwerar 已提交
529
  pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
dengyihao's avatar
dengyihao 已提交
530 531 532
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

dengyihao's avatar
dengyihao 已提交
533
  pConn->pWriter.data = pConn;
dengyihao's avatar
dengyihao 已提交
534

dengyihao's avatar
dengyihao 已提交
535 536
  transSetConnOption((uv_tcp_t*)pConn->pTcp);

dengyihao's avatar
dengyihao 已提交
537 538 539
  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
dengyihao's avatar
dengyihao 已提交
540
    tTrace("server conn %p created, fd: %d", pConn, fd);
dengyihao's avatar
dengyihao 已提交
541

dengyihao's avatar
dengyihao 已提交
542 543
    int addrlen = sizeof(pConn->addr);
    if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
dengyihao's avatar
dengyihao 已提交
544
      tError("server conn %p failed to get peer info", pConn);
dengyihao's avatar
dengyihao 已提交
545
      transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
546
      return;
dengyihao's avatar
dengyihao 已提交
547
    }
dengyihao's avatar
dengyihao 已提交
548 549 550 551

    addrlen = sizeof(pConn->locaddr);
    if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
      tError("server conn %p failed to get local info", pConn);
dengyihao's avatar
dengyihao 已提交
552
      transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
553 554 555
      return;
    }

dengyihao's avatar
dengyihao 已提交
556
    uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
dengyihao's avatar
dengyihao 已提交
557

dengyihao's avatar
dengyihao 已提交
558
  } else {
dengyihao's avatar
dengyihao 已提交
559
    tDebug("failed to create new connection");
dengyihao's avatar
dengyihao 已提交
560
    transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
561 562 563
  }
}

564
void* transAcceptThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
565
  // opt
dengyihao's avatar
dengyihao 已提交
566
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
567 568
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
569 570

  return NULL;
dengyihao's avatar
dengyihao 已提交
571
}
dengyihao's avatar
dengyihao 已提交
572 573
static bool addHandleToWorkloop(void* arg) {
  SWorkThrdObj* pThrd = arg;
wafwerar's avatar
wafwerar 已提交
574
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
575 576 577
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
578

dengyihao's avatar
dengyihao 已提交
579
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
dengyihao's avatar
dengyihao 已提交
580 581 582 583
  uv_pipe_open(pThrd->pipe, pThrd->fd);

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
584
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
585
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
586

dengyihao's avatar
fix bug  
dengyihao 已提交
587 588 589
  // conn set
  QUEUE_INIT(&pThrd->conn);

dengyihao's avatar
dengyihao 已提交
590
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
dengyihao's avatar
dengyihao 已提交
591
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
592 593 594 595 596 597 598 599 600 601 602 603 604
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
605
  // register an async here to quit server gracefully
wafwerar's avatar
wafwerar 已提交
606
  srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t));
dengyihao's avatar
dengyihao 已提交
607 608
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
609

dengyihao's avatar
dengyihao 已提交
610
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
611 612 613 614 615
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
616
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
617 618 619 620
    tError("failed to listen: %s", uv_err_name(err));
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
621
}
622
void* transWorkerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
623
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
624
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
625
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
626 627

  return NULL;
dengyihao's avatar
dengyihao 已提交
628 629
}

dengyihao's avatar
fix bug  
dengyihao 已提交
630 631 632
static SSrvConn* createConn(void* hThrd) {
  SWorkThrdObj* pThrd = hThrd;

wafwerar's avatar
wafwerar 已提交
633
  SSrvConn* pConn = (SSrvConn*)taosMemoryCalloc(1, sizeof(SSrvConn));
dengyihao's avatar
fix bug  
dengyihao 已提交
634 635 636
  QUEUE_INIT(&pConn->queue);

  QUEUE_PUSH(&pThrd->conn, &pConn->queue);
dengyihao's avatar
dengyihao 已提交
637 638

  transQueueInit(&pConn->srvMsgs, NULL);
dengyihao's avatar
dengyihao 已提交
639

dengyihao's avatar
dengyihao 已提交
640
  memset(&pConn->regArg, 0, sizeof(pConn->regArg));
dengyihao's avatar
dengyihao 已提交
641
  pConn->broken = false;
dengyihao's avatar
dengyihao 已提交
642
  pConn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
643

dengyihao's avatar
dengyihao 已提交
644
  transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
645
  tTrace("server conn %p created", pConn);
dengyihao's avatar
dengyihao 已提交
646 647
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
648

dengyihao's avatar
dengyihao 已提交
649
static void destroyConn(SSrvConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
650 651 652
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
653
  transDestroyBuffer(&conn->readBuf);
654
  if (clear) {
dengyihao's avatar
dengyihao 已提交
655
    tTrace("server conn %p to be destroyed", conn);
wafwerar's avatar
wafwerar 已提交
656
    uv_shutdown_t* req = taosMemoryMalloc(sizeof(uv_shutdown_t));
dengyihao's avatar
dengyihao 已提交
657
    uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
658
  }
dengyihao's avatar
dengyihao 已提交
659 660
}
static void uvDestroyConn(uv_handle_t* handle) {
dengyihao's avatar
dengyihao 已提交
661 662 663 664
  SSrvConn* conn = handle->data;
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
665 666
  SWorkThrdObj* thrd = conn->hostThrd;

dengyihao's avatar
dengyihao 已提交
667
  tDebug("server conn %p destroy", conn);
dengyihao's avatar
dengyihao 已提交
668
  // uv_timer_stop(&conn->pTimer);
dengyihao's avatar
dengyihao 已提交
669
  transQueueDestroy(&conn->srvMsgs);
dengyihao's avatar
dengyihao 已提交
670
  QUEUE_REMOVE(&conn->queue);
wafwerar's avatar
wafwerar 已提交
671
  taosMemoryFree(conn->pTcp);
dengyihao's avatar
dengyihao 已提交
672
  taosMemoryFree(conn);
dengyihao's avatar
dengyihao 已提交
673

dengyihao's avatar
dengyihao 已提交
674
  if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
dengyihao's avatar
dengyihao 已提交
675
    tTrace("work thread quit");
dengyihao's avatar
dengyihao 已提交
676 677 678
    uv_walk(thrd->loop, uvWalkCb, NULL);
    // uv_loop_close(thrd->loop);
    // uv_stop(thrd->loop);
dengyihao's avatar
dengyihao 已提交
679
  }
dengyihao's avatar
dengyihao 已提交
680 681
}

U
ubuntu 已提交
682
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
683 684
  SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
685 686
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
wafwerar's avatar
wafwerar 已提交
687 688
  srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
dengyihao's avatar
dengyihao 已提交
689 690 691 692 693
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
694
    SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
695
    thrd->quit = false;
dengyihao's avatar
dengyihao 已提交
696
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
697

wafwerar's avatar
wafwerar 已提交
698
    srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
dengyihao's avatar
dengyihao 已提交
699 700
    int fds[2];
    if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
dengyihao's avatar
dengyihao 已提交
701
      goto End;
dengyihao's avatar
dengyihao 已提交
702 703 704 705
    }
    uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write

dengyihao's avatar
dengyihao 已提交
706
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
707 708
    thrd->fd = fds[0];
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
709

dengyihao's avatar
dengyihao 已提交
710 711 712
    if (false == addHandleToWorkloop(thrd)) {
      goto End;
    }
713
    int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
dengyihao's avatar
dengyihao 已提交
714 715 716 717 718 719 720 721
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
    }
  }
dengyihao's avatar
dengyihao 已提交
722 723 724
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
725
  int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
dengyihao's avatar
dengyihao 已提交
726 727 728 729 730 731 732
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
    // clear all resource later
  }

  return srv;
dengyihao's avatar
dengyihao 已提交
733
End:
U
ubuntu 已提交
734
  transCloseServer(srv);
dengyihao's avatar
dengyihao 已提交
735 736
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
737
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
dengyihao's avatar
dengyihao 已提交
738
  thrd->quit = true;
dengyihao's avatar
dengyihao 已提交
739
  if (QUEUE_IS_EMPTY(&thrd->conn)) {
dengyihao's avatar
dengyihao 已提交
740 741 742
    uv_walk(thrd->loop, uvWalkCb, NULL);
    // uv_loop_close(thrd->loop);
    // uv_stop(thrd->loop);
dengyihao's avatar
dengyihao 已提交
743 744 745
  } else {
    destroyAllConn(thrd);
  }
wafwerar's avatar
wafwerar 已提交
746
  taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
747 748 749 750 751
}
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
  // release handle to rpc init
  SSrvConn* conn = msg->pConn;
  if (conn->status == ConnAcquire) {
dengyihao's avatar
dengyihao 已提交
752
    if (!transQueuePush(&conn->srvMsgs, msg)) {
dengyihao's avatar
dengyihao 已提交
753
      return;
dengyihao's avatar
dengyihao 已提交
754 755 756
    }
    uvStartSendRespInternal(msg);
    return;
dengyihao's avatar
dengyihao 已提交
757 758
  } else if (conn->status == ConnRelease || conn->status == ConnNormal) {
    tDebug("server conn %p already released, ignore release-msg", conn);
dengyihao's avatar
dengyihao 已提交
759
  }
dengyihao's avatar
dengyihao 已提交
760
  destroySmsg(msg);
dengyihao's avatar
dengyihao 已提交
761
}
dengyihao's avatar
dengyihao 已提交
762
void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
dengyihao's avatar
dengyihao 已提交
763
  // send msg to client
dengyihao's avatar
dengyihao 已提交
764
  tDebug("server conn %p start to send resp (2/2)", msg->pConn);
dengyihao's avatar
dengyihao 已提交
765 766
  uvStartSendResp(msg);
}
dengyihao's avatar
dengyihao 已提交
767 768
void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* conn = msg->pConn;
dengyihao's avatar
dengyihao 已提交
769
  tDebug("server conn %p register brokenlink callback", conn);
dengyihao's avatar
dengyihao 已提交
770
  if (conn->status == ConnAcquire) {
dengyihao's avatar
dengyihao 已提交
771
    if (!transQueuePush(&conn->srvMsgs, msg)) {
dengyihao's avatar
dengyihao 已提交
772 773
      return;
    }
dengyihao's avatar
dengyihao 已提交
774
    transQueuePop(&conn->srvMsgs);
dengyihao's avatar
dengyihao 已提交
775 776 777
    conn->regArg.notifyCount = 0;
    conn->regArg.init = 1;
    conn->regArg.msg = msg->msg;
dengyihao's avatar
dengyihao 已提交
778
    tDebug("server conn %p register brokenlink callback succ", conn);
dengyihao's avatar
dengyihao 已提交
779 780 781 782 783 784

    if (conn->broken) {
      STrans* pTransInst = conn->pTransInst;
      (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
      memset(&conn->regArg, 0, sizeof(conn->regArg));
    }
wafwerar's avatar
wafwerar 已提交
785
    taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
786 787
  }
}
dengyihao's avatar
dengyihao 已提交
788 789 790 791
void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
792
  taosThreadJoin(pThrd->thread, NULL);
dengyihao's avatar
dengyihao 已提交
793
  SRV_RELEASE_UV(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
794
  transDestroyAsyncPool(pThrd->asyncPool);
dengyihao's avatar
dengyihao 已提交
795
  taosMemoryFree(pThrd->loop);
wafwerar's avatar
wafwerar 已提交
796
  taosMemoryFree(pThrd);
dengyihao's avatar
dengyihao 已提交
797
}
dengyihao's avatar
dengyihao 已提交
798
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
wafwerar's avatar
wafwerar 已提交
799
  SSrvMsg* msg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
800
  msg->type = Quit;
dengyihao's avatar
dengyihao 已提交
801
  tDebug("server send quit msg to work thread");
dengyihao's avatar
dengyihao 已提交
802
  transSendAsync(pThrd->asyncPool, &msg->q);
dengyihao's avatar
dengyihao 已提交
803 804
}

U
ubuntu 已提交
805
void transCloseServer(void* arg) {
dengyihao's avatar
dengyihao 已提交
806 807
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
808 809 810

  tDebug("send quit msg to accept thread");
  uv_async_send(srv->pAcceptAsync);
wafwerar's avatar
wafwerar 已提交
811
  taosThreadJoin(srv->thread, NULL);
dengyihao's avatar
dengyihao 已提交
812

dengyihao's avatar
dengyihao 已提交
813
  SRV_RELEASE_UV(srv->loop);
dengyihao's avatar
dengyihao 已提交
814

dengyihao's avatar
dengyihao 已提交
815 816 817 818 819
  for (int i = 0; i < srv->numOfThreads; i++) {
    sendQuitToWorkThrd(srv->pThreadObj[i]);
    destroyWorkThrd(srv->pThreadObj[i]);
  }

wafwerar's avatar
wafwerar 已提交
820 821 822
  taosMemoryFree(srv->pThreadObj);
  taosMemoryFree(srv->pAcceptAsync);
  taosMemoryFree(srv->loop);
dengyihao's avatar
dengyihao 已提交
823 824

  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
825
    taosMemoryFree(srv->pipe[i]);
dengyihao's avatar
dengyihao 已提交
826
  }
wafwerar's avatar
wafwerar 已提交
827
  taosMemoryFree(srv->pipe);
dengyihao's avatar
dengyihao 已提交
828

wafwerar's avatar
wafwerar 已提交
829
  taosMemoryFree(srv);
dengyihao's avatar
dengyihao 已提交
830
}
dengyihao's avatar
dengyihao 已提交
831

dengyihao's avatar
dengyihao 已提交
832 833 834 835 836 837 838 839 840 841 842 843 844 845 846
void transRefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  SSrvConn* conn = handle;

  int ref = T_REF_INC((SSrvConn*)handle);
  UNUSED(ref);
}

void transUnrefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SSrvConn*)handle);
dengyihao's avatar
dengyihao 已提交
847
  tDebug("server conn %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
848 849 850 851
  if (ref == 0) {
    destroyConn((SSrvConn*)handle, true);
  }
}
dengyihao's avatar
dengyihao 已提交
852 853

void transReleaseSrvHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
854 855 856 857 858 859
  if (handle == NULL) {
    return;
  }
  SSrvConn*     pConn = handle;
  SWorkThrdObj* pThrd = pConn->hostThrd;

dengyihao's avatar
dengyihao 已提交
860
  STransMsg tmsg = {.code = 0, .handle = handle, .ahandle = NULL};
dengyihao's avatar
dengyihao 已提交
861

wafwerar's avatar
wafwerar 已提交
862
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
863 864 865 866 867 868
  srvMsg->msg = tmsg;
  srvMsg->type = Release;
  srvMsg->pConn = pConn;

  tTrace("server conn %p start to release", pConn);
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
869
}
U
ubuntu 已提交
870
void transSendResponse(const STransMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
871 872 873
  if (pMsg->handle == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
874
  SSrvConn*     pConn = pMsg->handle;
dengyihao's avatar
dengyihao 已提交
875
  SWorkThrdObj* pThrd = pConn->hostThrd;
dengyihao's avatar
dengyihao 已提交
876 877 878
  if (pThrd->quit) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
879

wafwerar's avatar
wafwerar 已提交
880
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
881 882
  srvMsg->pConn = pConn;
  srvMsg->msg = *pMsg;
dengyihao's avatar
dengyihao 已提交
883
  srvMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
884
  tTrace("server conn %p start to send resp (1/2)", pConn);
dengyihao's avatar
dengyihao 已提交
885
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
886
}
dengyihao's avatar
dengyihao 已提交
887 888 889 890 891 892 893
void transRegisterMsg(const STransMsg* msg) {
  if (msg->handle == NULL) {
    return;
  }
  SSrvConn*     pConn = msg->handle;
  SWorkThrdObj* pThrd = pConn->hostThrd;

wafwerar's avatar
wafwerar 已提交
894
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
895 896 897
  srvMsg->pConn = pConn;
  srvMsg->msg = *msg;
  srvMsg->type = Register;
dengyihao's avatar
dengyihao 已提交
898
  tTrace("server conn %p start to register brokenlink callback", pConn);
dengyihao's avatar
dengyihao 已提交
899 900
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
}
dengyihao's avatar
formate  
dengyihao 已提交
901 902
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
  SSrvConn*          pConn = thandle;
dengyihao's avatar
dengyihao 已提交
903
  struct sockaddr_in addr = pConn->addr;
U
ubuntu 已提交
904

dengyihao's avatar
dengyihao 已提交
905 906
  pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
  pInfo->clientPort = ntohs(addr.sin_port);
dengyihao's avatar
dengyihao 已提交
907 908 909 910
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
  return 0;
}

dengyihao's avatar
dengyihao 已提交
911
#endif