transSrv.c 34.8 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20 21 22
// Once-control object for this module.
// NOTE(review): presumably guards one-time module setup; the call site is not visible here.
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;

// One-byte payload the accept thread writes on a worker pipe together with the
// accepted client socket; the worker side asserts it receives exactly this byte.
static char* notify = "a";

// NOTE(review): presumably counts live server instances; not referenced in the visible code.
static int   transSrvInst = 0;
dengyihao's avatar
dengyihao 已提交
24

dengyihao's avatar
dengyihao 已提交
25 26 27 28 29 30
// Per-connection record of a "register" message. Once recorded, the stored
// message is handed to the application callback if the link is found broken.
typedef struct {
  int       notifyCount;  // reset to 0 whenever a Register message is recorded
  int       init;         // non-zero once `msg` holds a recorded Register message
  STransMsg msg;
} SSrvRegArg;

dengyihao's avatar
dengyihao 已提交
31
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
32
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
33 34 35
  uv_tcp_t*  pTcp;
  uv_write_t pWriter;
  uv_timer_t pTimer;
dengyihao's avatar
dengyihao 已提交
36 37

  queue       queue;
dengyihao's avatar
dengyihao 已提交
38
  int         persist;  // persist connection or not
dengyihao's avatar
dengyihao 已提交
39
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
40 41 42
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
43
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
44
  STransQueue srvMsgs;
dengyihao's avatar
dengyihao 已提交
45

dengyihao's avatar
dengyihao 已提交
46 47
  SSrvRegArg regArg;
  bool       broken;  // conn broken;
dengyihao's avatar
dengyihao 已提交
48

dengyihao's avatar
dengyihao 已提交
49
  ConnStatus         status;
dengyihao's avatar
dengyihao 已提交
50
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
51
  struct sockaddr_in locaddr;
U
ubuntu 已提交
52

dengyihao's avatar
dengyihao 已提交
53 54 55 56 57 58
  int64_t refId;
  int     spi;
  char    info[64];
  char    user[TSDB_UNI_LEN];  // user ID for the link
  char    secret[TSDB_PASSWORD_LEN];
  char    ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
59 60 61
} SSrvConn;

typedef struct SSrvMsg {
dengyihao's avatar
dengyihao 已提交
62 63 64 65
  SSrvConn*     pConn;
  STransMsg     msg;
  queue         q;
  STransMsgType type;
dengyihao's avatar
dengyihao 已提交
66
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
67 68

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
69 70 71 72 73 74
  TdThread      thread;
  uv_pipe_t*    pipe;
  uv_os_fd_t    fd;
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  queue         msg;
wafwerar's avatar
wafwerar 已提交
75
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
76 77 78 79

  queue conn;
  void* pTransInst;
  bool  quit;
dengyihao's avatar
dengyihao 已提交
80 81 82
} SWorkThrdObj;

typedef struct SServerObj {
dengyihao's avatar
dengyihao 已提交
83
  TdThread   thread;
dengyihao's avatar
dengyihao 已提交
84 85 86 87
  uv_tcp_t   server;
  uv_loop_t* loop;

  // work thread info
dengyihao's avatar
dengyihao 已提交
88 89 90
  int            workerIdx;
  int            numOfThreads;
  SWorkThrdObj** pThreadObj;
dengyihao's avatar
dengyihao 已提交
91 92 93 94 95

  uv_pipe_t** pipe;
  uint32_t    ip;
  uint32_t    port;
  uv_async_t* pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
96 97
} SServerObj;

dengyihao's avatar
dengyihao 已提交
98 99 100 101 102 103 104
// External handle: what the application receives in place of a raw connection
// pointer. It is looked up by reference id before the connection is touched.
typedef struct SExHandle {
  void*         handle;  // the connection (SSrvConn*) this handle stands for
  int64_t       refId;   // id used with uvAcquireExHandle/uvReleaseExHandle
  SWorkThrdObj* pThrd;   // NOTE(review): presumably the worker owning the connection -- confirm
} SExHandle;

dengyihao's avatar
dengyihao 已提交
105
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
106
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
107
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
108
static void uvOnTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
109
static void uvOnSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
110
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
111 112 113
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
114
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
115
static void uvShutDownCb(uv_shutdown_t* req, int status);
dengyihao's avatar
dengyihao 已提交
116 117 118 119 120 121 122

/*
 * time-consuming task throwed into BG work thread
 */
static void uvWorkDoTask(uv_work_t* req);
static void uvWorkAfterTask(uv_work_t* req, int status);

dengyihao's avatar
dengyihao 已提交
123 124
static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
125

dengyihao's avatar
dengyihao 已提交
126
static void uvStartSendRespInternal(SSrvMsg* smsg);
dengyihao's avatar
dengyihao 已提交
127 128
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
129

dengyihao's avatar
dengyihao 已提交
130 131
static void uvNotifyLinkBrokenToApp(SSrvConn* conn);

dengyihao's avatar
dengyihao 已提交
132
static void destroySmsg(SSrvMsg* smsg);
133
// check whether already read complete packet
dengyihao's avatar
fix bug  
dengyihao 已提交
134
static SSrvConn* createConn(void* hThrd);
dengyihao's avatar
dengyihao 已提交
135
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
136

dengyihao's avatar
dengyihao 已提交
137 138
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
139
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
140
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
141 142
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
                                                                       uvHandleRegister};
dengyihao's avatar
dengyihao 已提交
143

dengyihao's avatar
dengyihao 已提交
144
static int32_t exHandlesMgt;
dengyihao's avatar
dengyihao 已提交
145

dengyihao's avatar
dengyihao 已提交
146
void       uvInitExHandleMgt();
dengyihao's avatar
dengyihao 已提交
147
void       uvOpenExHandleMgt(int size);
dengyihao's avatar
dengyihao 已提交
148
void       uvCloseExHandleMgt();
dengyihao's avatar
dengyihao 已提交
149 150 151 152 153 154
int64_t    uvAddExHandle(void* p);
int32_t    uvRemoveExHandle(int64_t refId);
int32_t    uvReleaseExHandle(int64_t refId);
void       uvDestoryExHandle(void* handle);
SExHandle* uvAcquireExHandle(int64_t refId);

155
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
156

dengyihao's avatar
dengyihao 已提交
157
// server and worker thread
158 159
static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg);
dengyihao's avatar
dengyihao 已提交
160

dengyihao's avatar
dengyihao 已提交
161 162 163 164
// add handle loop
static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
/*
 * Handle a bare "release" request: a header with the release flag set and no
 * body (msgLen equals the header size). The connection is switched to
 * ConnRelease, the read buffer is cleared, the request is freed and an empty
 * Release response is queued and sent.
 *
 * NOTE: this macro executes `return;` in the enclosing function once the
 * release request has been consumed, so it may only be used inside functions
 * returning void. If the response cannot be allocated it logs and returns
 * without queuing anything.
 */
#define CONN_SHOULD_RELEASE(conn, head)                                       \
  do {                                                                        \
    if ((head)->release == 1 && (head)->msgLen == sizeof(*(head))) {          \
      (conn)->status = ConnRelease;                                           \
      transClearBuffer(&(conn)->readBuf);                                     \
      transFreeMsg(transContFromHead((char*)(head)));                         \
      tTrace("server conn %p received release request", (conn));              \
                                                                              \
      SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));                 \
      if (srvMsg == NULL) {                                                   \
        tError("server conn %p failed to alloc release resp", (conn));        \
        return;                                                               \
      }                                                                       \
      STransMsg tmsg = {.code = 0, .handle = (void*)(conn), .ahandle = NULL}; \
      srvMsg->msg = tmsg;                                                     \
      srvMsg->type = Release;                                                 \
      srvMsg->pConn = (conn);                                                 \
      if (!transQueuePush(&(conn)->srvMsgs, srvMsg)) {                        \
        return;                                                               \
      }                                                                       \
      uvStartSendRespInternal(srvMsg);                                        \
      return;                                                                 \
    }                                                                         \
  } while (0)

/*
 * Tear down a libuv loop: request close on every handle that is not already
 * closing (uvWalkCb), run the loop so the close requests are processed, then
 * close the loop itself.
 *
 * NOTE(review): the expansion ends in `while (0);` -- the trailing semicolon
 * makes `if (c) SRV_RELEASE_UV(l); else ...` a syntax error. Drop it once all
 * call sites are confirmed to supply their own semicolon.
 */
#define SRV_RELEASE_UV(loop)       \
  do {                             \
    uv_walk(loop, uvWalkCb, NULL); \
    uv_run(loop, UV_RUN_DEFAULT);  \
    uv_loop_close(loop);           \
  } while (0);

dengyihao's avatar
dengyihao 已提交
193 194 195 196
/*
 * Bail out of the enclosing function when the worker thread has already quit.
 * Requires a `_return1` label in the enclosing function.
 */
#define ASYNC_ERR_JRET(thrd)                            \
  do {                                                  \
    if ((thrd)->quit) {                                 \
      tTrace("worker thread already quit, ignore msg"); \
      goto _return1;                                    \
    }                                                   \
  } while (0)

dengyihao's avatar
dengyihao 已提交
201 202 203 204 205
/*
 * Validate an external handle before a message is queued for it.
 *
 *   refId > 0   acquire the handle; jump to `_return1` if it is gone or its
 *               stored id differs.
 *   refId == 0  same check against id 0; on success `refId` is overwritten
 *               with exh1->refId, so `refId` must be a modifiable lvalue.
 *   refId == -1 jump to `_return2`.
 *
 * Requires `_return1` and `_return2` labels in the enclosing function.
 * Ref ids are signed 64-bit values and are therefore logged with PRId64.
 * NOTE(review): the reference taken by uvAcquireExHandle is not released
 * inside this macro -- confirm the enclosing function releases it.
 */
#define ASYNC_CHECK_HANDLE(exh1, refId)                                                                     \
  do {                                                                                                      \
    if ((refId) > 0) {                                                                                      \
      tTrace("server handle step1");                                                                        \
      SExHandle* exh2 = uvAcquireExHandle(refId);                                                           \
      if (exh2 == NULL || (refId) != exh2->refId) {                                                         \
        tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRId64 ", ref2 : %" PRId64 \
               "",                                                                                          \
               (exh1), (int64_t)(exh1)->refId, (int64_t)(refId));                                           \
        goto _return1;                                                                                      \
      }                                                                                                     \
    } else if ((refId) == 0) {                                                                              \
      tTrace("server handle step2");                                                                        \
      SExHandle* exh2 = uvAcquireExHandle(refId);                                                           \
      if (exh2 == NULL || (refId) != exh2->refId) {                                                         \
        tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRId64 ", ref2 : %" PRId64 \
               "",                                                                                          \
               (exh1), (int64_t)(refId), (int64_t)(exh2 ? exh2->refId : 0));                                \
        goto _return1;                                                                                      \
      } else {                                                                                              \
        (refId) = (exh1)->refId;                                                                            \
      }                                                                                                     \
    } else if ((refId) == -1) {                                                                             \
      tTrace("server handle step3");                                                                        \
      goto _return2;                                                                                        \
    }                                                                                                       \
  } while (0)

dengyihao's avatar
dengyihao 已提交
227
/*
 * libuv alloc callback for client sockets: instead of allocating a fresh
 * buffer it lets transAllocBuffer fill `buf` from the connection's own read
 * buffer. `suggested_size` is ignored.
 */
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SSrvConn* pConn = handle->data;
  transAllocBuffer(&pConn->readBuf, buf);
}

// Timer callback for a query/insert that saw no activity in time; it only
// logs the connection stored in the timer's data pointer.
static void uvHandleActivityTimeout(uv_timer_t* handle) {
  SSrvConn* pConn = handle->data;

  tDebug("%p timeout since no activity", pConn);
}

dengyihao's avatar
dengyihao 已提交
239 240
/*
 * Process one complete request sitting in the connection's read buffer.
 *
 * Converts the header to host byte order, records the sender's user name,
 * answers bare release requests directly (CONN_SHOULD_RELEASE returns from
 * this function in that case) and otherwise wraps the packet in an STransMsg
 * and passes it to the application callback of the rpc instance.
 *
 * Reference handling: a persistent request on a ConnNormal connection moves it
 * to ConnAcquire and takes one reference; a request that expects a response on
 * a connection that is still ConnNormal takes one reference as well.
 */
static void uvHandleReq(SSrvConn* pConn) {
  SConnBuffer*   pBuf = &pConn->readBuf;
  STransMsgHead* pHead = (STransMsgHead*)pBuf->buf;

  // Header integers arrive in network byte order.
  pHead->code = ntohl(pHead->code);
  pHead->msgLen = ntohl(pHead->msgLen);

  // The user name comes straight off the wire: never trust it to be
  // NUL-terminated, bound the copy by the destination and always terminate.
  // NOTE(review): assumes pHead->user is at least sizeof(pConn->user) - 1 bytes long -- confirm.
  const size_t userCap = sizeof(pConn->user) - 1;
  const char*  userEnd = memchr(pHead->user, '\0', userCap);
  size_t       userLen = (userEnd != NULL) ? (size_t)(userEnd - pHead->user) : userCap;
  memcpy(pConn->user, pHead->user, userLen);
  pConn->user[userLen] = '\0';

  // TODO(dengyihao): time-consuming task throwed into BG Thread
  //  uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
  //  wreq->data = pConn;
  //  uv_read_stop((uv_stream_t*)pConn->pTcp);
  //  transRefSrvHandle(pConn);
  //  uv_queue_work(((SWorkThrdObj*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);

  CONN_SHOULD_RELEASE(pConn, pHead);

  // Zero-initialise so the application never sees indeterminate fields.
  STransMsg transMsg = {0};
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = pHead->content;
  transMsg.msgType = pHead->msgType;
  transMsg.code = pHead->code;
  transMsg.ahandle = (void*)pHead->ahandle;
  transMsg.handle = NULL;

  // transDestroyBuffer(&pConn->readBuf);
  transClearBuffer(&pConn->readBuf);
  pConn->inType = pHead->msgType;
  if (pConn->status == ConnNormal) {
    if (pHead->persist == 1) {
      pConn->status = ConnAcquire;
      transRefSrvHandle(pConn);
      tDebug("server conn %p acquired by server app", pConn);
    }
  }
  if (pConn->status == ConnNormal && pHead->noResp == 0) {
    // Held until the response for this request is handed back.
    transRefSrvHandle(pConn);
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
           taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
           ntohs(pConn->locaddr.sin_port), transMsg.contLen);
  } else {
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
           TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
           taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
    // no ref here
  }

  // if pHead->noResp = 1,
  // 1. server application should not send resp on handle
  // 2. once send out data, cli conn released to conn pool immediately
  // 3. not mixed with persist

  transMsg.handle = (void*)uvAcquireExHandle(pConn->refId);
  tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.handle, pConn, pConn->refId);
  transMsg.refId = pConn->refId;
  assert(transMsg.handle != NULL);
  if (pHead->noResp == 1) {
    // refId -1 marks a message the application must not answer.
    transMsg.refId = -1;
  }
  uvReleaseExHandle(pConn->refId);

  STrans* pTransInst = pConn->pTransInst;
  (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
  // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
}

dengyihao's avatar
dengyihao 已提交
307
/*
 * libuv read callback for client sockets.
 *
 *   nread > 0  account for the new bytes and, once a whole packet is
 *              buffered, hand it to uvHandleReq.
 *   nread == 0 nothing to do.
 *   nread < 0  the link failed: mark the conn broken, deliver a recorded
 *              Register message to the application (only while the conn is
 *              in ConnAcquire) and drop one reference.
 */
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
  SSrvConn*    pConn = cli->data;
  SConnBuffer* pReadBuf = &pConn->readBuf;

  if (nread == 0) {
    return;
  }

  if (nread > 0) {
    pReadBuf->len += nread;
    tTrace("server conn %p read summary, total read: %d, current read: %d", pConn, pReadBuf->len, (int)nread);
    if (!transReadComplete(pReadBuf)) {
      tTrace("server %p read partial packet, continue to read", pConn);
      return;
    }
    tTrace("server conn %p alread read complete packet", pConn);
    uvHandleReq(pConn);
    return;
  }

  // Only read errors (nread < 0) reach this point.
  tError("server conn %p read error: %s", pConn, uv_err_name(nread));
  pConn->broken = true;
  if (pConn->status == ConnAcquire && pConn->regArg.init) {
    tTrace("server conn %p broken, notify server app", pConn);
    STrans* pInst = pConn->pTransInst;
    (*pInst->cfp)(pInst->parent, &(pConn->regArg.msg), NULL);
    memset(&pConn->regArg, 0, sizeof(pConn->regArg));
  }
  transUnrefSrvHandle(pConn);
}
/*
 * libuv alloc callback for the worker-side pipe: provides a small zeroed
 * 2-byte heap buffer for the notification payload. The read callback is
 * responsible for freeing `buf->base`. `handle` and `suggested_size` are
 * unused.
 */
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  const size_t cap = 2;

  buf->base = taosMemoryCalloc(1, sizeof(char) * cap);
  buf->len = cap;
}

// Timer callback: logs that the connection stored in the timer's data pointer timed out.
void uvOnTimeoutCb(uv_timer_t* handle) {
  SSrvConn* conn = handle->data;

  tError("server conn %p time out", conn);
}

dengyihao's avatar
dengyihao 已提交
351
/*
 * libuv write callback for responses.
 *
 * On failure the conn is marked broken and one reference is dropped.
 * On success the message that was just sent is popped and destroyed (a sent
 * Release moves the conn back to ConnNormal and drops one reference), then
 * the next queued message, if any, is started. A queued Register message is
 * not sent: it is recorded in regArg (and delivered to the application right
 * away if the link is already broken), removed from the queue, and the
 * message after it is started instead.
 */
void uvOnSendCb(uv_write_t* req, int status) {
  SSrvConn* conn = req->data;
  // transClearBuffer(&conn->readBuf);

  if (status != 0) {
    tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
    conn->broken = true;
    transUnrefSrvHandle(conn);
    return;
  }

  tTrace("server conn %p data already was written on stream", conn);
  if (transQueueEmpty(&conn->srvMsgs)) {
    return;
  }

  SSrvMsg* sent = transQueuePop(&conn->srvMsgs);
  if (sent->type == Release && conn->status != ConnNormal) {
    conn->status = ConnNormal;
    transUnrefSrvHandle(conn);
  }
  destroySmsg(sent);

  // send second data, just use for push
  if (transQueueEmpty(&conn->srvMsgs)) {
    return;
  }

  SSrvMsg* next = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
  if (!(next->type == Register && conn->status == ConnAcquire)) {
    uvStartSendRespInternal(next);
    return;
  }

  conn->regArg.notifyCount = 0;
  conn->regArg.init = 1;
  conn->regArg.msg = next->msg;
  if (conn->broken) {
    STrans* pInst = conn->pTransInst;
    (pInst->cfp)(pInst->parent, &(conn->regArg.msg), NULL);
    memset(&conn->regArg, 0, sizeof(conn->regArg));
  }
  // Only the wrapper is freed: its payload was copied into regArg above.
  transQueuePop(&conn->srvMsgs);
  taosMemoryFree(next);

  next = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
  if (next != NULL) {
    uvStartSendRespInternal(next);
  }
}
dengyihao's avatar
dengyihao 已提交
393 394
/*
 * Completion callback for handing an accepted socket to a worker over its
 * pipe. Whatever the outcome, the accept-side copy of the client handle
 * (stored in req->data) is closed and freed via uvFreeCb, and the write
 * request itself is freed.
 */
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status != 0) {
    tError("fail to dispatch conn to work thread");
  } else {
    tTrace("success to dispatch conn to work thread");
  }

  uv_handle_t* cli = (uv_handle_t*)req->data;
  uv_close(cli, uvFreeCb);
  // taosMemoryFree(req->data);
  taosMemoryFree(req);
}
dengyihao's avatar
dengyihao 已提交
403

dengyihao's avatar
dengyihao 已提交
404
/*
 * Fill in the wire header in front of a response payload and describe the
 * resulting packet in `wb`.
 *
 * A response without payload gets an empty one allocated via rpcMallocCont.
 * On a ConnNormal connection the response type is the request type + 1;
 * otherwise it is the message's own type, or 0 for a Release. `code` and
 * `msgLen` are written in network byte order.
 */
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
  SSrvConn*  conn = smsg->pConn;
  STransMsg* resp = &smsg->msg;
  const bool isRelease = (smsg->type == Release);

  tTrace("server conn %p prepare to send resp", conn);

  if (resp->pCont == 0) {
    resp->pCont = (void*)rpcMallocCont(0);
    resp->contLen = 0;
  }

  STransMsgHead* head = transHeadFromCont(resp->pCont);
  head->ahandle = (uint64_t)resp->ahandle;
  if (conn->status != ConnNormal) {
    head->msgType = isRelease ? 0 : resp->msgType;
  } else {
    head->msgType = conn->inType + 1;
  }
  head->release = isRelease ? 1 : 0;
  head->code = htonl(resp->code);

  int32_t total = transMsgLenFromCont(resp->contLen);
  tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", conn, TMSG_INFO(head->msgType),
         taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr),
         ntohs(conn->locaddr.sin_port));
  head->msgLen = htonl(total);

  wb->base = (char*)head;
  wb->len = total;
}
dengyihao's avatar
dengyihao 已提交
434 435

/*
 * Build the packet for `smsg` and start writing it on its connection.
 * Completion is reported to uvOnSendCb.
 *
 * A write that cannot even be started is logged: in that case libuv never
 * invokes uvOnSendCb, so without the log the failure would be invisible.
 */
static void uvStartSendRespInternal(SSrvMsg* smsg) {
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
  // uv_timer_stop(&pConn->pTimer);
  int ret = uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
  if (ret != 0) {
    tError("server conn %p failed to start write, %s", pConn, uv_err_name(ret));
  }
}
/*
 * Queue a response coming from the application and start sending it.
 *
 * If the connection is already broken the response can never be delivered:
 * it is destroyed here (nothing else owns it at this point) and one
 * reference on the connection is dropped. Otherwise one reference is dropped
 * when the conn is ConnNormal, and the message is pushed onto the send
 * queue; the write is started only when transQueuePush reports success.
 */
static void uvStartSendResp(SSrvMsg* smsg) {
  SSrvConn* pConn = smsg->pConn;

  if (pConn->broken == true) {
    // persist by
    destroySmsg(smsg);
    transUnrefSrvHandle(pConn);
    return;
  }
  if (pConn->status == ConnNormal) {
    transUnrefSrvHandle(pConn);
  }

  if (!transQueuePush(&pConn->srvMsgs, smsg)) {
    return;
  }
  uvStartSendRespInternal(smsg);
}
dengyihao's avatar
dengyihao 已提交
462

dengyihao's avatar
dengyihao 已提交
463
// Free a server message together with its payload; NULL is accepted and ignored.
static void destroySmsg(SSrvMsg* smsg) {
  if (smsg != NULL) {
    transFreeMsg(smsg->msg.pCont);
    taosMemoryFree(smsg);
  }
}
dengyihao's avatar
fix bug  
dengyihao 已提交
470
/*
 * Detach every connection from the worker's list and release all references
 * held on it: references are dropped while the count is at least 2, then one
 * more time.
 * NOTE(review): presumably the last unref destroys the connection -- confirm
 * in transUnrefSrvHandle.
 */
static void destroyAllConn(SWorkThrdObj* pThrd) {
  tTrace("thread %p destroy all conn ", pThrd);

  while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
    queue* node = QUEUE_HEAD(&pThrd->conn);
    QUEUE_REMOVE(node);
    QUEUE_INIT(node);

    SSrvConn* conn = QUEUE_DATA(node, SSrvConn, queue);
    while (T_REF_VAL_GET(conn) >= 2) {
      transUnrefSrvHandle(conn);
    }
    // The count must not be read again after this final release.
    transUnrefSrvHandle(conn);
  }
}
dengyihao's avatar
dengyihao 已提交
484
/*
 * Async wake-up on a worker loop: drain the messages other threads queued
 * for this worker and dispatch each one through transAsyncHandle[].
 *
 * Quit messages are dispatched unconditionally. For every other message the
 * external handle is re-acquired by its ref id first; if it is gone or does
 * not match the handle stored in the message, the message is destroyed
 * instead of being dispatched.
 */
void uvWorkerAsyncCb(uv_async_t* handle) {
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
  queue         pending;

  // Take the whole batch under the lock, process it outside.
  taosThreadMutexLock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &pending);
  taosThreadMutexUnlock(&item->mtx);

  while (!QUEUE_IS_EMPTY(&pending)) {
    queue* node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);

    SSrvMsg* msg = QUEUE_DATA(node, SSrvMsg, q);
    if (msg == NULL) {
      tError("unexcept occurred, continue");
      continue;
    }

    if (msg->type == Quit) {
      (*transAsyncHandle[msg->type])(msg, pThrd);
      continue;
    }

    SExHandle* expected = msg->msg.handle;
    int64_t    refId = msg->msg.refId;
    SExHandle* acquired = uvAcquireExHandle(refId);
    if (acquired == NULL || expected != acquired) {
      tTrace("server handle %p except msg, ignore it", expected);
      uvReleaseExHandle(refId);
      destroySmsg(msg);
      continue;
    }

    // The handle is valid: resolve the connection it stands for.
    msg->pConn = expected->handle;
    uvReleaseExHandle(refId);
    (*transAsyncHandle[msg->type])(msg, pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
526 527 528 529 530
// uv_walk visitor: request close (without a close callback) on every handle
// that is not already closing. `arg` is unused.
static void uvWalkCb(uv_handle_t* handle, void* arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}
dengyihao's avatar
dengyihao 已提交
531 532 533 534
// Close callback for heap-allocated handles: releases the handle's memory.
static void uvFreeCb(uv_handle_t* handle) { taosMemoryFree(handle); }
dengyihao's avatar
dengyihao 已提交
535

dengyihao's avatar
dengyihao 已提交
536 537
// Async wake-up on the accept loop: requests close on every handle of the
// server's loop that is not already closing (see uvWalkCb).
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* pServer = async->data;

  tDebug("close server port %d", pServer->port);
  uv_walk(pServer->loop, uvWalkCb, NULL);
}
dengyihao's avatar
dengyihao 已提交
541

dengyihao's avatar
dengyihao 已提交
542
// Shutdown completion: log a failure, then close the stream regardless
// (uvDestroyConn runs as its close callback) and free the request.
static void uvShutDownCb(uv_shutdown_t* req, int status) {
  if (status != 0) {
    tDebug("conn failed to shut down: %s", uv_err_name(status));
  }

  uv_handle_t* stream = (uv_handle_t*)req->handle;
  uv_close(stream, uvDestroyConn);
  taosMemoryFree(req);
}

dengyihao's avatar
dengyihao 已提交
550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567
// Work callback for the background-task hook. Placeholder: it only traces
// the connection stored in req->data (meant for time-consuming work such as
// authenticating a connection).
static void uvWorkDoTask(uv_work_t* req) {
  void* conn = req->data;

  tTrace("server conn %p start to be processed in BG Thread", conn);
}

// After-work callback paired with uvWorkDoTask: traces the outcome and frees
// the work request. Note that the "already processed" trace is emitted even
// when `status` reports a failure.
static void uvWorkAfterTask(uv_work_t* req, int status) {
  void* conn = req->data;

  if (status != 0) {
    tTrace("server conn %p failed to processed ", conn);
  }
  tTrace("server conn %p already processed ", conn);
  taosMemoryFree(req);
}

dengyihao's avatar
dengyihao 已提交
568 569 570 571 572 573
/*
 * Connection callback of the listening socket (accept thread).
 *
 * Accepts the pending client into a heap-allocated TCP handle and passes it
 * to the next worker (round-robin over numOfThreads) by writing the `notify`
 * byte on that worker's pipe with the client handle attached.
 *
 * Ownership: on success uvOnPipeWriteCb closes/frees the client handle and
 * frees the write request. On every failure path in this function the
 * client handle is closed with uvFreeCb so its memory is released, and the
 * write request is freed here because libuv does not invoke the write
 * callback for a request it refused to queue.
 */
void uvOnAcceptCb(uv_stream_t* stream, int status) {
  // libuv reports failures as arbitrary negative error codes, not just -1.
  if (status < 0) {
    tError("failed to accept new connection: %s", uv_err_name(status));
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

  uv_tcp_t* cli = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
  if (cli == NULL) {
    tError("failed to alloc handle for new connection");
    return;
  }
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) != 0) {
    uv_close((uv_handle_t*)cli, uvFreeCb);
    return;
  }

  uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
  if (wr == NULL) {
    tError("failed to alloc write req for new connection");
    uv_close((uv_handle_t*)cli, uvFreeCb);
    return;
  }
  wr->data = cli;
  uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

  pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;

  tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);

  int ret = uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
  if (ret != 0) {
    tError("fail to dispatch conn to work thread, %s", uv_err_name(ret));
    uv_close((uv_handle_t*)cli, uvFreeCb);
    taosMemoryFree(wr);
  }
}
// Read callback on a worker's pipe. The accept loop announces each new client by
// writing the `notify` token together with the accepted TCP handle (see uvOnAcceptCb);
// this callback picks the handle up, wraps it in an SSrvConn and starts reading from it.
//   q     - the worker pipe; q->data is the owning SWorkThrdObj
//   nread - bytes read, 0 when nothing is available, negative on error/EOF
//   buf   - buffer handed out by the alloc callback; always released here
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
  tTrace("server connection coming");
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name((int)nread));
    }
    // TODO(log other failure reason)
    tError("failed to create connect: %p", q);
    taosMemoryFree(buf->base);
    uv_close((uv_handle_t*)q, NULL);
    return;
  }
  if (nread == 0) {
    // libuv uses nread == 0 for "nothing to read right now"; it is neither an error
    // nor a notification, so only the buffer needs to be released.
    taosMemoryFree(buf->base);
    return;
  }
  // the payload must be exactly the notify token
  assert((size_t)nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
  taosMemoryFree(buf->base);

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

  SSrvConn* pConn = createConn(pThrd);
  if (pConn == NULL) {
    // defensive: nothing can be set up without a connection object
    tError("failed to create server conn");
    return;
  }

  pConn->pTransInst = pThrd->pTransInst;
  /* init conn timer*/
  // uv_timer_init(pThrd->loop, &pConn->pTimer);
  // pConn->pTimer.data = pConn;

  pConn->hostThrd = pThrd;

  // init client handle
  // NOTE(review): the allocation below is not checked; a NULL here cannot be unwound
  // through transUnrefSrvHandle because destroyConn uses conn->pTcp.
  pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

  pConn->pWriter.data = pConn;

  transSetConnOption((uv_tcp_t*)pConn->pTcp);

  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
    tTrace("server conn %p created, fd: %d", pConn, fd);

    // remember the peer address
    int addrlen = sizeof(pConn->addr);
    if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
      tError("server conn %p failed to get peer info", pConn);
      transUnrefSrvHandle(pConn);
      return;
    }

    // remember the local address
    addrlen = sizeof(pConn->locaddr);
    if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
      tError("server conn %p failed to get local info", pConn);
      transUnrefSrvHandle(pConn);
      return;
    }

    uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);

  } else {
    tDebug("failed to create new connection");
    transUnrefSrvHandle(pConn);
  }
}

665
void* transAcceptThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
666
  // opt
dengyihao's avatar
dengyihao 已提交
667
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
668 669
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
670 671

  return NULL;
dengyihao's avatar
dengyihao 已提交
672
}
dengyihao's avatar
dengyihao 已提交
673 674
static bool addHandleToWorkloop(void* arg) {
  SWorkThrdObj* pThrd = arg;
wafwerar's avatar
wafwerar 已提交
675
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
676 677 678
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
679

dengyihao's avatar
dengyihao 已提交
680
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
dengyihao's avatar
dengyihao 已提交
681 682 683 684
  uv_pipe_open(pThrd->pipe, pThrd->fd);

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
685
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
686
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
687

dengyihao's avatar
fix bug  
dengyihao 已提交
688 689 690
  // conn set
  QUEUE_INIT(&pThrd->conn);

dengyihao's avatar
dengyihao 已提交
691
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
dengyihao's avatar
dengyihao 已提交
692
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
693 694 695 696 697 698 699 700 701 702 703 704 705
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
706
  // register an async here to quit server gracefully
wafwerar's avatar
wafwerar 已提交
707
  srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t));
dengyihao's avatar
dengyihao 已提交
708 709
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
710

dengyihao's avatar
dengyihao 已提交
711
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
712 713 714 715 716
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
717
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
718 719 720 721
    tError("failed to listen: %s", uv_err_name(err));
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
722
}
723
void* transWorkerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
724
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
725
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
726
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
727 728

  return NULL;
dengyihao's avatar
dengyihao 已提交
729 730
}

dengyihao's avatar
fix bug  
dengyihao 已提交
731 732 733
static SSrvConn* createConn(void* hThrd) {
  SWorkThrdObj* pThrd = hThrd;

wafwerar's avatar
wafwerar 已提交
734
  SSrvConn* pConn = (SSrvConn*)taosMemoryCalloc(1, sizeof(SSrvConn));
dengyihao's avatar
fix bug  
dengyihao 已提交
735 736 737
  QUEUE_INIT(&pConn->queue);

  QUEUE_PUSH(&pThrd->conn, &pConn->queue);
dengyihao's avatar
dengyihao 已提交
738 739

  transQueueInit(&pConn->srvMsgs, NULL);
dengyihao's avatar
dengyihao 已提交
740

dengyihao's avatar
dengyihao 已提交
741
  memset(&pConn->regArg, 0, sizeof(pConn->regArg));
dengyihao's avatar
dengyihao 已提交
742
  pConn->broken = false;
dengyihao's avatar
dengyihao 已提交
743
  pConn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
744

dengyihao's avatar
dengyihao 已提交
745 746 747 748 749 750 751
  SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
  exh->handle = pConn;
  exh->pThrd = pThrd;
  exh->refId = uvAddExHandle(exh);
  uvAcquireExHandle(exh->refId);

  pConn->refId = exh->refId;
dengyihao's avatar
dengyihao 已提交
752
  transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
753
  tTrace("server handle %p, conn %p created, refId: %" PRId64 "", exh, pConn, pConn->refId);
dengyihao's avatar
dengyihao 已提交
754 755
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
756

dengyihao's avatar
dengyihao 已提交
757
// Starts tearing down a connection.
//   conn  - connection to destroy; NULL is ignored
//   clear - when false only the read buffer is released; when true the TCP handle
//           is shut down as well, and the remaining cleanup happens in the
//           shutdown/close callbacks
static void destroyConn(SSrvConn* conn, bool clear) {
  if (conn == NULL) {
    return;
  }

  transDestroyBuffer(&conn->readBuf);
  if (!clear) {
    return;
  }

  tTrace("server conn %p to be destroyed", conn);
  uv_shutdown_t* req = taosMemoryMalloc(sizeof(uv_shutdown_t));
  if (req != NULL && uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb) == 0) {
    // the shutdown callback takes over from here
    return;
  }

  // uv_shutdown was not started (allocation failure, or the stream is not connected,
  // e.g. after a failed uv_accept), so its callback will never fire. Release the
  // request and close the handle directly so the connection is still cleaned up.
  if (req != NULL) {
    taosMemoryFree(req);
  }
  if (!uv_is_closing((uv_handle_t*)conn->pTcp)) {
    uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
  }
}
// Close callback of a connection's TCP handle: final release of the connection.
// Drops the external handle, destroys the pending-message queue, unlinks the
// connection from its worker and frees both the TCP handle and the connection.
// If the worker has been asked to quit and this was its last connection, the
// worker's remaining handles are walked with uvWalkCb.
//   handle - the closed handle; handle->data is the SSrvConn (NULL is ignored)
static void uvDestroyConn(uv_handle_t* handle) {
  SSrvConn* pConn = handle->data;
  if (pConn != NULL) {
    // keep the worker pointer: it is needed after the connection has been freed
    SWorkThrdObj* pWorker = pConn->hostThrd;

    uvReleaseExHandle(pConn->refId);
    uvRemoveExHandle(pConn->refId);

    tDebug("server conn %p destroy", pConn);
    // uv_timer_stop(&conn->pTimer);
    transQueueDestroy(&pConn->srvMsgs);
    QUEUE_REMOVE(&pConn->queue);
    taosMemoryFree(pConn->pTcp);
    taosMemoryFree(pConn);

    if (pWorker->quit && QUEUE_IS_EMPTY(&pWorker->conn)) {
      tTrace("work thread quit");
      uv_walk(pWorker->loop, uvWalkCb, NULL);
    }
  }
}

U
ubuntu 已提交
792
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
793 794
  SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
795 796
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
wafwerar's avatar
wafwerar 已提交
797 798
  srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
dengyihao's avatar
dengyihao 已提交
799 800 801 802
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

dengyihao's avatar
dengyihao 已提交
803
  taosThreadOnce(&transModuleInit, uvInitExHandleMgt);
dengyihao's avatar
dengyihao 已提交
804
  transSrvInst++;
dengyihao's avatar
dengyihao 已提交
805
  // uvOpenExHandleMgt(10000);
dengyihao's avatar
dengyihao 已提交
806

dengyihao's avatar
dengyihao 已提交
807
  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
808
    SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
809
    thrd->quit = false;
dengyihao's avatar
dengyihao 已提交
810
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
811

wafwerar's avatar
wafwerar 已提交
812
    srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
dengyihao's avatar
dengyihao 已提交
813 814
    int fds[2];
    if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
dengyihao's avatar
dengyihao 已提交
815
      goto End;
dengyihao's avatar
dengyihao 已提交
816 817 818 819
    }
    uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write

dengyihao's avatar
dengyihao 已提交
820
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
821 822
    thrd->fd = fds[0];
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
823

dengyihao's avatar
dengyihao 已提交
824 825 826
    if (false == addHandleToWorkloop(thrd)) {
      goto End;
    }
827
    int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
dengyihao's avatar
dengyihao 已提交
828 829 830 831 832 833 834 835
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
    }
  }
dengyihao's avatar
dengyihao 已提交
836 837 838
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
839
  int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
dengyihao's avatar
dengyihao 已提交
840 841 842 843 844 845 846
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
    // clear all resource later
  }

  return srv;
dengyihao's avatar
dengyihao 已提交
847
End:
U
ubuntu 已提交
848
  transCloseServer(srv);
dengyihao's avatar
dengyihao 已提交
849 850
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
851 852 853 854 855

// One-time initialiser of the external-handle registry; transInitServer runs it
// through taosThreadOnce. Opens the registry with a size of 10000.
void uvInitExHandleMgt() {
  // init exhandle mgt
  uvOpenExHandleMgt(10000);
}
dengyihao's avatar
dengyihao 已提交
856 857 858 859
// Opens the reference set used for external handles and stores the result of
// taosOpenRef in exHandlesMgt.
//   size - passed straight to taosOpenRef
// uvDestoryExHandle is handed to taosOpenRef, presumably as the per-entry
// destructor — confirm against the taosOpenRef contract.
void uvOpenExHandleMgt(int size) {
  // added into once later
  exHandlesMgt = taosOpenRef(size, uvDestoryExHandle);
}
dengyihao's avatar
dengyihao 已提交
860 861 862 863
// Closes the external-handle reference set identified by exHandlesMgt.
// transCloseServer calls this when the last server instance is closed.
void uvCloseExHandleMgt() {
  // close ref
  taosCloseRef(exHandlesMgt);
}
dengyihao's avatar
dengyihao 已提交
864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888
// Registers p in the external-handle reference set.
//   p - object to register (createConn passes an SExHandle)
// Returns whatever taosAddRef returns; createConn stores it as the handle's refId.
int64_t uvAddExHandle(void* p) {
  // add extern handle
  return taosAddRef(exHandlesMgt, p);
}
// Removes the entry identified by refId from the external-handle reference set.
// Returns the result of taosRemoveRef.
int32_t uvRemoveExHandle(int64_t refId) {
  // remove extern handle
  return taosRemoveRef(exHandlesMgt, refId);
}

// Acquires the external handle identified by refId from the reference set.
// Returns the result of taosAcquireRef cast to SExHandle*.
// NOTE(review): presumably NULL when refId is no longer registered, and each
// successful acquire is expected to be paired with uvReleaseExHandle — confirm
// against the taosAcquireRef contract.
SExHandle* uvAcquireExHandle(int64_t refId) {
  // acquire extern handle
  return (SExHandle*)taosAcquireRef(exHandlesMgt, refId);
}

// Releases the external handle identified by refId in the reference set.
// Returns the result of taosReleaseRef.
int32_t uvReleaseExHandle(int64_t refId) {
  // release extern handle
  return taosReleaseRef(exHandlesMgt, refId);
}
// Frees an external handle; registered with taosOpenRef in uvOpenExHandleMgt.
//   handle - object to free; NULL is ignored
void uvDestoryExHandle(void* handle) {
  if (handle != NULL) {
    taosMemoryFree(handle);
  }
}

dengyihao's avatar
dengyihao 已提交
889
// Handles a Quit message on a worker: marks the worker as quitting, then either
// walks its handles right away (no connection left) or starts destroying all of
// its connections. The message itself is freed here.
//   msg  - the quit message (owned by this function)
//   thrd - the worker that received it
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
  thrd->quit = true;
  if (!(QUEUE_IS_EMPTY(&thrd->conn))) {
    destroyAllConn(thrd);
  } else {
    uv_walk(thrd->loop, uvWalkCb, NULL);
  }
  taosMemoryFree(msg);
}
// Handles a Release message on a worker.
// For a connection in ConnAcquire state the message is queued on the connection
// and, if transQueuePush reports true, sent right away. In any other state the
// message is destroyed (with a debug line when the connection is already released
// or in its normal state).
//   msg  - the release message; msg->pConn is the target connection
//   thrd - the worker that received it (unused)
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* pConn = msg->pConn;

  if (pConn->status != ConnAcquire) {
    if (pConn->status == ConnRelease || pConn->status == ConnNormal) {
      tDebug("server conn %p already released, ignore release-msg", pConn);
    }
    destroySmsg(msg);
    return;
  }

  if (transQueuePush(&pConn->srvMsgs, msg)) {
    uvStartSendRespInternal(msg);
  }
}
dengyihao's avatar
dengyihao 已提交
911
// Handles a response message on a worker: second half of the hand-over started in
// transSendResponse. Logs the step and passes the message to uvStartSendResp.
//   msg  - the response to send; msg->pConn is only used for logging here
//   thrd - the worker that received it (unused)
void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* pConn = msg->pConn;
  // send msg to client
  tDebug("server conn %p start to send resp (2/2)", pConn);
  uvStartSendResp(msg);
}
dengyihao's avatar
dengyihao 已提交
916 917
// Handles a Register message on a worker: stores the message as the connection's
// broken-link notification (conn->regArg). If the connection is already marked
// broken, the transport callback is invoked immediately and regArg is cleared.
//   msg  - the register message; msg->pConn is the target connection
//   thrd - the worker that received it (unused)
// The SSrvMsg wrapper is released here unless transQueuePush reports false, in
// which case the message stays in the connection's queue.
void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* conn = msg->pConn;
  tDebug("server conn %p register brokenlink callback", conn);
  if (conn->status != ConnAcquire) {
    // Nothing else references the wrapper at this point, so dropping it silently
    // would leak it.
    // NOTE(review): msg->msg.pCont is left untouched here — confirm who owns it.
    tDebug("server conn %p not acquired, ignore register-msg", conn);
    taosMemoryFree(msg);
    return;
  }

  if (!transQueuePush(&conn->srvMsgs, msg)) {
    return;
  }
  transQueuePop(&conn->srvMsgs);
  conn->regArg.notifyCount = 0;
  conn->regArg.init = 1;
  conn->regArg.msg = msg->msg;
  tDebug("server conn %p register brokenlink callback succ", conn);

  if (conn->broken) {
    // the link is already gone: notify right away
    STrans* pTransInst = conn->pTransInst;
    (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
    memset(&conn->regArg, 0, sizeof(conn->regArg));
  }
  taosMemoryFree(msg);
}
dengyihao's avatar
dengyihao 已提交
937 938 939 940
// Waits for a worker thread to finish and releases everything it owns: the loop
// (via SRV_RELEASE_UV), the async pool, the loop memory and the worker object.
//   pThrd - worker to destroy; NULL is ignored
void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd != NULL) {
    taosThreadJoin(pThrd->thread, NULL);
    SRV_RELEASE_UV(pThrd->loop);
    transDestroyAsyncPool(pThrd->asyncPool);
    taosMemoryFree(pThrd->loop);
    taosMemoryFree(pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
947
// Posts a Quit message to a worker through its async pool.
//   pThrd - the worker to stop
// The message is allocated here and handed over to the worker (uvHandleQuit frees it).
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
  SSrvMsg* pQuit = taosMemoryCalloc(1, sizeof(SSrvMsg));
  pQuit->type = Quit;
  tDebug("server send quit msg to work thread");
  transSendAsync(pThrd->asyncPool, &pQuit->q);
}

U
ubuntu 已提交
954
void transCloseServer(void* arg) {
dengyihao's avatar
dengyihao 已提交
955 956
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
957 958 959

  tDebug("send quit msg to accept thread");
  uv_async_send(srv->pAcceptAsync);
wafwerar's avatar
wafwerar 已提交
960
  taosThreadJoin(srv->thread, NULL);
dengyihao's avatar
dengyihao 已提交
961

dengyihao's avatar
dengyihao 已提交
962
  SRV_RELEASE_UV(srv->loop);
dengyihao's avatar
dengyihao 已提交
963

dengyihao's avatar
dengyihao 已提交
964 965 966 967 968
  for (int i = 0; i < srv->numOfThreads; i++) {
    sendQuitToWorkThrd(srv->pThreadObj[i]);
    destroyWorkThrd(srv->pThreadObj[i]);
  }

wafwerar's avatar
wafwerar 已提交
969 970 971
  taosMemoryFree(srv->pThreadObj);
  taosMemoryFree(srv->pAcceptAsync);
  taosMemoryFree(srv->loop);
dengyihao's avatar
dengyihao 已提交
972 973

  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
974
    taosMemoryFree(srv->pipe[i]);
dengyihao's avatar
dengyihao 已提交
975
  }
wafwerar's avatar
wafwerar 已提交
976
  taosMemoryFree(srv->pipe);
dengyihao's avatar
dengyihao 已提交
977

wafwerar's avatar
wafwerar 已提交
978
  taosMemoryFree(srv);
dengyihao's avatar
dengyihao 已提交
979

dengyihao's avatar
dengyihao 已提交
980 981
  transSrvInst--;
  if (transSrvInst == 0) {
dengyihao's avatar
dengyihao 已提交
982
    transModuleInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
983 984
    uvCloseExHandleMgt();
  }
dengyihao's avatar
dengyihao 已提交
985
}
dengyihao's avatar
dengyihao 已提交
986

dengyihao's avatar
dengyihao 已提交
987 988 989 990 991
void transRefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_INC((SSrvConn*)handle);
dengyihao's avatar
dengyihao 已提交
992
  tDebug("server conn %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
993 994 995 996 997 998 999
}

void transUnrefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SSrvConn*)handle);
dengyihao's avatar
dengyihao 已提交
1000
  tDebug("server conn %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
1001 1002 1003 1004
  if (ref == 0) {
    destroyConn((SSrvConn*)handle, true);
  }
}
dengyihao's avatar
dengyihao 已提交
1005 1006

void transReleaseSrvHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
1007
  SExHandle* exh = handle;
dengyihao's avatar
dengyihao 已提交
1008 1009
  int64_t    refId = exh->refId;

dengyihao's avatar
dengyihao 已提交
1010
  ASYNC_CHECK_HANDLE(exh, refId);
dengyihao's avatar
dengyihao 已提交
1011

dengyihao's avatar
dengyihao 已提交
1012 1013 1014
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);

dengyihao's avatar
dengyihao 已提交
1015
  STransMsg tmsg = {.code = 0, .handle = exh, .ahandle = NULL, .refId = refId};
dengyihao's avatar
dengyihao 已提交
1016

wafwerar's avatar
wafwerar 已提交
1017
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1018 1019 1020
  srvMsg->msg = tmsg;
  srvMsg->type = Release;

dengyihao's avatar
dengyihao 已提交
1021
  tTrace("server conn %p start to release", exh->handle);
dengyihao's avatar
dengyihao 已提交
1022
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
1023 1024
  uvReleaseExHandle(refId);
  return;
dengyihao's avatar
dengyihao 已提交
1025
_return1:
dengyihao's avatar
dengyihao 已提交
1026
  tTrace("server handle %p failed to send to release handle", exh);
dengyihao's avatar
dengyihao 已提交
1027
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
1028 1029
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1030
  tTrace("server handle %p failed to send to release handle", exh);
dengyihao's avatar
dengyihao 已提交
1031
  return;
dengyihao's avatar
dengyihao 已提交
1032
}
dengyihao's avatar
dengyihao 已提交
1033 1034 1035 1036
void transSendResponse(const STransMsg* msg) {
  SExHandle* exh = msg->handle;
  int64_t    refId = msg->refId;
  ASYNC_CHECK_HANDLE(exh, refId);
dengyihao's avatar
dengyihao 已提交
1037
  assert(refId != 0);
dengyihao's avatar
dengyihao 已提交
1038

dengyihao's avatar
dengyihao 已提交
1039 1040 1041
  STransMsg tmsg = *msg;
  tmsg.refId = refId;

dengyihao's avatar
dengyihao 已提交
1042 1043
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);
dengyihao's avatar
dengyihao 已提交
1044

wafwerar's avatar
wafwerar 已提交
1045
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1046
  srvMsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
1047
  srvMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
1048
  tTrace("server conn %p start to send resp (1/2)", exh->handle);
dengyihao's avatar
dengyihao 已提交
1049
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
1050 1051
  uvReleaseExHandle(refId);
  return;
dengyihao's avatar
dengyihao 已提交
1052
_return1:
dengyihao's avatar
dengyihao 已提交
1053
  tTrace("server handle %p failed to send resp", exh);
dengyihao's avatar
dengyihao 已提交
1054
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1055
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
1056 1057
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1058
  tTrace("server handle %p failed to send resp", exh);
dengyihao's avatar
dengyihao 已提交
1059 1060
  rpcFreeCont(msg->pCont);
  return;
dengyihao's avatar
dengyihao 已提交
1061
}
dengyihao's avatar
dengyihao 已提交
1062
void transRegisterMsg(const STransMsg* msg) {
dengyihao's avatar
dengyihao 已提交
1063
  SExHandle* exh = msg->handle;
dengyihao's avatar
dengyihao 已提交
1064 1065 1066
  int64_t    refId = msg->refId;
  ASYNC_CHECK_HANDLE(exh, refId);

dengyihao's avatar
dengyihao 已提交
1067 1068 1069
  STransMsg tmsg = *msg;
  tmsg.refId = refId;

dengyihao's avatar
dengyihao 已提交
1070 1071
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);
dengyihao's avatar
dengyihao 已提交
1072

wafwerar's avatar
wafwerar 已提交
1073
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1074
  srvMsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
1075
  srvMsg->type = Register;
dengyihao's avatar
dengyihao 已提交
1076
  tTrace("server conn %p start to register brokenlink callback", exh->handle);
dengyihao's avatar
dengyihao 已提交
1077
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
1078 1079
  uvReleaseExHandle(refId);
  return;
dengyihao's avatar
dengyihao 已提交
1080
_return1:
dengyihao's avatar
dengyihao 已提交
1081
  tTrace("server handle %p failed to send to register brokenlink", exh);
dengyihao's avatar
dengyihao 已提交
1082
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1083
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
1084 1085
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1086
  tTrace("server handle %p failed to send to register brokenlink", exh);
dengyihao's avatar
dengyihao 已提交
1087
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1088
}
dengyihao's avatar
dengyihao 已提交
1089

dengyihao's avatar
formate  
dengyihao 已提交
1090
// Copies the peer information of a connection into pInfo.
//   thandle - the connection's SExHandle
//   pInfo   - receives the client IP (s_addr as stored), the client port in host
//             byte order, and the user name
// Returns 0 on success, -1 when thandle is NULL.
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
  if (thandle == NULL) {
    tTrace("invalid handle %p, failed to Get Conn info", thandle);
    return -1;
  }

  SSrvConn* pConn = ((SExHandle*)thandle)->handle;

  pInfo->clientIp = (uint32_t)(pConn->addr.sin_addr.s_addr);
  pInfo->clientPort = ntohs(pConn->addr.sin_port);
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
  return 0;
}

dengyihao's avatar
dengyihao 已提交
1105
#endif