transSvr.c 38.6 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20 21
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;

dengyihao's avatar
dengyihao 已提交
22
static char*   notify = "a";
dengyihao's avatar
dengyihao 已提交
23
static int32_t tranSSvrInst = 0;
dengyihao's avatar
dengyihao 已提交
24
static int32_t refMgt = 0;
dengyihao's avatar
dengyihao 已提交
25

dengyihao's avatar
dengyihao 已提交
26 27 28 29
typedef struct {
  int       notifyCount;  //
  int       init;         // init or not
  STransMsg msg;
dengyihao's avatar
dengyihao 已提交
30
} SSvrRegArg;
dengyihao's avatar
dengyihao 已提交
31

dengyihao's avatar
dengyihao 已提交
32
typedef struct SSvrConn {
dengyihao's avatar
dengyihao 已提交
33
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
34 35 36
  uv_tcp_t*  pTcp;
  uv_write_t pWriter;
  uv_timer_t pTimer;
dengyihao's avatar
dengyihao 已提交
37 38

  queue       queue;
dengyihao's avatar
dengyihao 已提交
39
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
40 41 42
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
43
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
44
  STransQueue srvMsgs;
dengyihao's avatar
dengyihao 已提交
45

dengyihao's avatar
dengyihao 已提交
46
  SSvrRegArg regArg;
dengyihao's avatar
dengyihao 已提交
47
  bool       broken;  // conn broken;
dengyihao's avatar
dengyihao 已提交
48

dengyihao's avatar
dengyihao 已提交
49
  ConnStatus         status;
dengyihao's avatar
dengyihao 已提交
50
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
51
  struct sockaddr_in localAddr;
U
ubuntu 已提交
52

dengyihao's avatar
dengyihao 已提交
53 54 55 56 57 58
  int64_t refId;
  int     spi;
  char    info[64];
  char    user[TSDB_UNI_LEN];  // user ID for the link
  char    secret[TSDB_PASSWORD_LEN];
  char    ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
59
} SSvrConn;
dengyihao's avatar
dengyihao 已提交
60

dengyihao's avatar
dengyihao 已提交
61 62
typedef struct SSvrMsg {
  SSvrConn*     pConn;
dengyihao's avatar
dengyihao 已提交
63 64 65
  STransMsg     msg;
  queue         q;
  STransMsgType type;
dengyihao's avatar
dengyihao 已提交
66
} SSvrMsg;
dengyihao's avatar
dengyihao 已提交
67 68

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
69
  TdThread      thread;
wafwerar's avatar
wafwerar 已提交
70
  uv_connect_t  connect_req;
dengyihao's avatar
dengyihao 已提交
71 72 73 74 75
  uv_pipe_t*    pipe;
  uv_os_fd_t    fd;
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  queue         msg;
wafwerar's avatar
wafwerar 已提交
76
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
77 78 79 80

  queue conn;
  void* pTransInst;
  bool  quit;
dengyihao's avatar
dengyihao 已提交
81 82 83
} SWorkThrdObj;

typedef struct SServerObj {
dengyihao's avatar
dengyihao 已提交
84
  TdThread   thread;
dengyihao's avatar
dengyihao 已提交
85 86 87 88
  uv_tcp_t   server;
  uv_loop_t* loop;

  // work thread info
dengyihao's avatar
dengyihao 已提交
89 90
  int            workerIdx;
  int            numOfThreads;
wafwerar's avatar
wafwerar 已提交
91
  int            numOfWorkerReady;
dengyihao's avatar
dengyihao 已提交
92
  SWorkThrdObj** pThreadObj;
dengyihao's avatar
dengyihao 已提交
93

wafwerar's avatar
wafwerar 已提交
94
  uv_pipe_t   pipeListen;
dengyihao's avatar
dengyihao 已提交
95 96 97 98
  uv_pipe_t** pipe;
  uint32_t    ip;
  uint32_t    port;
  uv_async_t* pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
99 100

  bool inited;
dengyihao's avatar
dengyihao 已提交
101 102 103
} SServerObj;

static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
104
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
105
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
106
static void uvOnTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
107
static void uvOnSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
108
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
109 110 111
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
112
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
113
static void uvShutDownCb(uv_shutdown_t* req, int status);
dengyihao's avatar
dengyihao 已提交
114 115 116 117 118 119 120

/*
 * time-consuming task throwed into BG work thread
 */
static void uvWorkDoTask(uv_work_t* req);
static void uvWorkAfterTask(uv_work_t* req, int status);

dengyihao's avatar
dengyihao 已提交
121 122
static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
123

dengyihao's avatar
dengyihao 已提交
124 125 126
static void uvStartSendRespInternal(SSvrMsg* smsg);
static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSvrMsg* msg);
dengyihao's avatar
dengyihao 已提交
127

dengyihao's avatar
dengyihao 已提交
128
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
dengyihao's avatar
dengyihao 已提交
129

dengyihao's avatar
dengyihao 已提交
130
static void destroySmsg(SSvrMsg* smsg);
131
// check whether already read complete packet
dengyihao's avatar
dengyihao 已提交
132 133 134
static SSvrConn* createConn(void* hThrd);
static void      destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
static void      destroyConnRegArg(SSvrConn* conn);
dengyihao's avatar
dengyihao 已提交
135

dengyihao's avatar
dengyihao 已提交
136
static int reallocConnRefHandle(SSvrConn* conn);
dengyihao's avatar
dengyihao 已提交
137

dengyihao's avatar
dengyihao 已提交
138 139 140 141 142
static void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd);
static void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd);
static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
dengyihao's avatar
dengyihao 已提交
143
                                                                       uvHandleRegister, NULL};
dengyihao's avatar
dengyihao 已提交
144

dengyihao's avatar
dengyihao 已提交
145
static int32_t exHandlesMgt;
dengyihao's avatar
dengyihao 已提交
146

dengyihao's avatar
dengyihao 已提交
147 148 149 150 151 152 153 154
// void       uvInitEnv();
// void       uvOpenExHandleMgt(int size);
// void       uvCloseExHandleMgt();
// int64_t    uvAddExHandle(void* p);
// int32_t    uvRemoveExHandle(int64_t refId);
// int32_t    uvReleaseExHandle(int64_t refId);
// void       uvDestoryExHandle(void* handle);
// SExHandle* uvAcquireExHandle(int64_t refId);
dengyihao's avatar
dengyihao 已提交
155

156
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
157

dengyihao's avatar
dengyihao 已提交
158
// server and worker thread
159 160
static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg);
dengyihao's avatar
dengyihao 已提交
161

dengyihao's avatar
dengyihao 已提交
162
// add handle loop
dengyihao's avatar
dengyihao 已提交
163
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName);
dengyihao's avatar
dengyihao 已提交
164 165
static bool addHandleToAcceptloop(void* arg);

S
Shengliang Guan 已提交
166 167 168 169 170 171
#define CONN_SHOULD_RELEASE(conn, head)                                               \
  do {                                                                                \
    if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) {                    \
      conn->status = ConnRelease;                                                     \
      transClearBuffer(&conn->readBuf);                                               \
      transFreeMsg(transContFromHead((char*)head));                                   \
dengyihao's avatar
dengyihao 已提交
172
      tTrace("conn %p received release request", conn);                               \
S
Shengliang Guan 已提交
173 174
                                                                                      \
      STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \
dengyihao's avatar
dengyihao 已提交
175
      SSvrMsg*  srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg));                        \
S
Shengliang Guan 已提交
176 177 178 179 180 181 182
      srvMsg->msg = tmsg;                                                             \
      srvMsg->type = Release;                                                         \
      srvMsg->pConn = conn;                                                           \
      reallocConnRefHandle(conn);                                                     \
      if (!transQueuePush(&conn->srvMsgs, srvMsg)) {                                  \
        return;                                                                       \
      }                                                                               \
dengyihao's avatar
dengyihao 已提交
183
      if (conn->regArg.init) {                                                        \
dengyihao's avatar
dengyihao 已提交
184
        tTrace("conn %p release, notify server app", conn);                           \
dengyihao's avatar
dengyihao 已提交
185 186 187 188
        STrans* pTransInst = conn->pTransInst;                                        \
        (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);            \
        memset(&conn->regArg, 0, sizeof(conn->regArg));                               \
      }                                                                               \
S
Shengliang Guan 已提交
189 190 191
      uvStartSendRespInternal(srvMsg);                                                \
      return;                                                                         \
    }                                                                                 \
dengyihao's avatar
dengyihao 已提交
192 193 194 195 196 197 198 199 200
  } while (0)

#define SRV_RELEASE_UV(loop)       \
  do {                             \
    uv_walk(loop, uvWalkCb, NULL); \
    uv_run(loop, UV_RUN_DEFAULT);  \
    uv_loop_close(loop);           \
  } while (0);

dengyihao's avatar
dengyihao 已提交
201 202 203 204
#define ASYNC_ERR_JRET(thrd)                            \
  do {                                                  \
    if (thrd->quit) {                                   \
      tTrace("worker thread already quit, ignore msg"); \
dengyihao's avatar
dengyihao 已提交
205
      goto _return1;                                    \
dengyihao's avatar
dengyihao 已提交
206 207 208
    }                                                   \
  } while (0)

dengyihao's avatar
dengyihao 已提交
209 210 211
#define ASYNC_CHECK_HANDLE(exh1, refId)                                                                               \
  do {                                                                                                                \
    if (refId > 0) {                                                                                                  \
dengyihao's avatar
dengyihao 已提交
212
      tTrace("handle step1");                                                                                         \
dengyihao's avatar
dengyihao 已提交
213
      SExHandle* exh2 = transAcquireExHandle(refMgt, refId);                                                          \
dengyihao's avatar
dengyihao 已提交
214
      if (exh2 == NULL || refId != exh2->refId) {                                                                     \
dengyihao's avatar
dengyihao 已提交
215
        tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1,        \
dengyihao's avatar
dengyihao 已提交
216
               exh2 ? exh2->refId : 0, refId);                                                                        \
dengyihao's avatar
dengyihao 已提交
217 218 219
        goto _return1;                                                                                                \
      }                                                                                                               \
    } else if (refId == 0) {                                                                                          \
dengyihao's avatar
dengyihao 已提交
220
      tTrace("handle step2");                                                                                         \
dengyihao's avatar
dengyihao 已提交
221
      SExHandle* exh2 = transAcquireExHandle(refMgt, refId);                                                          \
dengyihao's avatar
dengyihao 已提交
222
      if (exh2 == NULL || refId != exh2->refId) {                                                                     \
dengyihao's avatar
dengyihao 已提交
223 224
        tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, refId, \
               exh2 ? exh2->refId : 0);                                                                               \
dengyihao's avatar
dengyihao 已提交
225 226 227 228
        goto _return1;                                                                                                \
      } else {                                                                                                        \
        refId = exh1->refId;                                                                                          \
      }                                                                                                               \
dengyihao's avatar
dengyihao 已提交
229
    } else if (refId < 0) {                                                                                           \
dengyihao's avatar
dengyihao 已提交
230
      tTrace("handle step3");                                                                                         \
dengyihao's avatar
dengyihao 已提交
231 232
      goto _return2;                                                                                                  \
    }                                                                                                                 \
dengyihao's avatar
dengyihao 已提交
233 234
  } while (0)

dengyihao's avatar
dengyihao 已提交
235
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
236
  SSvrConn*    conn = handle->data;
dengyihao's avatar
dengyihao 已提交
237 238
  SConnBuffer* pBuf = &conn->readBuf;
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
239 240 241 242
}

// refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) {
dengyihao's avatar
dengyihao 已提交
243
  SSvrConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
244
  tDebug("%p timeout since no activity", conn);
dengyihao's avatar
dengyihao 已提交
245 246
}

dengyihao's avatar
dengyihao 已提交
247
static void uvHandleReq(SSvrConn* pConn) {
dengyihao's avatar
dengyihao 已提交
248
  SConnBuffer* pBuf = &pConn->readBuf;
dengyihao's avatar
dengyihao 已提交
249 250 251 252
  char*        msg = pBuf->buf;
  uint32_t     msgLen = pBuf->len;

  STransMsgHead* pHead = (STransMsgHead*)msg;
dengyihao's avatar
dengyihao 已提交
253
  pHead->code = htonl(pHead->code);
dengyihao's avatar
dengyihao 已提交
254
  pHead->msgLen = htonl(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
255
  memcpy(pConn->user, pHead->user, strlen(pHead->user));
dengyihao's avatar
dengyihao 已提交
256

dengyihao's avatar
dengyihao 已提交
257 258 259 260 261 262 263
  // TODO(dengyihao): time-consuming task throwed into BG Thread
  //  uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
  //  wreq->data = pConn;
  //  uv_read_stop((uv_stream_t*)pConn->pTcp);
  //  transRefSrvHandle(pConn);
  //  uv_queue_work(((SWorkThrdObj*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);

dengyihao's avatar
dengyihao 已提交
264
  CONN_SHOULD_RELEASE(pConn, pHead);
dengyihao's avatar
dengyihao 已提交
265

dengyihao's avatar
dengyihao 已提交
266
  STransMsg transMsg;
dengyihao's avatar
dengyihao 已提交
267
  memset(&transMsg, 0, sizeof(transMsg));
dengyihao's avatar
dengyihao 已提交
268 269 270 271
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = pHead->content;
  transMsg.msgType = pHead->msgType;
  transMsg.code = pHead->code;
dengyihao's avatar
dengyihao 已提交
272

dengyihao's avatar
dengyihao 已提交
273
  transClearBuffer(&pConn->readBuf);
dengyihao's avatar
dengyihao 已提交
274

dengyihao's avatar
dengyihao 已提交
275
  pConn->inType = pHead->msgType;
dengyihao's avatar
dengyihao 已提交
276 277 278 279
  if (pConn->status == ConnNormal) {
    if (pHead->persist == 1) {
      pConn->status = ConnAcquire;
      transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
280
      tDebug("conn %p acquired by server app", pConn);
dengyihao's avatar
dengyihao 已提交
281 282
    }
  }
dengyihao's avatar
dengyihao 已提交
283
  STraceId* trace = &pHead->traceId;
dengyihao's avatar
dengyihao 已提交
284
  if (pConn->status == ConnNormal && pHead->noResp == 0) {
dengyihao's avatar
dengyihao 已提交
285
    transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
286

dengyihao's avatar
dengyihao 已提交
287
    tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", transLabel(pConn), pConn,
dengyihao's avatar
dengyihao 已提交
288
            TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
dengyihao's avatar
dengyihao 已提交
289 290 291 292
            taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen);
  } else {
    tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", transLabel(pConn),
            pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
dengyihao's avatar
dengyihao 已提交
293 294
            taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp,
            transMsg.code);
dengyihao's avatar
dengyihao 已提交
295 296 297
    // no ref here
  }

dengyihao's avatar
dengyihao 已提交
298
  // pHead->noResp = 1,
dengyihao's avatar
dengyihao 已提交
299 300 301
  // 1. server application should not send resp on handle
  // 2. once send out data, cli conn released to conn pool immediately
  // 3. not mixed with persist
dengyihao's avatar
dengyihao 已提交
302
  transMsg.info.ahandle = (void*)pHead->ahandle;
dengyihao's avatar
dengyihao 已提交
303
  transMsg.info.handle = (void*)transAcquireExHandle(refMgt, pConn->refId);
S
Shengliang Guan 已提交
304
  transMsg.info.refId = pConn->refId;
dengyihao's avatar
dengyihao 已提交
305 306
  transMsg.info.traceId = pHead->traceId;

dengyihao's avatar
dengyihao 已提交
307 308
  tGTrace("%s handle %p conn: %p translated to app, refId: %" PRIu64 "", transLabel(pConn), transMsg.info.handle, pConn,
          pConn->refId);
S
Shengliang Guan 已提交
309
  assert(transMsg.info.handle != NULL);
dengyihao's avatar
dengyihao 已提交
310

dengyihao's avatar
dengyihao 已提交
311
  if (pHead->noResp == 1) {
S
Shengliang Guan 已提交
312
    transMsg.info.refId = -1;
dengyihao's avatar
dengyihao 已提交
313
  }
dengyihao's avatar
dengyihao 已提交
314 315

  // set up conn info
316
  SRpcConnInfo* pConnInfo = &(transMsg.info.conn);
dengyihao's avatar
dengyihao 已提交
317 318 319 320
  pConnInfo->clientIp = (uint32_t)(pConn->addr.sin_addr.s_addr);
  pConnInfo->clientPort = ntohs(pConn->addr.sin_port);
  tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));

dengyihao's avatar
dengyihao 已提交
321
  transReleaseExHandle(refMgt, pConn->refId);
dengyihao's avatar
dengyihao 已提交
322

dengyihao's avatar
dengyihao 已提交
323
  STrans* pTransInst = pConn->pTransInst;
dengyihao's avatar
dengyihao 已提交
324
  (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
dengyihao's avatar
dengyihao 已提交
325
  // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
dengyihao's avatar
dengyihao 已提交
326 327
}

dengyihao's avatar
dengyihao 已提交
328
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
329
  // opt
dengyihao's avatar
dengyihao 已提交
330
  SSvrConn*    conn = cli->data;
dengyihao's avatar
dengyihao 已提交
331
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
332 333
  if (nread > 0) {
    pBuf->len += nread;
dengyihao's avatar
dengyihao 已提交
334
    tTrace("%s conn %p total read: %d, current read: %d", transLabel(conn->pTransInst), conn, pBuf->len, (int)nread);
dengyihao's avatar
dengyihao 已提交
335
    if (transReadComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
336
      tTrace("%s conn %p alread read complete packet", transLabel(conn->pTransInst), conn);
dengyihao's avatar
dengyihao 已提交
337
      uvHandleReq(conn);
dengyihao's avatar
dengyihao 已提交
338
    } else {
dengyihao's avatar
dengyihao 已提交
339
      tTrace("%s conn %p read partial packet, continue to read", transLabel(conn->pTransInst), conn);
dengyihao's avatar
dengyihao 已提交
340 341 342
    }
    return;
  }
343 344 345
  if (nread == 0) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
346

dengyihao's avatar
dengyihao 已提交
347
  tError("%s conn %p read error: %s", transLabel(conn->pTransInst), conn, uv_err_name(nread));
U
ubuntu 已提交
348
  if (nread < 0) {
dengyihao's avatar
dengyihao 已提交
349
    conn->broken = true;
dengyihao's avatar
dengyihao 已提交
350 351
    if (conn->status == ConnAcquire) {
      if (conn->regArg.init) {
dengyihao's avatar
dengyihao 已提交
352
        tTrace("%s conn %p broken, notify server app", transLabel(conn->pTransInst), conn);
dengyihao's avatar
dengyihao 已提交
353 354 355 356 357
        STrans* pTransInst = conn->pTransInst;
        (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
        memset(&conn->regArg, 0, sizeof(conn->regArg));
      }
    }
dengyihao's avatar
dengyihao 已提交
358
    transUnrefSrvHandle(conn);
dengyihao's avatar
dengyihao 已提交
359 360 361 362
  }
}
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->len = 2;
wafwerar's avatar
wafwerar 已提交
363
  buf->base = taosMemoryCalloc(1, sizeof(char) * buf->len);
dengyihao's avatar
dengyihao 已提交
364 365 366 367
}

void uvOnTimeoutCb(uv_timer_t* handle) {
  // opt
dengyihao's avatar
dengyihao 已提交
368
  SSvrConn* pConn = handle->data;
dengyihao's avatar
dengyihao 已提交
369
  tError("conn %p time out", pConn);
dengyihao's avatar
dengyihao 已提交
370 371
}

dengyihao's avatar
dengyihao 已提交
372
void uvOnSendCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
373
  SSvrConn* conn = req->data;
dengyihao's avatar
dengyihao 已提交
374
  // transClearBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
375
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
376
    tTrace("conn %p data already was written on stream", conn);
dengyihao's avatar
dengyihao 已提交
377
    if (!transQueueEmpty(&conn->srvMsgs)) {
dengyihao's avatar
dengyihao 已提交
378
      SSvrMsg* msg = transQueuePop(&conn->srvMsgs);
dengyihao's avatar
dengyihao 已提交
379 380 381 382 383 384 385 386
      // if (msg->type == Release && conn->status != ConnNormal) {
      //  conn->status = ConnNormal;
      //  transUnrefSrvHandle(conn);
      //  reallocConnRefHandle(conn);
      //  destroySmsg(msg);
      //  transQueueClear(&conn->srvMsgs);
      //  return;
      //}
dengyihao's avatar
add UT  
dengyihao 已提交
387 388
      destroySmsg(msg);
      // send second data, just use for push
dengyihao's avatar
dengyihao 已提交
389
      if (!transQueueEmpty(&conn->srvMsgs)) {
dengyihao's avatar
dengyihao 已提交
390
        msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
dengyihao's avatar
dengyihao 已提交
391 392 393 394 395 396 397 398 399
        if (msg->type == Register && conn->status == ConnAcquire) {
          conn->regArg.notifyCount = 0;
          conn->regArg.init = 1;
          conn->regArg.msg = msg->msg;
          if (conn->broken) {
            STrans* pTransInst = conn->pTransInst;
            (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
            memset(&conn->regArg, 0, sizeof(conn->regArg));
          }
dengyihao's avatar
dengyihao 已提交
400
          transQueuePop(&conn->srvMsgs);
wafwerar's avatar
wafwerar 已提交
401
          taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
402

dengyihao's avatar
dengyihao 已提交
403
          msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
dengyihao's avatar
dengyihao 已提交
404 405 406
          if (msg != NULL) {
            uvStartSendRespInternal(msg);
          }
dengyihao's avatar
dengyihao 已提交
407 408 409
        } else {
          uvStartSendRespInternal(msg);
        }
dengyihao's avatar
add UT  
dengyihao 已提交
410
      }
dengyihao's avatar
dengyihao 已提交
411
    }
dengyihao's avatar
dengyihao 已提交
412
  } else {
dengyihao's avatar
dengyihao 已提交
413
    tError("conn %p failed to write data, %s", conn, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
414
    conn->broken = true;
dengyihao's avatar
dengyihao 已提交
415
    transUnrefSrvHandle(conn);
dengyihao's avatar
dengyihao 已提交
416 417
  }
}
dengyihao's avatar
dengyihao 已提交
418 419
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
420
    tTrace("success to dispatch conn to work thread");
dengyihao's avatar
dengyihao 已提交
421 422 423
  } else {
    tError("fail to dispatch conn to work thread");
  }
dengyihao's avatar
dengyihao 已提交
424 425
  uv_close((uv_handle_t*)req->data, uvFreeCb);
  // taosMemoryFree(req->data);
wafwerar's avatar
wafwerar 已提交
426
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
427
}
dengyihao's avatar
dengyihao 已提交
428

dengyihao's avatar
dengyihao 已提交
429 430
static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
  SSvrConn*  pConn = smsg->pConn;
dengyihao's avatar
formate  
dengyihao 已提交
431
  STransMsg* pMsg = &smsg->msg;
dengyihao's avatar
dengyihao 已提交
432 433 434 435 436
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
S
Shengliang Guan 已提交
437
  pHead->ahandle = (uint64_t)pMsg->info.ahandle;
dengyihao's avatar
dengyihao 已提交
438
  pHead->traceId = pMsg->info.traceId;
dengyihao's avatar
dengyihao 已提交
439

dengyihao's avatar
dengyihao 已提交
440 441 442
  if (pConn->status == ConnNormal) {
    pHead->msgType = pConn->inType + 1;
  } else {
dengyihao's avatar
dengyihao 已提交
443 444 445
    if (smsg->type == Release) {
      pHead->msgType = 0;
      pConn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
446 447

      destroyConnRegArg(pConn);
dengyihao's avatar
dengyihao 已提交
448 449 450 451
      transUnrefSrvHandle(pConn);
    } else {
      pHead->msgType = pMsg->msgType;
    }
dengyihao's avatar
dengyihao 已提交
452
  }
dengyihao's avatar
dengyihao 已提交
453

dengyihao's avatar
dengyihao 已提交
454
  pHead->release = smsg->type == Release ? 1 : 0;
dengyihao's avatar
dengyihao 已提交
455
  pHead->code = htonl(pMsg->code);
dengyihao's avatar
dengyihao 已提交
456

dengyihao's avatar
dengyihao 已提交
457 458
  char*   msg = (char*)pHead;
  int32_t len = transMsgLenFromCont(pMsg->contLen);
dengyihao's avatar
dengyihao 已提交
459 460

  STraceId* trace = &pMsg->info.traceId;
dengyihao's avatar
dengyihao 已提交
461 462 463
  tGTrace("%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", transLabel(pConn->pTransInst), pConn,
          TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
          taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), len);
dengyihao's avatar
dengyihao 已提交
464
  pHead->msgLen = htonl(len);
dengyihao's avatar
dengyihao 已提交
465

dengyihao's avatar
dengyihao 已提交
466 467 468
  wb->base = msg;
  wb->len = len;
}
dengyihao's avatar
dengyihao 已提交
469

dengyihao's avatar
dengyihao 已提交
470
static void uvStartSendRespInternal(SSvrMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
471 472 473
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

dengyihao's avatar
dengyihao 已提交
474
  SSvrConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
475
  // uv_timer_stop(&pConn->pTimer);
dengyihao's avatar
dengyihao 已提交
476
  uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
dengyihao's avatar
dengyihao 已提交
477
}
dengyihao's avatar
dengyihao 已提交
478
static void uvStartSendResp(SSvrMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
479
  // impl
dengyihao's avatar
dengyihao 已提交
480
  SSvrConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
481 482

  if (pConn->broken == true) {
dengyihao's avatar
dengyihao 已提交
483
    // persist by
dengyihao's avatar
dengyihao 已提交
484 485
    transFreeMsg(smsg->msg.pCont);
    taosMemoryFree(smsg);
dengyihao's avatar
dengyihao 已提交
486 487 488
    transUnrefSrvHandle(pConn);
    return;
  }
dengyihao's avatar
dengyihao 已提交
489 490 491
  if (pConn->status == ConnNormal) {
    transUnrefSrvHandle(pConn);
  }
dengyihao's avatar
dengyihao 已提交
492

dengyihao's avatar
dengyihao 已提交
493
  if (!transQueuePush(&pConn->srvMsgs, smsg)) {
dengyihao's avatar
dengyihao 已提交
494 495 496
    return;
  }
  uvStartSendRespInternal(smsg);
dengyihao's avatar
dengyihao 已提交
497 498
  return;
}
dengyihao's avatar
dengyihao 已提交
499

dengyihao's avatar
dengyihao 已提交
500
static void destroySmsg(SSvrMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
501 502 503 504
  if (smsg == NULL) {
    return;
  }
  transFreeMsg(smsg->msg.pCont);
wafwerar's avatar
wafwerar 已提交
505
  taosMemoryFree(smsg);
dengyihao's avatar
dengyihao 已提交
506
}
dengyihao's avatar
fix bug  
dengyihao 已提交
507
static void destroyAllConn(SWorkThrdObj* pThrd) {
dengyihao's avatar
dengyihao 已提交
508
  tTrace("thread %p destroy all conn ", pThrd);
dengyihao's avatar
fix bug  
dengyihao 已提交
509 510 511 512 513
  while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
    queue* h = QUEUE_HEAD(&pThrd->conn);
    QUEUE_REMOVE(h);
    QUEUE_INIT(h);

dengyihao's avatar
dengyihao 已提交
514
    SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue);
dengyihao's avatar
dengyihao 已提交
515 516 517
    while (T_REF_VAL_GET(c) >= 2) {
      transUnrefSrvHandle(c);
    }
dengyihao's avatar
dengyihao 已提交
518
    transUnrefSrvHandle(c);
dengyihao's avatar
fix bug  
dengyihao 已提交
519 520
  }
}
dengyihao's avatar
dengyihao 已提交
521
void uvWorkerAsyncCb(uv_async_t* handle) {
dengyihao's avatar
dengyihao 已提交
522 523
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
dengyihao's avatar
dengyihao 已提交
524
  SSvrConn*     conn = NULL;
dengyihao's avatar
dengyihao 已提交
525
  queue         wq;
dengyihao's avatar
dengyihao 已提交
526

dengyihao's avatar
dengyihao 已提交
527
  // batch process to avoid to lock/unlock frequently
wafwerar's avatar
wafwerar 已提交
528
  taosThreadMutexLock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
529
  QUEUE_MOVE(&item->qmsg, &wq);
wafwerar's avatar
wafwerar 已提交
530
  taosThreadMutexUnlock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
531 532 533 534

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* head = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(head);
dengyihao's avatar
dengyihao 已提交
535

dengyihao's avatar
dengyihao 已提交
536
    SSvrMsg* msg = QUEUE_DATA(head, SSvrMsg, q);
dengyihao's avatar
dengyihao 已提交
537
    if (msg == NULL) {
dengyihao's avatar
dengyihao 已提交
538
      tError("unexcept occurred, continue");
dengyihao's avatar
dengyihao 已提交
539
      continue;
dengyihao's avatar
dengyihao 已提交
540
    }
dengyihao's avatar
dengyihao 已提交
541
    // release handle to rpc init
dengyihao's avatar
dengyihao 已提交
542 543
    if (msg->type == Quit) {
      (*transAsyncHandle[msg->type])(msg, pThrd);
dengyihao's avatar
dengyihao 已提交
544
      continue;
dengyihao's avatar
dengyihao 已提交
545 546 547
    } else {
      STransMsg transMsg = msg->msg;

S
Shengliang Guan 已提交
548 549
      SExHandle* exh1 = transMsg.info.handle;
      int64_t    refId = transMsg.info.refId;
dengyihao's avatar
dengyihao 已提交
550
      SExHandle* exh2 = transAcquireExHandle(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
551
      if (exh2 == NULL || exh1 != exh2) {
dengyihao's avatar
dengyihao 已提交
552
        tTrace("handle except msg %p, ignore it", exh1);
dengyihao's avatar
dengyihao 已提交
553
        transReleaseExHandle(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
554 555 556 557
        destroySmsg(msg);
        continue;
      }
      msg->pConn = exh1->handle;
dengyihao's avatar
dengyihao 已提交
558
      transReleaseExHandle(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
559
      (*transAsyncHandle[msg->type])(msg, pThrd);
dengyihao's avatar
dengyihao 已提交
560
    }
dengyihao's avatar
dengyihao 已提交
561 562
  }
}
dengyihao's avatar
dengyihao 已提交
563 564 565 566 567
static void uvWalkCb(uv_handle_t* handle, void* arg) {
  if (!uv_is_closing(handle)) {
    uv_close(handle, NULL);
  }
}
dengyihao's avatar
dengyihao 已提交
568 569 570 571
static void uvFreeCb(uv_handle_t* handle) {
  //
  taosMemoryFree(handle);
}
dengyihao's avatar
dengyihao 已提交
572

dengyihao's avatar
dengyihao 已提交
573 574
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* srv = async->data;
dengyihao's avatar
dengyihao 已提交
575
  tDebug("close server port %d", srv->port);
dengyihao's avatar
dengyihao 已提交
576
  uv_walk(srv->loop, uvWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
577
}
dengyihao's avatar
dengyihao 已提交
578

dengyihao's avatar
dengyihao 已提交
579
static void uvShutDownCb(uv_shutdown_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
580 581 582
  if (status != 0) {
    tDebug("conn failed to shut down: %s", uv_err_name(status));
  }
dengyihao's avatar
dengyihao 已提交
583
  uv_close((uv_handle_t*)req->handle, uvDestroyConn);
wafwerar's avatar
wafwerar 已提交
584
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
585 586
}

dengyihao's avatar
dengyihao 已提交
587 588 589
static void uvWorkDoTask(uv_work_t* req) {
  // doing time-consumeing task
  // only auth conn currently, add more func later
dengyihao's avatar
dengyihao 已提交
590
  tTrace("conn %p start to be processed in BG Thread", req->data);
dengyihao's avatar
dengyihao 已提交
591 592 593 594 595
  return;
}

static void uvWorkAfterTask(uv_work_t* req, int status) {
  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
596
    tTrace("conn %p failed to processed ", req->data);
dengyihao's avatar
dengyihao 已提交
597 598 599 600
  }
  // Done time-consumeing task
  // add more func later
  // this func called in main loop
dengyihao's avatar
dengyihao 已提交
601
  tTrace("conn %p already processed ", req->data);
dengyihao's avatar
dengyihao 已提交
602 603 604
  taosMemoryFree(req);
}

dengyihao's avatar
dengyihao 已提交
605 606 607 608 609 610
void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status == -1) {
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

wafwerar's avatar
wafwerar 已提交
611
  uv_tcp_t* cli = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
dengyihao's avatar
dengyihao 已提交
612 613 614
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
wafwerar's avatar
wafwerar 已提交
615
    if (pObj->numOfWorkerReady < pObj->numOfThreads) {
dengyihao's avatar
dengyihao 已提交
616 617
      tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads,
             pObj->numOfWorkerReady);
wafwerar's avatar
wafwerar 已提交
618 619 620
      uv_close((uv_handle_t*)cli, NULL);
      return;
    }
dengyihao's avatar
dengyihao 已提交
621

wafwerar's avatar
wafwerar 已提交
622
    uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
dengyihao's avatar
dengyihao 已提交
623
    wr->data = cli;
dengyihao's avatar
dengyihao 已提交
624 625 626
    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

    pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
dengyihao's avatar
dengyihao 已提交
627

dengyihao's avatar
dengyihao 已提交
628
    tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
dengyihao's avatar
dengyihao 已提交
629

dengyihao's avatar
dengyihao 已提交
630
    uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
dengyihao's avatar
dengyihao 已提交
631 632 633 634 635
  } else {
    uv_close((uv_handle_t*)cli, NULL);
  }
}
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
636
  tTrace("connection coming");
dengyihao's avatar
dengyihao 已提交
637 638 639 640 641
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
dengyihao's avatar
dengyihao 已提交
642 643 644 645
    tError("failed to create connect: %p", q);
    taosMemoryFree(buf->base);
    uv_close((uv_handle_t*)q, NULL);
    // taosMemoryFree(q);
dengyihao's avatar
dengyihao 已提交
646 647 648 649 650
    return;
  }
  // free memory allocated by
  assert(nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
wafwerar's avatar
wafwerar 已提交
651
  taosMemoryFree(buf->base);
dengyihao's avatar
dengyihao 已提交
652 653 654 655 656 657 658 659 660 661 662 663

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

dengyihao's avatar
dengyihao 已提交
664
  SSvrConn* pConn = createConn(pThrd);
665

dengyihao's avatar
dengyihao 已提交
666
  pConn->pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
667
  /* init conn timer*/
dengyihao's avatar
dengyihao 已提交
668 669
  // uv_timer_init(pThrd->loop, &pConn->pTimer);
  // pConn->pTimer.data = pConn;
dengyihao's avatar
dengyihao 已提交
670 671 672 673

  pConn->hostThrd = pThrd;

  // init client handle
wafwerar's avatar
wafwerar 已提交
674
  pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
dengyihao's avatar
dengyihao 已提交
675 676 677
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

dengyihao's avatar
dengyihao 已提交
678
  pConn->pWriter.data = pConn;
dengyihao's avatar
dengyihao 已提交
679

dengyihao's avatar
dengyihao 已提交
680 681
  transSetConnOption((uv_tcp_t*)pConn->pTcp);

dengyihao's avatar
dengyihao 已提交
682 683 684
  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
dengyihao's avatar
dengyihao 已提交
685
    tTrace("conn %p created, fd: %d", pConn, fd);
dengyihao's avatar
dengyihao 已提交
686

dengyihao's avatar
dengyihao 已提交
687 688
    int addrlen = sizeof(pConn->addr);
    if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
dengyihao's avatar
dengyihao 已提交
689
      tError("conn %p failed to get peer info", pConn);
dengyihao's avatar
dengyihao 已提交
690
      transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
691
      return;
dengyihao's avatar
dengyihao 已提交
692
    }
dengyihao's avatar
dengyihao 已提交
693

dengyihao's avatar
dengyihao 已提交
694 695
    addrlen = sizeof(pConn->localAddr);
    if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->localAddr, &addrlen)) {
dengyihao's avatar
dengyihao 已提交
696
      tError("conn %p failed to get local info", pConn);
dengyihao's avatar
dengyihao 已提交
697
      transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
698 699 700
      return;
    }

dengyihao's avatar
dengyihao 已提交
701
    uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
dengyihao's avatar
dengyihao 已提交
702

dengyihao's avatar
dengyihao 已提交
703
  } else {
dengyihao's avatar
dengyihao 已提交
704
    tDebug("failed to create new connection");
dengyihao's avatar
dengyihao 已提交
705
    transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
706 707 708
  }
}

709
void* transAcceptThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
710
  // opt
dengyihao's avatar
dengyihao 已提交
711
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
712 713
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
714 715

  return NULL;
dengyihao's avatar
dengyihao 已提交
716
}
dengyihao's avatar
dengyihao 已提交
717
void uvOnPipeConnectionCb(uv_connect_t* connect, int status) {
wafwerar's avatar
wafwerar 已提交
718 719 720 721 722 723
  if (status != 0) {
    return;
  }
  SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req);
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
}
dengyihao's avatar
dengyihao 已提交
724
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) {
wafwerar's avatar
wafwerar 已提交
725
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
726 727 728
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
729

dengyihao's avatar
dengyihao 已提交
730
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
wafwerar's avatar
wafwerar 已提交
731
  // int r = uv_pipe_open(pThrd->pipe, pThrd->fd);
dengyihao's avatar
dengyihao 已提交
732 733 734

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
735
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
736
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
737

dengyihao's avatar
fix bug  
dengyihao 已提交
738 739 740
  // conn set
  QUEUE_INIT(&pThrd->conn);

dengyihao's avatar
dengyihao 已提交
741
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 1, pThrd, uvWorkerAsyncCb);
wafwerar's avatar
wafwerar 已提交
742 743
  uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
  // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
744 745 746 747 748 749 750 751 752 753 754 755 756
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
757
  // register an async here to quit server gracefully
wafwerar's avatar
wafwerar 已提交
758
  srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t));
dengyihao's avatar
dengyihao 已提交
759 760
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
761

dengyihao's avatar
dengyihao 已提交
762
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
763 764 765 766 767
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
768
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
769
    tError("failed to listen: %s", uv_err_name(err));
dengyihao's avatar
dengyihao 已提交
770
    terrno = TSDB_CODE_RPC_PORT_EADDRINUSE;
dengyihao's avatar
dengyihao 已提交
771 772 773
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
774
}
775
void* transWorkerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
776
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
777
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
778
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
779 780

  return NULL;
dengyihao's avatar
dengyihao 已提交
781 782
}

dengyihao's avatar
dengyihao 已提交
783
static SSvrConn* createConn(void* hThrd) {
dengyihao's avatar
fix bug  
dengyihao 已提交
784 785
  SWorkThrdObj* pThrd = hThrd;

dengyihao's avatar
dengyihao 已提交
786
  SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
dengyihao's avatar
fix bug  
dengyihao 已提交
787 788 789
  QUEUE_INIT(&pConn->queue);

  QUEUE_PUSH(&pThrd->conn, &pConn->queue);
dengyihao's avatar
dengyihao 已提交
790 791

  transQueueInit(&pConn->srvMsgs, NULL);
dengyihao's avatar
dengyihao 已提交
792

dengyihao's avatar
dengyihao 已提交
793
  memset(&pConn->regArg, 0, sizeof(pConn->regArg));
dengyihao's avatar
dengyihao 已提交
794
  pConn->broken = false;
dengyihao's avatar
dengyihao 已提交
795
  pConn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
796

dengyihao's avatar
dengyihao 已提交
797 798 799
  SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
  exh->handle = pConn;
  exh->pThrd = pThrd;
dengyihao's avatar
dengyihao 已提交
800 801
  exh->refId = transAddExHandle(refMgt, exh);
  transAcquireExHandle(refMgt, exh->refId);
dengyihao's avatar
dengyihao 已提交
802 803

  pConn->refId = exh->refId;
dengyihao's avatar
dengyihao 已提交
804
  transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
805
  tTrace("%s handle %p, conn %p created, refId: %" PRId64 "", transLabel(pThrd->pTransInst), exh, pConn, pConn->refId);
dengyihao's avatar
dengyihao 已提交
806 807
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
808

dengyihao's avatar
dengyihao 已提交
809
static void destroyConn(SSvrConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
810 811 812
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
813

dengyihao's avatar
dengyihao 已提交
814
  transDestroyBuffer(&conn->readBuf);
815
  if (clear) {
dengyihao's avatar
dengyihao 已提交
816
    tTrace("conn %p to be destroyed", conn);
817 818 819 820
    // uv_shutdown_t* req = taosMemoryMalloc(sizeof(uv_shutdown_t));
    uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
    // uv_close(conn->pTcp)
    // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
821
  }
dengyihao's avatar
dengyihao 已提交
822
}
dengyihao's avatar
dengyihao 已提交
823
static void destroyConnRegArg(SSvrConn* conn) {
dengyihao's avatar
dengyihao 已提交
824 825 826 827 828
  if (conn->regArg.init == 1) {
    transFreeMsg(conn->regArg.msg.pCont);
    conn->regArg.init = 0;
  }
}
dengyihao's avatar
dengyihao 已提交
829
static int reallocConnRefHandle(SSvrConn* conn) {
dengyihao's avatar
dengyihao 已提交
830 831
  transReleaseExHandle(refMgt, conn->refId);
  transRemoveExHandle(refMgt, conn->refId);
dengyihao's avatar
dengyihao 已提交
832 833 834 835
  // avoid app continue to send msg on invalid handle
  SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
836 837
  exh->refId = transAddExHandle(refMgt, exh);
  transAcquireExHandle(refMgt, exh->refId);
dengyihao's avatar
dengyihao 已提交
838 839 840 841
  conn->refId = exh->refId;

  return 0;
}
dengyihao's avatar
dengyihao 已提交
842
static void uvDestroyConn(uv_handle_t* handle) {
dengyihao's avatar
dengyihao 已提交
843
  SSvrConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
844 845 846
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
847 848
  SWorkThrdObj* thrd = conn->hostThrd;

dengyihao's avatar
dengyihao 已提交
849 850
  transReleaseExHandle(refMgt, conn->refId);
  transRemoveExHandle(refMgt, conn->refId);
dengyihao's avatar
dengyihao 已提交
851

dengyihao's avatar
dengyihao 已提交
852
  tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn);
dengyihao's avatar
dengyihao 已提交
853
  // uv_timer_stop(&conn->pTimer);
dengyihao's avatar
dengyihao 已提交
854
  transQueueDestroy(&conn->srvMsgs);
dengyihao's avatar
dengyihao 已提交
855

dengyihao's avatar
dengyihao 已提交
856
  QUEUE_REMOVE(&conn->queue);
wafwerar's avatar
wafwerar 已提交
857
  taosMemoryFree(conn->pTcp);
dengyihao's avatar
dengyihao 已提交
858
  destroyConnRegArg(conn);
dengyihao's avatar
dengyihao 已提交
859
  taosMemoryFree(conn);
dengyihao's avatar
dengyihao 已提交
860

dengyihao's avatar
dengyihao 已提交
861
  if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
dengyihao's avatar
dengyihao 已提交
862
    tTrace("work thread quit");
dengyihao's avatar
dengyihao 已提交
863
    uv_walk(thrd->loop, uvWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
864
  }
dengyihao's avatar
dengyihao 已提交
865
}
wafwerar's avatar
wafwerar 已提交
866 867 868 869
static void uvPipeListenCb(uv_stream_t* handle, int status) {
  ASSERT(status == 0);

  SServerObj* srv = container_of(handle, SServerObj, pipeListen);
dengyihao's avatar
dengyihao 已提交
870
  uv_pipe_t*  pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
wafwerar's avatar
wafwerar 已提交
871 872 873 874 875 876 877 878 879 880 881 882 883 884
  ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1));
  ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe));

  ASSERT(1 == uv_is_readable((uv_stream_t*)pipe));
  ASSERT(1 == uv_is_writable((uv_stream_t*)pipe));
  ASSERT(0 == uv_is_closing((uv_handle_t*)pipe));

  srv->numOfWorkerReady++;

  // ASSERT(0 == uv_listen((uv_stream_t*)&ctx.send.tcp, 512, uvOnAcceptCb));

  // r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, read_cb);
  // ASSERT(r == 0);
}
dengyihao's avatar
dengyihao 已提交
885

U
ubuntu 已提交
886
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
887 888
  SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
889 890
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
wafwerar's avatar
wafwerar 已提交
891
  srv->numOfWorkerReady = 0;
wafwerar's avatar
wafwerar 已提交
892 893
  srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
dengyihao's avatar
dengyihao 已提交
894 895 896 897
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

dengyihao's avatar
dengyihao 已提交
898
  // taosThreadOnce(&transModuleInit, uvInitEnv);
dengyihao's avatar
dengyihao 已提交
899 900
  int ref = atomic_add_fetch_32(&tranSSvrInst, 1);
  if (ref == 1) {
dengyihao's avatar
dengyihao 已提交
901 902
    refMgt = transOpenExHandleMgt(50000);
  }
dengyihao's avatar
dengyihao 已提交
903

wafwerar's avatar
wafwerar 已提交
904 905
  assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0));
#ifdef WINDOWS
906 907
  char pipeName[64];
  snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId());
wafwerar's avatar
wafwerar 已提交
908
#else
909
  char pipeName[PATH_MAX] = {0};
dengyihao's avatar
dengyihao 已提交
910 911
  snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(),
           taosGetSelfPthreadId());
wafwerar's avatar
wafwerar 已提交
912 913 914 915
#endif
  assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName));
  assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));

dengyihao's avatar
dengyihao 已提交
916
  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
917
    SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
918
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
919
    thrd->quit = false;
dengyihao's avatar
dengyihao 已提交
920
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
921
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
922

wafwerar's avatar
wafwerar 已提交
923
    srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
dengyihao's avatar
dengyihao 已提交
924
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
925

dengyihao's avatar
dengyihao 已提交
926
    if (false == addHandleToWorkloop(thrd, pipeName)) {
dengyihao's avatar
dengyihao 已提交
927 928
      goto End;
    }
929
    int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
dengyihao's avatar
dengyihao 已提交
930 931 932 933 934 935
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
dengyihao's avatar
dengyihao 已提交
936
      goto End;
dengyihao's avatar
dengyihao 已提交
937 938
    }
  }
939
  if (false == taosValidIpAndPort(srv->ip, srv->port)) {
dengyihao's avatar
dengyihao 已提交
940
    terrno = TAOS_SYSTEM_ERROR(errno);
dengyihao's avatar
dengyihao 已提交
941
    tError("invalid ip/port, %d:%d, reason: %s", srv->ip, srv->port, terrstr());
942 943
    goto End;
  }
dengyihao's avatar
dengyihao 已提交
944 945 946
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
947
  int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
dengyihao's avatar
dengyihao 已提交
948 949 950
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
dengyihao's avatar
dengyihao 已提交
951 952
    tError("failed  to create accept-thread");
    goto End;
dengyihao's avatar
dengyihao 已提交
953 954
    // clear all resource later
  }
dengyihao's avatar
dengyihao 已提交
955
  srv->inited = true;
dengyihao's avatar
dengyihao 已提交
956
  return srv;
dengyihao's avatar
dengyihao 已提交
957
End:
U
ubuntu 已提交
958
  transCloseServer(srv);
dengyihao's avatar
dengyihao 已提交
959 960
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
961

dengyihao's avatar
dengyihao 已提交
962
void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd) {
dengyihao's avatar
dengyihao 已提交
963
  thrd->quit = true;
dengyihao's avatar
dengyihao 已提交
964
  if (QUEUE_IS_EMPTY(&thrd->conn)) {
dengyihao's avatar
dengyihao 已提交
965
    uv_walk(thrd->loop, uvWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
966 967 968
  } else {
    destroyAllConn(thrd);
  }
wafwerar's avatar
wafwerar 已提交
969
  taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
970
}
dengyihao's avatar
dengyihao 已提交
971 972
void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd) {
  SSvrConn* conn = msg->pConn;
dengyihao's avatar
dengyihao 已提交
973
  if (conn->status == ConnAcquire) {
dengyihao's avatar
dengyihao 已提交
974
    reallocConnRefHandle(conn);
dengyihao's avatar
dengyihao 已提交
975
    if (!transQueuePush(&conn->srvMsgs, msg)) {
dengyihao's avatar
dengyihao 已提交
976
      return;
dengyihao's avatar
dengyihao 已提交
977 978 979
    }
    uvStartSendRespInternal(msg);
    return;
dengyihao's avatar
dengyihao 已提交
980
  } else if (conn->status == ConnRelease || conn->status == ConnNormal) {
dengyihao's avatar
dengyihao 已提交
981
    tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pTransInst), conn);
dengyihao's avatar
dengyihao 已提交
982
  }
dengyihao's avatar
dengyihao 已提交
983
  destroySmsg(msg);
dengyihao's avatar
dengyihao 已提交
984
}
dengyihao's avatar
dengyihao 已提交
985
void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd) {
dengyihao's avatar
dengyihao 已提交
986
  // send msg to client
dengyihao's avatar
dengyihao 已提交
987
  tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pTransInst), msg->pConn);
dengyihao's avatar
dengyihao 已提交
988 989
  uvStartSendResp(msg);
}
dengyihao's avatar
dengyihao 已提交
990 991
void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) {
  SSvrConn* conn = msg->pConn;
dengyihao's avatar
dengyihao 已提交
992
  tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pTransInst), conn);
dengyihao's avatar
dengyihao 已提交
993
  if (conn->status == ConnAcquire) {
dengyihao's avatar
dengyihao 已提交
994
    if (!transQueuePush(&conn->srvMsgs, msg)) {
dengyihao's avatar
dengyihao 已提交
995 996
      return;
    }
dengyihao's avatar
dengyihao 已提交
997
    transQueuePop(&conn->srvMsgs);
dengyihao's avatar
dengyihao 已提交
998 999 1000
    conn->regArg.notifyCount = 0;
    conn->regArg.init = 1;
    conn->regArg.msg = msg->msg;
dengyihao's avatar
dengyihao 已提交
1001
    tDebug("conn %p register brokenlink callback succ", conn);
dengyihao's avatar
dengyihao 已提交
1002 1003 1004 1005 1006 1007

    if (conn->broken) {
      STrans* pTransInst = conn->pTransInst;
      (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
      memset(&conn->regArg, 0, sizeof(conn->regArg));
    }
wafwerar's avatar
wafwerar 已提交
1008
    taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
1009 1010
  }
}
dengyihao's avatar
dengyihao 已提交
1011 1012 1013 1014
void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
1015
  taosThreadJoin(pThrd->thread, NULL);
dengyihao's avatar
dengyihao 已提交
1016
  SRV_RELEASE_UV(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
1017
  TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
dengyihao's avatar
dengyihao 已提交
1018
  transDestroyAsyncPool(pThrd->asyncPool);
dengyihao's avatar
dengyihao 已提交
1019
  taosMemoryFree(pThrd->loop);
wafwerar's avatar
wafwerar 已提交
1020
  taosMemoryFree(pThrd);
dengyihao's avatar
dengyihao 已提交
1021
}
dengyihao's avatar
dengyihao 已提交
1022
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
dengyihao's avatar
dengyihao 已提交
1023
  SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
dengyihao's avatar
dengyihao 已提交
1024
  msg->type = Quit;
dengyihao's avatar
dengyihao 已提交
1025
  tDebug("server send quit msg to work thread");
dengyihao's avatar
dengyihao 已提交
1026
  transSendAsync(pThrd->asyncPool, &msg->q);
dengyihao's avatar
dengyihao 已提交
1027 1028
}

U
ubuntu 已提交
1029
void transCloseServer(void* arg) {
dengyihao's avatar
dengyihao 已提交
1030 1031
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
1032 1033

  tDebug("send quit msg to accept thread");
dengyihao's avatar
dengyihao 已提交
1034 1035 1036 1037
  if (srv->inited) {
    uv_async_send(srv->pAcceptAsync);
    taosThreadJoin(srv->thread, NULL);
  }
dengyihao's avatar
dengyihao 已提交
1038
  SRV_RELEASE_UV(srv->loop);
dengyihao's avatar
dengyihao 已提交
1039

dengyihao's avatar
dengyihao 已提交
1040 1041 1042 1043 1044
  for (int i = 0; i < srv->numOfThreads; i++) {
    sendQuitToWorkThrd(srv->pThreadObj[i]);
    destroyWorkThrd(srv->pThreadObj[i]);
  }

wafwerar's avatar
wafwerar 已提交
1045 1046 1047
  taosMemoryFree(srv->pThreadObj);
  taosMemoryFree(srv->pAcceptAsync);
  taosMemoryFree(srv->loop);
dengyihao's avatar
dengyihao 已提交
1048 1049

  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
1050
    taosMemoryFree(srv->pipe[i]);
dengyihao's avatar
dengyihao 已提交
1051
  }
wafwerar's avatar
wafwerar 已提交
1052
  taosMemoryFree(srv->pipe);
dengyihao's avatar
dengyihao 已提交
1053

wafwerar's avatar
wafwerar 已提交
1054
  taosMemoryFree(srv);
dengyihao's avatar
dengyihao 已提交
1055

dengyihao's avatar
dengyihao 已提交
1056 1057
  int ref = atomic_sub_fetch_32(&tranSSvrInst, 1);
  if (ref == 0) {
dengyihao's avatar
dengyihao 已提交
1058 1059 1060
    // TdThreadOnce tmpInit = PTHREAD_ONCE_INIT;
    // memcpy(&transModuleInit, &tmpInit, sizeof(TdThreadOnce));
    transCloseExHandleMgt(refMgt);
dengyihao's avatar
dengyihao 已提交
1061
  }
dengyihao's avatar
dengyihao 已提交
1062
}
dengyihao's avatar
dengyihao 已提交
1063

dengyihao's avatar
dengyihao 已提交
1064 1065 1066 1067
void transRefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1068
  int ref = T_REF_INC((SSvrConn*)handle);
dengyihao's avatar
dengyihao 已提交
1069
  tDebug("conn %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
1070 1071 1072 1073 1074 1075
}

void transUnrefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1076
  int ref = T_REF_DEC((SSvrConn*)handle);
dengyihao's avatar
dengyihao 已提交
1077
  tDebug("conn %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
1078
  if (ref == 0) {
dengyihao's avatar
dengyihao 已提交
1079
    destroyConn((SSvrConn*)handle, true);
dengyihao's avatar
dengyihao 已提交
1080 1081
  }
}
dengyihao's avatar
dengyihao 已提交
1082 1083

void transReleaseSrvHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
1084
  SExHandle* exh = handle;
dengyihao's avatar
dengyihao 已提交
1085 1086
  int64_t    refId = exh->refId;

dengyihao's avatar
dengyihao 已提交
1087
  ASYNC_CHECK_HANDLE(exh, refId);
dengyihao's avatar
dengyihao 已提交
1088

dengyihao's avatar
dengyihao 已提交
1089 1090 1091
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);

S
Shengliang Guan 已提交
1092
  STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId};
dengyihao's avatar
dengyihao 已提交
1093

dengyihao's avatar
dengyihao 已提交
1094 1095 1096
  SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
  m->msg = tmsg;
  m->type = Release;
dengyihao's avatar
dengyihao 已提交
1097

dengyihao's avatar
dengyihao 已提交
1098
  tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
dengyihao's avatar
dengyihao 已提交
1099
  transSendAsync(pThrd->asyncPool, &m->q);
dengyihao's avatar
dengyihao 已提交
1100
  transReleaseExHandle(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
1101
  return;
dengyihao's avatar
dengyihao 已提交
1102
_return1:
dengyihao's avatar
dengyihao 已提交
1103
  tTrace("handle %p failed to send to release handle", exh);
dengyihao's avatar
dengyihao 已提交
1104
  transReleaseExHandle(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
1105 1106
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1107
  tTrace("handle %p failed to send to release handle", exh);
dengyihao's avatar
dengyihao 已提交
1108
  return;
dengyihao's avatar
dengyihao 已提交
1109
}
dengyihao's avatar
dengyihao 已提交
1110
void transSendResponse(const STransMsg* msg) {
S
Shengliang Guan 已提交
1111 1112
  SExHandle* exh = msg->info.handle;
  int64_t    refId = msg->info.refId;
dengyihao's avatar
dengyihao 已提交
1113
  ASYNC_CHECK_HANDLE(exh, refId);
dengyihao's avatar
dengyihao 已提交
1114
  assert(refId != 0);
dengyihao's avatar
dengyihao 已提交
1115

dengyihao's avatar
dengyihao 已提交
1116
  STransMsg tmsg = *msg;
S
Shengliang Guan 已提交
1117
  tmsg.info.refId = refId;
dengyihao's avatar
dengyihao 已提交
1118

dengyihao's avatar
dengyihao 已提交
1119 1120
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);
dengyihao's avatar
dengyihao 已提交
1121

dengyihao's avatar
dengyihao 已提交
1122 1123 1124
  SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
  m->msg = tmsg;
  m->type = Normal;
dengyihao's avatar
dengyihao 已提交
1125 1126 1127

  STraceId* trace = (STraceId*)&msg->info.traceId;
  tGTrace("conn %p start to send resp (1/2)", exh->handle);
dengyihao's avatar
dengyihao 已提交
1128
  transSendAsync(pThrd->asyncPool, &m->q);
dengyihao's avatar
dengyihao 已提交
1129
  transReleaseExHandle(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
1130
  return;
dengyihao's avatar
dengyihao 已提交
1131
_return1:
dengyihao's avatar
dengyihao 已提交
1132
  tTrace("handle %p failed to send resp", exh);
dengyihao's avatar
dengyihao 已提交
1133
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1134
  transReleaseExHandle(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
1135 1136
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1137
  tTrace("handle %p failed to send resp", exh);
dengyihao's avatar
dengyihao 已提交
1138 1139
  rpcFreeCont(msg->pCont);
  return;
dengyihao's avatar
dengyihao 已提交
1140
}
dengyihao's avatar
dengyihao 已提交
1141
void transRegisterMsg(const STransMsg* msg) {
S
Shengliang Guan 已提交
1142 1143
  SExHandle* exh = msg->info.handle;
  int64_t    refId = msg->info.refId;
dengyihao's avatar
dengyihao 已提交
1144 1145
  ASYNC_CHECK_HANDLE(exh, refId);

dengyihao's avatar
dengyihao 已提交
1146
  STransMsg tmsg = *msg;
S
Shengliang Guan 已提交
1147
  tmsg.info.refId = refId;
dengyihao's avatar
dengyihao 已提交
1148

dengyihao's avatar
dengyihao 已提交
1149 1150
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);
dengyihao's avatar
dengyihao 已提交
1151

dengyihao's avatar
dengyihao 已提交
1152 1153 1154
  SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
  m->msg = tmsg;
  m->type = Register;
dengyihao's avatar
dengyihao 已提交
1155

dengyihao's avatar
dengyihao 已提交
1156
  tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle);
dengyihao's avatar
dengyihao 已提交
1157
  transSendAsync(pThrd->asyncPool, &m->q);
dengyihao's avatar
dengyihao 已提交
1158
  transReleaseExHandle(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
1159
  return;
dengyihao's avatar
dengyihao 已提交
1160

dengyihao's avatar
dengyihao 已提交
1161
_return1:
dengyihao's avatar
dengyihao 已提交
1162
  tTrace("handle %p failed to register brokenlink", exh);
dengyihao's avatar
dengyihao 已提交
1163
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1164
  transReleaseExHandle(refMgt, refId);
dengyihao's avatar
dengyihao 已提交
1165 1166
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1167
  tTrace("handle %p failed to register brokenlink", exh);
dengyihao's avatar
dengyihao 已提交
1168
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1169
}
dengyihao's avatar
dengyihao 已提交
1170

dengyihao's avatar
dengyihao 已提交
1171
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
dengyihao's avatar
dengyihao 已提交
1172

dengyihao's avatar
dengyihao 已提交
1173
#endif