transSrv.c 31.2 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20 21 22 23 24 25
// Registration state kept per connection (stored in SSrvConn.regArg).
// uvOnSendCb fills it in when a Register message reaches the head of the send
// queue; uvOnRecvCb hands `msg` to the app callback if the connection breaks.
typedef struct {
  int       notifyCount;  // reset to 0 when a Register message is recorded
  int       init;         // init or not
  STransMsg msg;          // message passed to the app callback when the conn is broken
} SSrvRegArg;

dengyihao's avatar
dengyihao 已提交
26
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
27
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
28 29 30
  uv_tcp_t*  pTcp;
  uv_write_t pWriter;
  uv_timer_t pTimer;
dengyihao's avatar
dengyihao 已提交
31 32

  queue       queue;
dengyihao's avatar
dengyihao 已提交
33
  int         persist;  // persist connection or not
dengyihao's avatar
dengyihao 已提交
34
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
35 36 37
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
38
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
39
  STransQueue srvMsgs;
dengyihao's avatar
dengyihao 已提交
40

dengyihao's avatar
dengyihao 已提交
41 42
  SSrvRegArg regArg;
  bool       broken;  // conn broken;
dengyihao's avatar
dengyihao 已提交
43

dengyihao's avatar
dengyihao 已提交
44
  ConnStatus         status;
dengyihao's avatar
dengyihao 已提交
45
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
46
  struct sockaddr_in locaddr;
U
ubuntu 已提交
47

dengyihao's avatar
dengyihao 已提交
48 49 50 51 52 53
  int64_t refId;
  int     spi;
  char    info[64];
  char    user[TSDB_UNI_LEN];  // user ID for the link
  char    secret[TSDB_PASSWORD_LEN];
  char    ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
54 55 56
} SSrvConn;

typedef struct SSrvMsg {
dengyihao's avatar
dengyihao 已提交
57 58 59 60
  SSrvConn*     pConn;
  STransMsg     msg;
  queue         q;
  STransMsgType type;
dengyihao's avatar
dengyihao 已提交
61
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
62 63

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
64 65 66 67 68 69
  TdThread      thread;
  uv_pipe_t*    pipe;
  uv_os_fd_t    fd;
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  queue         msg;
wafwerar's avatar
wafwerar 已提交
70
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
71 72 73 74

  queue conn;
  void* pTransInst;
  bool  quit;
dengyihao's avatar
dengyihao 已提交
75 76 77
} SWorkThrdObj;

typedef struct SServerObj {
dengyihao's avatar
dengyihao 已提交
78
  TdThread   thread;
dengyihao's avatar
dengyihao 已提交
79 80 81 82
  uv_tcp_t   server;
  uv_loop_t* loop;

  // work thread info
dengyihao's avatar
dengyihao 已提交
83 84 85
  int            workerIdx;
  int            numOfThreads;
  SWorkThrdObj** pThreadObj;
dengyihao's avatar
dengyihao 已提交
86 87 88 89 90

  uv_pipe_t** pipe;
  uint32_t    ip;
  uint32_t    port;
  uv_async_t* pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
91 92
} SServerObj;

dengyihao's avatar
dengyihao 已提交
93 94 95 96 97 98 99
// External handle given to the application in place of a raw connection pointer.
// uvWorkerAsyncCb re-acquires it by refId and compares pointers before using
// `handle`, so a stale handle is detected instead of dereferenced.
typedef struct SExHandle {
  void*         handle;  // the SSrvConn* (assigned to SSrvMsg.pConn in uvWorkerAsyncCb)
  int64_t       refId;   // presumably the id used with uvAcquireExHandle — confirm at creation site
  SWorkThrdObj* pThrd;   // presumably the worker owning the connection — confirm at creation site
} SExHandle;

dengyihao's avatar
dengyihao 已提交
100 101 102
static const char* notify = "a";

static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
103
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
104
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
105
static void uvOnTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
106
static void uvOnSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
107
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
108 109 110
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
111
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
112
static void uvShutDownCb(uv_shutdown_t* req, int status);
dengyihao's avatar
dengyihao 已提交
113 114 115 116 117 118 119

/*
 * time-consuming task throwed into BG work thread
 */
static void uvWorkDoTask(uv_work_t* req);
static void uvWorkAfterTask(uv_work_t* req, int status);

dengyihao's avatar
dengyihao 已提交
120 121
static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
122

dengyihao's avatar
dengyihao 已提交
123
static void uvStartSendRespInternal(SSrvMsg* smsg);
dengyihao's avatar
dengyihao 已提交
124 125
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
126

dengyihao's avatar
dengyihao 已提交
127 128
static void uvNotifyLinkBrokenToApp(SSrvConn* conn);

dengyihao's avatar
dengyihao 已提交
129
static void destroySmsg(SSrvMsg* smsg);
130
// check whether already read complete packet
dengyihao's avatar
fix bug  
dengyihao 已提交
131
static SSrvConn* createConn(void* hThrd);
dengyihao's avatar
dengyihao 已提交
132
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
133

dengyihao's avatar
dengyihao 已提交
134 135
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
136
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
137
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
138 139
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
                                                                       uvHandleRegister};
dengyihao's avatar
dengyihao 已提交
140

dengyihao's avatar
dengyihao 已提交
141 142 143 144 145 146 147 148 149
static int exHandlesMgt;

void       uvOpenExHandleMgt(int size);
int64_t    uvAddExHandle(void* p);
int32_t    uvRemoveExHandle(int64_t refId);
int32_t    uvReleaseExHandle(int64_t refId);
void       uvDestoryExHandle(void* handle);
SExHandle* uvAcquireExHandle(int64_t refId);

150
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
151

dengyihao's avatar
dengyihao 已提交
152
// server and worker thread
153 154
static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg);
dengyihao's avatar
dengyihao 已提交
155

dengyihao's avatar
dengyihao 已提交
156 157 158 159
// add handle loop
static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
/*
 * If `head` is a bare release request (release flag set and msgLen equal to the
 * header size), mark `conn` as ConnRelease, drop the request buffer, queue a
 * Release response and start sending it.
 *
 * NOTE: this macro expands to `return;` statements, so it may only be used
 * inside a function returning void, and it ends that function whenever the
 * request is a release request.
 *
 * A failed allocation of the response is logged and the function returns
 * instead of dereferencing NULL. All macro arguments are parenthesized.
 */
#define CONN_SHOULD_RELEASE(conn, head)                                       \
  do {                                                                        \
    if ((head)->release == 1 && (head)->msgLen == sizeof(*(head))) {          \
      (conn)->status = ConnRelease;                                           \
      transClearBuffer(&(conn)->readBuf);                                     \
      transFreeMsg(transContFromHead((char*)(head)));                         \
      tTrace("server conn %p received release request", (conn));              \
                                                                              \
      SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));                 \
      if (srvMsg == NULL) {                                                   \
        tError("server conn %p failed to alloc release msg", (conn));         \
        return;                                                               \
      }                                                                       \
      STransMsg tmsg = {.code = 0, .handle = (void*)(conn), .ahandle = NULL}; \
      srvMsg->msg = tmsg;                                                     \
      srvMsg->type = Release;                                                 \
      srvMsg->pConn = (conn);                                                 \
      if (!transQueuePush(&(conn)->srvMsgs, srvMsg)) {                        \
        return;                                                               \
      }                                                                       \
      uvStartSendRespInternal(srvMsg);                                        \
      return;                                                                 \
    }                                                                         \
  } while (0)

/*
 * Tear down a libuv loop: request close on every handle still open (uvWalkCb),
 * run the loop so the close requests are processed, then close the loop.
 * NOTE(review): the expansion ends with `;`, so `SRV_RELEASE_UV(x);` yields an
 * extra empty statement and the macro cannot sit directly before an `else`.
 * The semicolon is kept because call sites are not visible here.
 */
#define SRV_RELEASE_UV(loop)         \
  do {                               \
    uv_walk((loop), uvWalkCb, NULL); \
    uv_run((loop), UV_RUN_DEFAULT);  \
    uv_loop_close((loop));           \
  } while (0);

dengyihao's avatar
dengyihao 已提交
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
/*
 * Jump to the caller's `_return` label when the worker thread is already
 * quitting. The enclosing function must define that label.
 */
#define ASYNC_ERR_JRET(thrd)                            \
  do {                                                  \
    if ((thrd)->quit) {                                 \
      tTrace("worker thread already quit, ignore msg"); \
      goto _return;                                     \
    }                                                   \
  } while (0)

/*
 * Unless refId is -1, re-acquire the external handle by refId and jump to the
 * caller's `_return` label if it is gone or is not the same object as `exh1`.
 * The enclosing function must define `_return`; this macro does not release
 * the handle it acquires.
 */
#define ASYNC_CHECK_HANDLE(exh1, refId)                                                             \
  do {                                                                                              \
    if ((refId) != -1) {                                                                            \
      SExHandle* exh2 = uvAcquireExHandle((refId));                                                 \
      if (exh2 == NULL || (exh1) != exh2) {                                                         \
        tTrace("server conn %p except, may already freed, ignore msg", exh2 ? exh2->handle : NULL); \
        goto _return;                                                                               \
      }                                                                                             \
    }                                                                                               \
  } while (0)

dengyihao's avatar
dengyihao 已提交
207
/* libuv alloc callback for client sockets: reads go into the connection's own receive buffer. */
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SSrvConn* pConn = handle->data;
  transAllocBuffer(&pConn->readBuf, buf);
}

// Timer callback for a query/insert activity timeout; currently only logs the connection.
static void uvHandleActivityTimeout(uv_timer_t* handle) {
  SSrvConn* pConn = handle->data;
  tDebug("%p timeout since no activity", pConn);
}

dengyihao's avatar
dengyihao 已提交
219 220
/*
 * Process one complete request sitting in pConn->readBuf: decode the header,
 * record the user name, handle a release request (CONN_SHOULD_RELEASE returns
 * from this function in that case), update the connection status/refcount and
 * finally pass the message to the application callback.
 */
static void uvHandleReq(SSrvConn* pConn) {
  STransMsgHead* pHead = (STransMsgHead*)pConn->readBuf.buf;

  // header fields arrive in network byte order
  pHead->code = ntohl(pHead->code);
  pHead->msgLen = ntohl(pHead->msgLen);

  // The user name comes from the peer: copy at most what fits into pConn->user
  // and always NUL-terminate, instead of trusting the sender's terminator.
  size_t userLen = 0;
  while (userLen < sizeof(pConn->user) - 1 && pHead->user[userLen] != '\0') {
    userLen++;
  }
  memcpy(pConn->user, pHead->user, userLen);
  pConn->user[userLen] = '\0';

  // TODO(dengyihao): hand time-consuming work to a background thread via
  // uv_queue_work(..., uvWorkDoTask, uvWorkAfterTask) after uv_read_stop()
  // and transRefSrvHandle(pConn).

  CONN_SHOULD_RELEASE(pConn, pHead);

  // zero-initialize so fields not set below are never passed on uninitialized
  STransMsg transMsg = {0};
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = pHead->content;
  transMsg.msgType = pHead->msgType;
  transMsg.code = pHead->code;
  transMsg.ahandle = (void*)pHead->ahandle;
  transMsg.handle = NULL;

  // NOTE(review): pHead is still read after this call, so transClearBuffer is
  // assumed to detach the buffer rather than free it — confirm.
  transClearBuffer(&pConn->readBuf);
  pConn->inType = pHead->msgType;
  if (pConn->status == ConnNormal) {
    if (pHead->persist == 1) {
      pConn->status = ConnAcquire;
      transRefSrvHandle(pConn);
      tDebug("server conn %p acquired by server app", pConn);
    }
  }
  if (pConn->status == ConnNormal && pHead->noResp == 0) {
    // a response is expected: hold a reference until it is sent
    transRefSrvHandle(pConn);
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
           taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
           ntohs(pConn->locaddr.sin_port), transMsg.contLen);
  } else {
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
           TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
           taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
    // no ref here
  }

  // if pHead->noResp = 1,
  // 1. server application should not send resp on handle
  // 2. once send out data, cli conn released to conn pool immediately
  // 3. not mixed with persist
  transMsg.handle = (void*)uvAcquireExHandle(pConn->refId);
  uvReleaseExHandle(pConn->refId);

  STrans* pTransInst = pConn->pTransInst;
  (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
  // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
}

dengyihao's avatar
dengyihao 已提交
283
/*
 * libuv read callback for client sockets.
 *   nread > 0 : account for the bytes and handle the request once a full packet is buffered
 *   nread == 0: nothing to do
 *   nread < 0 : mark the connection broken, notify the app if it registered a
 *               message for that, and drop a reference
 */
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
  SSrvConn*    conn = cli->data;
  SConnBuffer* pBuf = &conn->readBuf;

  if (nread == 0) {
    return;
  }

  if (nread > 0) {
    pBuf->len += nread;
    tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
    if (!transReadComplete(pBuf)) {
      tTrace("server %p read partial packet, continue to read", conn);
      return;
    }
    tTrace("server conn %p alread read complete packet", conn);
    uvHandleReq(conn);
    return;
  }

  // nread < 0: read error or EOF
  tError("server conn %p read error: %s", conn, uv_err_name(nread));
  conn->broken = true;
  if (conn->status == ConnAcquire && conn->regArg.init) {
    tTrace("server conn %p broken, notify server app", conn);
    STrans* pTransInst = conn->pTransInst;
    (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
    memset(&conn->regArg, 0, sizeof(conn->regArg));
  }
  transUnrefSrvHandle(conn);
}
/* libuv alloc callback for the worker pipe: a tiny zeroed buffer for the notify byte; freed in uvOnConnectionCb. */
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->len = 2;
  buf->base = taosMemoryCalloc(1, buf->len * sizeof(char));
}

/* Timer callback: currently only logs that the connection timed out. */
void uvOnTimeoutCb(uv_timer_t* handle) {
  SSrvConn* conn = handle->data;
  tError("server conn %p time out", conn);
}

dengyihao's avatar
dengyihao 已提交
327
/*
 * libuv write-completion callback.
 * On failure the connection is marked broken and a reference is dropped.
 * On success the message just sent is popped and destroyed (a sent Release
 * puts the connection back to ConnNormal), then the next queued message, if
 * any, is processed: a Register message on an acquired connection is recorded
 * in regArg (and delivered at once if the connection is already broken);
 * anything else is sent.
 */
void uvOnSendCb(uv_write_t* req, int status) {
  SSrvConn* conn = req->data;

  if (status != 0) {
    tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
    conn->broken = true;
    transUnrefSrvHandle(conn);
    return;
  }

  tTrace("server conn %p data already was written on stream", conn);
  if (transQueueEmpty(&conn->srvMsgs)) {
    return;
  }

  // retire the message whose write just completed
  SSrvMsg* sent = transQueuePop(&conn->srvMsgs);
  if (sent->type == Release && conn->status != ConnNormal) {
    conn->status = ConnNormal;
    transUnrefSrvHandle(conn);
  }
  destroySmsg(sent);

  // send second data, just use for push
  if (transQueueEmpty(&conn->srvMsgs)) {
    return;
  }

  SSrvMsg* next = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
  if (!(next->type == Register && conn->status == ConnAcquire)) {
    uvStartSendRespInternal(next);
    return;
  }

  // Register is not sent on the wire: remember its message for a later
  // "connection broken" notification
  conn->regArg.notifyCount = 0;
  conn->regArg.init = 1;
  conn->regArg.msg = next->msg;
  if (conn->broken) {
    STrans* pTransInst = conn->pTransInst;
    (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
    memset(&conn->regArg, 0, sizeof(conn->regArg));
  }
  transQueuePop(&conn->srvMsgs);
  taosMemoryFree(next);

  next = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
  if (next != NULL) {
    uvStartSendRespInternal(next);
  }
}
dengyihao's avatar
dengyihao 已提交
369 370
/*
 * Completion callback of the uv_write2 that passes an accepted socket to a
 * worker. The accept-side handle (req->data) is closed and freed through
 * uvFreeCb whether or not the write succeeded; the request itself is freed here.
 */
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status != 0) {
    tError("fail to dispatch conn to work thread");
  } else {
    tTrace("success to dispatch conn to work thread");
  }

  uv_handle_t* dispatched = (uv_handle_t*)req->data;
  uv_close(dispatched, uvFreeCb);
  taosMemoryFree(req);
}
dengyihao's avatar
dengyihao 已提交
379

dengyihao's avatar
dengyihao 已提交
380
/*
 * Fill in the transport header in front of smsg's payload and point `wb` at
 * the whole frame (header + payload). A message without content gets an empty
 * content buffer allocated so that a header exists.
 */
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
  SSrvConn*  pConn = smsg->pConn;
  STransMsg* pMsg = &smsg->msg;

  tTrace("server conn %p prepare to send resp", pConn);

  if (pMsg->pCont == NULL) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }

  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
  pHead->ahandle = (uint64_t)pMsg->ahandle;

  const bool isRelease = (smsg->type == Release);
  if (pConn->status == ConnNormal) {
    // response type is derived from the type of the last request
    pHead->msgType = pConn->inType + 1;
  } else if (isRelease) {
    pHead->msgType = 0;
  } else {
    pHead->msgType = pMsg->msgType;
  }
  pHead->release = isRelease ? 1 : 0;
  pHead->code = htonl(pMsg->code);

  int32_t len = transMsgLenFromCont(pMsg->contLen);
  tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
         taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
         ntohs(pConn->locaddr.sin_port));
  pHead->msgLen = htonl(len);

  wb->base = (char*)pHead;
  wb->len = len;
}
dengyihao's avatar
dengyihao 已提交
410 411

/*
 * Build the frame for `smsg` and start writing it on the connection's socket.
 * uvOnSendCb runs when the write completes. If uv_write itself fails, that
 * callback is not invoked, so the failure is logged here rather than silently
 * dropped.
 */
static void uvStartSendRespInternal(SSrvMsg* smsg) {
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
  // uv_timer_stop(&pConn->pTimer);
  int ret = uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
  if (ret != 0) {
    tError("server conn %p failed to start write, %s", pConn, uv_err_name(ret));
  }
}
/*
 * Queue a response on its connection and start sending it if it is the only
 * queued item.
 *
 * On a broken connection nothing can be sent: the message (and its payload)
 * is destroyed here, since it is never queued and nobody else would free it,
 * and the reference held for the response is dropped.
 */
static void uvStartSendResp(SSrvMsg* smsg) {
  SSrvConn* pConn = smsg->pConn;

  if (pConn->broken == true) {
    // free the message before the unref, which may destroy the connection
    destroySmsg(smsg);
    transUnrefSrvHandle(pConn);
    return;
  }
  if (pConn->status == ConnNormal) {
    // drop the reference taken in uvHandleReq when the request arrived
    transUnrefSrvHandle(pConn);
  }

  if (!transQueuePush(&pConn->srvMsgs, smsg)) {
    // not started now; uvOnSendCb picks it up from the queue
    return;
  }
  uvStartSendRespInternal(smsg);
}
dengyihao's avatar
dengyihao 已提交
438

dengyihao's avatar
dengyihao 已提交
439
/* Free a server message together with its payload; NULL is accepted and ignored. */
static void destroySmsg(SSrvMsg* smsg) {
  if (smsg != NULL) {
    transFreeMsg(smsg->msg.pCont);
    taosMemoryFree(smsg);
  }
}
dengyihao's avatar
fix bug  
dengyihao 已提交
446 447 448 449 450 451 452
/*
 * Detach every connection from the worker's list and drop references on it
 * until the count read by T_REF_VAL_GET falls below 2, then drop one more.
 */
static void destroyAllConn(SWorkThrdObj* pThrd) {
  while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
    queue* node = QUEUE_HEAD(&pThrd->conn);
    QUEUE_REMOVE(node);
    QUEUE_INIT(node);

    SSrvConn* pConn = QUEUE_DATA(node, SSrvConn, queue);
    for (; T_REF_VAL_GET(pConn) >= 2;) {
      transUnrefSrvHandle(pConn);
    }
    transUnrefSrvHandle(pConn);
  }
}
dengyihao's avatar
dengyihao 已提交
459
/*
 * Async callback run on a worker loop: drains the messages posted to this
 * async item and dispatches each one through transAsyncHandle[]. A message
 * whose external handle can no longer be acquired, or no longer matches the
 * one it was created with, is destroyed instead of dispatched.
 */
void uvWorkerAsyncCb(uv_async_t* handle) {
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
  queue         pending;

  // batch process to avoid to lock/unlock frequently
  taosThreadMutexLock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &pending);
  taosThreadMutexUnlock(&item->mtx);

  while (!QUEUE_IS_EMPTY(&pending)) {
    queue* node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);

    SSrvMsg* msg = QUEUE_DATA(node, SSrvMsg, q);
    if (msg == NULL) {
      tError("unexcept occurred, continue");
      continue;
    }

    // validate the handle the app gave us against the live one for the same refId
    SExHandle* expected = msg->msg.handle;
    int64_t    refId = msg->msg.refId;
    SExHandle* current = uvAcquireExHandle(refId);
    if (current == NULL || expected != current) {
      uvReleaseExHandle(refId);
      destroySmsg(msg);
      continue;
    }

    msg->pConn = expected->handle;
    (*transAsyncHandle[msg->type])(msg, pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
494 495 496 497 498
/* uv_walk visitor: request close on every handle that is not already closing. */
static void uvWalkCb(uv_handle_t* handle, void* arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}
dengyihao's avatar
dengyihao 已提交
499 500 501 502
/* uv_close callback for heap-allocated handles: releases the handle's memory. */
static void uvFreeCb(uv_handle_t* handle) { taosMemoryFree(handle); }
dengyihao's avatar
dengyihao 已提交
503

dengyihao's avatar
dengyihao 已提交
504 505
/* Async callback on the accept loop: closes every handle on that loop (see uvWalkCb). */
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* pServer = async->data;
  tDebug("close server port %d", pServer->port);
  uv_walk(pServer->loop, uvWalkCb, NULL);
}
dengyihao's avatar
dengyihao 已提交
509

dengyihao's avatar
dengyihao 已提交
510
/*
 * uv_shutdown completion: whatever the outcome, close the stream (the
 * connection is torn down in uvDestroyConn) and free the shutdown request.
 */
static void uvShutDownCb(uv_shutdown_t* req, int status) {
  if (status != 0) {
    tDebug("conn failed to shut down: %s", uv_err_name(status));
  }
  uv_handle_t* stream = (uv_handle_t*)req->handle;
  uv_close(stream, uvDestroyConn);
  taosMemoryFree(req);
}

dengyihao's avatar
dengyihao 已提交
518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
/* Work-queue body for time-consuming tasks; currently a placeholder that only traces. */
static void uvWorkDoTask(uv_work_t* req) {
  // only auth conn currently, add more func later
  tTrace("server conn %p start to be processed in BG Thread", req->data);
}

/*
 * Completion of uvWorkDoTask, called in the main loop: traces the outcome and
 * frees the work request. More post-processing is to be added later.
 */
static void uvWorkAfterTask(uv_work_t* req, int status) {
  void* pConn = req->data;
  if (status != 0) {
    tTrace("server conn %p failed to processed ", pConn);
  }
  tTrace("server conn %p already processed ", pConn);
  taosMemoryFree(req);
}

dengyihao's avatar
dengyihao 已提交
536 537 538 539 540 541
/*
 * Connection callback of the listening socket (accept thread).
 * Accepts the client into a temporary handle and passes it to the next worker
 * (round-robin) by writing the notify byte plus the handle to that worker's
 * pipe. uvOnPipeWriteCb closes and frees the temporary handle and the request.
 *
 * Any negative status is an error (libuv reports negative error codes, not
 * just -1). Every failure path releases what was allocated: a handle that was
 * already initialized is freed through uv_close + uvFreeCb.
 */
void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status < 0) {
    tError("failed to accept new connection: %s", uv_err_name(status));
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

  uv_tcp_t* cli = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
  if (cli == NULL) {
    tError("failed to alloc handle for new connection");
    return;
  }
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) != 0) {
    uv_close((uv_handle_t*)cli, uvFreeCb);
    return;
  }

  uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
  if (wr == NULL) {
    tError("failed to alloc write request for new connection");
    uv_close((uv_handle_t*)cli, uvFreeCb);
    return;
  }
  wr->data = cli;
  uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

  pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;

  tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);

  int ret = uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
  if (ret != 0) {
    // the write was not queued, so uvOnPipeWriteCb will never run: clean up here
    tError("fail to dispatch conn to work thread: %s", uv_err_name(ret));
    uv_close((uv_handle_t*)cli, uvFreeCb);
    taosMemoryFree(wr);
  }
}
/*
 * Read callback of a worker's pipe: the accept thread has written the notify
 * byte and attached a TCP handle. Creates the SSrvConn, accepts the pending
 * handle onto this worker's loop, records peer/local addresses and starts
 * reading requests.
 *
 * `buf->base` comes from uvAllocConnBufferCb and is freed on every path.
 * nread == 0 is a legal "nothing read" result in libuv and is ignored.
 */
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
  tTrace("server connection coming");
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
    tError("failed to create connect: %p", q);
    taosMemoryFree(buf->base);
    uv_close((uv_handle_t*)q, NULL);
    return;
  }
  if (nread == 0) {
    // no data and no error: just release the buffer
    taosMemoryFree(buf->base);
    return;
  }

  assert((size_t)nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
  taosMemoryFree(buf->base);

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

  SSrvConn* pConn = createConn(pThrd);
  if (pConn == NULL) {
    // createConn is defined elsewhere; never dereference a failed result
    tError("failed to create server conn");
    return;
  }

  pConn->pTransInst = pThrd->pTransInst;
  /* init conn timer*/
  // uv_timer_init(pThrd->loop, &pConn->pTimer);
  // pConn->pTimer.data = pConn;

  pConn->hostThrd = pThrd;

  // init client handle
  // NOTE(review): this allocation is not checked; a safe failure path depends
  // on how destroyConn treats a NULL pTcp, which is not visible here.
  pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

  pConn->pWriter.data = pConn;

  transSetConnOption((uv_tcp_t*)pConn->pTcp);

  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) != 0) {
    tDebug("failed to create new connection");
    transUnrefSrvHandle(pConn);
    return;
  }

  uv_os_fd_t fd;
  uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
  tTrace("server conn %p created, fd: %d", pConn, fd);

  int addrlen = sizeof(pConn->addr);
  if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
    tError("server conn %p failed to get peer info", pConn);
    transUnrefSrvHandle(pConn);
    return;
  }

  addrlen = sizeof(pConn->locaddr);
  if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
    tError("server conn %p failed to get local info", pConn);
    transUnrefSrvHandle(pConn);
    return;
  }

  uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
}

633
void* transAcceptThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
634
  // opt
dengyihao's avatar
dengyihao 已提交
635
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
636 637
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
638 639

  return NULL;
dengyihao's avatar
dengyihao 已提交
640
}
dengyihao's avatar
dengyihao 已提交
641 642
static bool addHandleToWorkloop(void* arg) {
  SWorkThrdObj* pThrd = arg;
wafwerar's avatar
wafwerar 已提交
643
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
644 645 646
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
647

dengyihao's avatar
dengyihao 已提交
648
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
dengyihao's avatar
dengyihao 已提交
649 650 651 652
  uv_pipe_open(pThrd->pipe, pThrd->fd);

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
653
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
654
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
655

dengyihao's avatar
fix bug  
dengyihao 已提交
656 657 658
  // conn set
  QUEUE_INIT(&pThrd->conn);

dengyihao's avatar
dengyihao 已提交
659
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
dengyihao's avatar
dengyihao 已提交
660
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
661 662 663 664 665 666 667 668 669 670 671 672 673
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
674
  // register an async here to quit server gracefully
wafwerar's avatar
wafwerar 已提交
675
  srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t));
dengyihao's avatar
dengyihao 已提交
676 677
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
678

dengyihao's avatar
dengyihao 已提交
679
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
680 681 682 683 684
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
685
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
686 687 688 689
    tError("failed to listen: %s", uv_err_name(err));
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
690
}
691
void* transWorkerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
692
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
693
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
694
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
695 696

  return NULL;
dengyihao's avatar
dengyihao 已提交
697 698
}

dengyihao's avatar
fix bug  
dengyihao 已提交
699 700 701
static SSrvConn* createConn(void* hThrd) {
  SWorkThrdObj* pThrd = hThrd;

wafwerar's avatar
wafwerar 已提交
702
  SSrvConn* pConn = (SSrvConn*)taosMemoryCalloc(1, sizeof(SSrvConn));
dengyihao's avatar
fix bug  
dengyihao 已提交
703 704 705
  QUEUE_INIT(&pConn->queue);

  QUEUE_PUSH(&pThrd->conn, &pConn->queue);
dengyihao's avatar
dengyihao 已提交
706 707

  transQueueInit(&pConn->srvMsgs, NULL);
dengyihao's avatar
dengyihao 已提交
708

dengyihao's avatar
dengyihao 已提交
709
  memset(&pConn->regArg, 0, sizeof(pConn->regArg));
dengyihao's avatar
dengyihao 已提交
710
  pConn->broken = false;
dengyihao's avatar
dengyihao 已提交
711
  pConn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
712

dengyihao's avatar
dengyihao 已提交
713 714 715 716 717 718 719
  SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
  exh->handle = pConn;
  exh->pThrd = pThrd;
  exh->refId = uvAddExHandle(exh);
  uvAcquireExHandle(exh->refId);

  pConn->refId = exh->refId;
dengyihao's avatar
dengyihao 已提交
720
  transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
721
  tTrace("server conn %p created", pConn);
dengyihao's avatar
dengyihao 已提交
722 723
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
724

dengyihao's avatar
dengyihao 已提交
725
/*
 * Begin tearing down a server connection.
 *
 * Releases and removes the connection's ex-handle reference, destroys its read
 * buffer and, when `clear` is true, issues a uv_shutdown() on the TCP stream
 * with uvShutDownCb as the completion callback.
 *
 * `conn` may be NULL, in which case nothing happens.
 */
static void destroyConn(SSrvConn* conn, bool clear) {
  if (conn == NULL) {
    return;
  }
  uvReleaseExHandle(conn->refId);
  uvRemoveExHandle(conn->refId);

  transDestroyBuffer(&conn->readBuf);
  if (!clear) {
    return;
  }

  tTrace("server conn %p to be destroyed", conn);
  uv_shutdown_t* req = taosMemoryMalloc(sizeof(uv_shutdown_t));
  if (req == NULL) {
    tError("server conn %p failed to alloc shutdown req", conn);
    return;
  }

  int err = uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
  if (err != 0) {
    // libuv never invokes the callback when uv_shutdown() itself fails, so the
    // request has to be released here or it is lost.
    tError("server conn %p failed to shutdown: %s", conn, uv_err_name(err));
    taosMemoryFree(req);
  }
}
/*
 * Close callback for a connection handle: frees everything the connection
 * owns. `handle->data` carries the SSrvConn; a NULL value is ignored.
 * If the hosting worker has been asked to quit and this was its last
 * connection, the worker's loop handles are walked with uvWalkCb.
 */
static void uvDestroyConn(uv_handle_t* handle) {
  SSrvConn* pConn = handle->data;
  if (pConn != NULL) {
    // remember the worker first; pConn is gone after the frees below
    SWorkThrdObj* pWorker = pConn->hostThrd;

    tDebug("server conn %p destroy", pConn);
    transQueueDestroy(&pConn->srvMsgs);
    QUEUE_REMOVE(&pConn->queue);
    taosMemoryFree(pConn->pTcp);
    taosMemoryFree(pConn);

    if (pWorker->quit && QUEUE_IS_EMPTY(&pWorker->conn)) {
      tTrace("work thread quit");
      uv_walk(pWorker->loop, uvWalkCb, NULL);
    }
  }
}

U
ubuntu 已提交
759
/*
 * Create and start a transport server.
 *
 * Allocates the server object and its accept loop, opens the ex-handle
 * registry, then for each of `numOfThreads` workers creates a socket pair
 * (write end on the accept loop, read end handed to the worker), prepares the
 * worker loop and starts the worker thread. Finally the accept loop is set up
 * on `port` and the accept thread is started.
 *
 * `shandle` is stored as pTransInst of every worker. `label` and `fp` are not
 * used by this function.
 *
 * Returns the server object, or NULL when the server object cannot be
 * allocated or when socket-pair creation, worker-loop setup or accept-loop
 * setup fails. Failure to create a thread is only logged.
 */
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
  SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
  if (srv == NULL) {
    // nothing has been set up yet, so there is nothing to tear down
    tError("failed to alloc server object");
    return NULL;
  }
  srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
  srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

  uvOpenExHandleMgt(10000);

  for (int i = 0; i < srv->numOfThreads; i++) {
    SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
    thrd->quit = false;
    srv->pThreadObj[i] = thrd;

    // pipe[i][0] is the write end (accept loop), pipe[i][1] the read end (worker loop)
    srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
    int fds[2];
    if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
      goto End;
    }
    uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write

    thrd->pTransInst = shandle;
    thrd->fd = fds[0];
    thrd->pipe = &(srv->pipe[i][1]);  // init read

    if (false == addHandleToWorkloop(thrd)) {
      goto End;
    }
    int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
    }
  }
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
  int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
    // TODO: clear all resource later; at least make the failure visible
    tError("failed to create accept-thread");
  }

  return srv;
End:
  transCloseServer(srv);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844
// Opens the reference set used for SExHandle objects and stores its id in
// exHandlesMgt. `size` is forwarded to taosOpenRef() together with
// uvDestoryExHandle (presumably the per-entry destructor; confirm against
// taosOpenRef).
void uvOpenExHandleMgt(int size) {
  // TODO: guard with a run-once primitive; every call currently opens a new set
  exHandlesMgt = taosOpenRef(size, uvDestoryExHandle);
}
// Registers `p` in the ex-handle reference set and returns the id reported by
// taosAddRef().
int64_t uvAddExHandle(void* p) {
  int64_t refId = taosAddRef(exHandlesMgt, p);
  return refId;
}
// Removes the entry `refId` from the ex-handle reference set; the result of
// taosRemoveRef() is returned unchanged.
int32_t uvRemoveExHandle(int64_t refId) {
  int32_t code = taosRemoveRef(exHandlesMgt, refId);
  return code;
}

// Acquires the entry `refId` from the ex-handle reference set and returns
// whatever taosAcquireRef() yields, viewed as an SExHandle.
SExHandle* uvAcquireExHandle(int64_t refId) {
  SExHandle* exh = (SExHandle*)taosAcquireRef(exHandlesMgt, refId);
  return exh;
}

// Releases the entry `refId` of the ex-handle reference set; the result of
// taosReleaseRef() is returned unchanged.
int32_t uvReleaseExHandle(int64_t refId) {
  int32_t code = taosReleaseRef(exHandlesMgt, refId);
  return code;
}
// Callback handed to taosOpenRef(): frees an SExHandle. NULL is ignored.
void uvDestoryExHandle(void* handle) {
  if (handle != NULL) {
    taosMemoryFree(handle);
  }
}

dengyihao's avatar
dengyihao 已提交
845
/*
 * Handler for a quit message on a worker thread.
 * Marks the worker as quitting; if connections are still attached they are
 * destroyed via destroyAllConn(), otherwise the loop's handles are walked with
 * uvWalkCb right away. The message itself is freed here.
 */
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
  thrd->quit = true;

  if (!QUEUE_IS_EMPTY(&thrd->conn)) {
    destroyAllConn(thrd);
  } else {
    uv_walk(thrd->loop, uvWalkCb, NULL);
  }

  taosMemoryFree(msg);
}
/*
 * Handler for a release message on a worker thread.
 * For a connection in ConnAcquire state the message is pushed onto the
 * connection's queue and, when transQueuePush() reports true, sent with
 * uvStartSendRespInternal(). In any other state the message is destroyed;
 * ConnRelease/ConnNormal additionally log that the release is ignored.
 */
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* pConn = msg->pConn;

  if (pConn->status == ConnAcquire) {
    if (transQueuePush(&pConn->srvMsgs, msg)) {
      uvStartSendRespInternal(msg);
    }
    return;
  }

  if (pConn->status == ConnRelease || pConn->status == ConnNormal) {
    tDebug("server conn %p already released, ignore release-msg", pConn);
  }
  destroySmsg(msg);
}
dengyihao's avatar
dengyihao 已提交
867
/*
 * Handler for a normal response message on a worker thread: logs the second
 * step of the send and hands the message to uvStartSendResp().
 */
void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
  tDebug("server conn %p start to send resp (2/2)", msg->pConn);

  // deliver to the client
  uvStartSendResp(msg);
}
dengyihao's avatar
dengyihao 已提交
872 873
/*
 * Handler for a register message: stores the message to deliver when the
 * connection breaks ("brokenlink" callback).
 *
 * Only connections in ConnAcquire state accept a registration. If the
 * connection is already marked broken, the callback (pTransInst->cfp) is
 * invoked immediately with the stored message and the registration is cleared.
 *
 * The SSrvMsg wrapper is freed here on every path except when
 * transQueuePush() returns false, where the function returns right after the
 * push without freeing it.
 */
void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* conn = msg->pConn;
  tDebug("server conn %p register brokenlink callback", conn);

  if (conn->status != ConnAcquire) {
    // the message was previously dropped without being freed on this path
    taosMemoryFree(msg);
    return;
  }

  if (!transQueuePush(&conn->srvMsgs, msg)) {
    return;
  }
  transQueuePop(&conn->srvMsgs);
  conn->regArg.notifyCount = 0;
  conn->regArg.init = 1;
  conn->regArg.msg = msg->msg;
  tDebug("server conn %p register brokenlink callback succ", conn);

  if (conn->broken) {
    STrans* pTransInst = conn->pTransInst;
    (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
    memset(&conn->regArg, 0, sizeof(conn->regArg));
  }
  taosMemoryFree(msg);
}
dengyihao's avatar
dengyihao 已提交
893 894 895 896
/*
 * Wait for a worker thread to finish and release what it owns: joins the
 * thread, releases its uv loop via SRV_RELEASE_UV, destroys the async pool and
 * frees the loop and the worker object. NULL is ignored.
 */
void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd != NULL) {
    taosThreadJoin(pThrd->thread, NULL);
    SRV_RELEASE_UV(pThrd->loop);
    transDestroyAsyncPool(pThrd->asyncPool);
    taosMemoryFree(pThrd->loop);
    taosMemoryFree(pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
903
/*
 * Ask a worker thread to quit by posting a Quit message through its async
 * pool.
 */
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
  SSrvMsg* pQuit = taosMemoryCalloc(1, sizeof(SSrvMsg));
  pQuit->type = Quit;

  tDebug("server send quit msg to work thread");
  transSendAsync(pThrd->asyncPool, &pQuit->q);
}

U
ubuntu 已提交
910
void transCloseServer(void* arg) {
dengyihao's avatar
dengyihao 已提交
911 912
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
913 914 915

  tDebug("send quit msg to accept thread");
  uv_async_send(srv->pAcceptAsync);
wafwerar's avatar
wafwerar 已提交
916
  taosThreadJoin(srv->thread, NULL);
dengyihao's avatar
dengyihao 已提交
917

dengyihao's avatar
dengyihao 已提交
918
  SRV_RELEASE_UV(srv->loop);
dengyihao's avatar
dengyihao 已提交
919

dengyihao's avatar
dengyihao 已提交
920 921 922 923 924
  for (int i = 0; i < srv->numOfThreads; i++) {
    sendQuitToWorkThrd(srv->pThreadObj[i]);
    destroyWorkThrd(srv->pThreadObj[i]);
  }

wafwerar's avatar
wafwerar 已提交
925 926 927
  taosMemoryFree(srv->pThreadObj);
  taosMemoryFree(srv->pAcceptAsync);
  taosMemoryFree(srv->loop);
dengyihao's avatar
dengyihao 已提交
928 929

  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
930
    taosMemoryFree(srv->pipe[i]);
dengyihao's avatar
dengyihao 已提交
931
  }
wafwerar's avatar
wafwerar 已提交
932
  taosMemoryFree(srv->pipe);
dengyihao's avatar
dengyihao 已提交
933

wafwerar's avatar
wafwerar 已提交
934
  taosMemoryFree(srv);
dengyihao's avatar
dengyihao 已提交
935
}
dengyihao's avatar
dengyihao 已提交
936

dengyihao's avatar
dengyihao 已提交
937 938 939 940 941
// Takes one reference on the server connection `handle` and logs the resulting
// count. NULL is ignored.
void transRefSrvHandle(void* handle) {
  if (handle != NULL) {
    int count = T_REF_INC((SSrvConn*)handle);
    tDebug("server conn %p ref count: %d", handle, count);
  }
}

// Drops one reference on the server connection `handle`, logs the resulting
// count and starts destroying the connection once it reaches zero.
// NULL is ignored.
void transUnrefSrvHandle(void* handle) {
  if (handle != NULL) {
    int count = T_REF_DEC((SSrvConn*)handle);
    tDebug("server conn %p ref count: %d", handle, count);
    if (count == 0) {
      destroyConn((SSrvConn*)handle, true);
    }
  }
}
dengyihao's avatar
dengyihao 已提交
955 956

void transReleaseSrvHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
957 958 959 960
  SExHandle* exh = handle;
  // TODO(yihaoDeng): not safy here,
  int64_t refId = exh->refId;
  ASYNC_CHECK_HANDLE(exh, refId);
dengyihao's avatar
dengyihao 已提交
961

dengyihao's avatar
dengyihao 已提交
962 963 964 965
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);

  STransMsg tmsg = {.code = 0, .handle = exh, .ahandle = NULL, .refId = exh->refId};
dengyihao's avatar
dengyihao 已提交
966

wafwerar's avatar
wafwerar 已提交
967
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
968 969 970
  srvMsg->msg = tmsg;
  srvMsg->type = Release;

dengyihao's avatar
dengyihao 已提交
971
  tTrace("server conn %p start to release", exh->handle);
dengyihao's avatar
dengyihao 已提交
972
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
973 974 975 976
  uvReleaseExHandle(refId);
  return;
_return:
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
977
}
dengyihao's avatar
dengyihao 已提交
978 979 980 981 982 983 984
void transSendResponse(const STransMsg* msg) {
  SExHandle* exh = msg->handle;
  int64_t    refId = msg->refId;
  ASYNC_CHECK_HANDLE(exh, refId);

  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);
dengyihao's avatar
dengyihao 已提交
985

wafwerar's avatar
wafwerar 已提交
986
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
987
  srvMsg->msg = *msg;
dengyihao's avatar
dengyihao 已提交
988
  srvMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
989
  tTrace("server conn %p start to send resp (1/2)", exh->handle);
dengyihao's avatar
dengyihao 已提交
990
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
991 992 993 994
  uvReleaseExHandle(refId);
  return;
_return:
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
995
}
dengyihao's avatar
dengyihao 已提交
996
void transRegisterMsg(const STransMsg* msg) {
dengyihao's avatar
dengyihao 已提交
997 998 999 1000 1001 1002
  SExHandle* exh = NULL;
  int64_t    refId = msg->refId;
  ASYNC_CHECK_HANDLE(exh, refId);

  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);
dengyihao's avatar
dengyihao 已提交
1003

wafwerar's avatar
wafwerar 已提交
1004
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1005 1006
  srvMsg->msg = *msg;
  srvMsg->type = Register;
dengyihao's avatar
dengyihao 已提交
1007
  tTrace("server conn %p start to register brokenlink callback", exh->handle);
dengyihao's avatar
dengyihao 已提交
1008
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
1009 1010 1011 1012
  uvReleaseExHandle(refId);
  return;
_return:
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
1013
}
dengyihao's avatar
formate  
dengyihao 已提交
1014
/*
 * Fill `pInfo` with the peer information of the connection behind `thandle`
 * (an SExHandle).
 *
 * clientIp receives the raw sin_addr.s_addr value (no byte-order conversion is
 * applied), clientPort is converted with ntohs(), and the connection's user
 * name is copied into pInfo->user. Always returns 0.
 */
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
  SExHandle* exh = thandle;
  SSrvConn*  conn = exh->handle;

  const struct sockaddr_in* peer = &conn->addr;
  pInfo->clientIp = (uint32_t)(peer->sin_addr.s_addr);
  pInfo->clientPort = ntohs(peer->sin_port);

  tstrncpy(pInfo->user, conn->user, sizeof(pInfo->user));
  return 0;
}

dengyihao's avatar
dengyihao 已提交
1025
#endif