transSrv.c 37.6 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20 21 22
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;

static char* notify = "a";
dengyihao's avatar
dengyihao 已提交
23
static int   transSrvInst = 0;
dengyihao's avatar
dengyihao 已提交
24

dengyihao's avatar
dengyihao 已提交
25 26 27 28 29 30
typedef struct {
  int       notifyCount;  //
  int       init;         // init or not
  STransMsg msg;
} SSrvRegArg;

dengyihao's avatar
dengyihao 已提交
31
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
32
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
33 34 35
  uv_tcp_t*  pTcp;
  uv_write_t pWriter;
  uv_timer_t pTimer;
dengyihao's avatar
dengyihao 已提交
36 37

  queue       queue;
dengyihao's avatar
dengyihao 已提交
38
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
39 40 41
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
42
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
43
  STransQueue srvMsgs;
dengyihao's avatar
dengyihao 已提交
44

dengyihao's avatar
dengyihao 已提交
45 46
  SSrvRegArg regArg;
  bool       broken;  // conn broken;
dengyihao's avatar
dengyihao 已提交
47

dengyihao's avatar
dengyihao 已提交
48
  ConnStatus         status;
dengyihao's avatar
dengyihao 已提交
49
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
50
  struct sockaddr_in locaddr;
U
ubuntu 已提交
51

dengyihao's avatar
dengyihao 已提交
52 53 54 55 56 57
  int64_t refId;
  int     spi;
  char    info[64];
  char    user[TSDB_UNI_LEN];  // user ID for the link
  char    secret[TSDB_PASSWORD_LEN];
  char    ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
58 59 60
} SSrvConn;

typedef struct SSrvMsg {
dengyihao's avatar
dengyihao 已提交
61 62 63 64
  SSrvConn*     pConn;
  STransMsg     msg;
  queue         q;
  STransMsgType type;
dengyihao's avatar
dengyihao 已提交
65
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
66 67

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
68
  TdThread      thread;
wafwerar's avatar
wafwerar 已提交
69
  uv_connect_t  connect_req;
dengyihao's avatar
dengyihao 已提交
70 71 72 73 74
  uv_pipe_t*    pipe;
  uv_os_fd_t    fd;
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  queue         msg;
wafwerar's avatar
wafwerar 已提交
75
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
76 77 78 79

  queue conn;
  void* pTransInst;
  bool  quit;
dengyihao's avatar
dengyihao 已提交
80 81 82
} SWorkThrdObj;

typedef struct SServerObj {
dengyihao's avatar
dengyihao 已提交
83
  TdThread   thread;
dengyihao's avatar
dengyihao 已提交
84 85 86 87
  uv_tcp_t   server;
  uv_loop_t* loop;

  // work thread info
dengyihao's avatar
dengyihao 已提交
88 89
  int            workerIdx;
  int            numOfThreads;
wafwerar's avatar
wafwerar 已提交
90
  int            numOfWorkerReady;
dengyihao's avatar
dengyihao 已提交
91
  SWorkThrdObj** pThreadObj;
dengyihao's avatar
dengyihao 已提交
92

wafwerar's avatar
wafwerar 已提交
93
  uv_pipe_t   pipeListen;
dengyihao's avatar
dengyihao 已提交
94 95 96 97
  uv_pipe_t** pipe;
  uint32_t    ip;
  uint32_t    port;
  uv_async_t* pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
98 99

  bool inited;
dengyihao's avatar
dengyihao 已提交
100 101
} SServerObj;

dengyihao's avatar
dengyihao 已提交
102 103 104 105 106 107 108
// handle
typedef struct SExHandle {
  void*         handle;
  int64_t       refId;
  SWorkThrdObj* pThrd;
} SExHandle;

dengyihao's avatar
dengyihao 已提交
109
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
110
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
111
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
112
static void uvOnTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
113
static void uvOnSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
114
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
115 116 117
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
118
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
119
static void uvShutDownCb(uv_shutdown_t* req, int status);
dengyihao's avatar
dengyihao 已提交
120 121 122 123 124 125 126

/*
 * time-consuming task throwed into BG work thread
 */
static void uvWorkDoTask(uv_work_t* req);
static void uvWorkAfterTask(uv_work_t* req, int status);

dengyihao's avatar
dengyihao 已提交
127 128
static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
129

dengyihao's avatar
dengyihao 已提交
130
static void uvStartSendRespInternal(SSrvMsg* smsg);
dengyihao's avatar
dengyihao 已提交
131 132
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
133

dengyihao's avatar
dengyihao 已提交
134 135
static void uvNotifyLinkBrokenToApp(SSrvConn* conn);

dengyihao's avatar
dengyihao 已提交
136
static void destroySmsg(SSrvMsg* smsg);
137
// check whether already read complete packet
dengyihao's avatar
fix bug  
dengyihao 已提交
138
static SSrvConn* createConn(void* hThrd);
dengyihao's avatar
dengyihao 已提交
139
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
140
static int       reallocConnRefHandle(SSrvConn* conn);
dengyihao's avatar
dengyihao 已提交
141

dengyihao's avatar
dengyihao 已提交
142 143
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
144
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
145
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
146 147
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
                                                                       uvHandleRegister};
dengyihao's avatar
dengyihao 已提交
148

dengyihao's avatar
dengyihao 已提交
149
static int32_t exHandlesMgt;
dengyihao's avatar
dengyihao 已提交
150

dengyihao's avatar
dengyihao 已提交
151
void       uvInitEnv();
dengyihao's avatar
dengyihao 已提交
152
void       uvOpenExHandleMgt(int size);
dengyihao's avatar
dengyihao 已提交
153
void       uvCloseExHandleMgt();
dengyihao's avatar
dengyihao 已提交
154 155 156 157 158 159
int64_t    uvAddExHandle(void* p);
int32_t    uvRemoveExHandle(int64_t refId);
int32_t    uvReleaseExHandle(int64_t refId);
void       uvDestoryExHandle(void* handle);
SExHandle* uvAcquireExHandle(int64_t refId);

160
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
161

dengyihao's avatar
dengyihao 已提交
162
// server and worker thread
163 164
static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg);
dengyihao's avatar
dengyihao 已提交
165

dengyihao's avatar
dengyihao 已提交
166
// add handle loop
dengyihao's avatar
dengyihao 已提交
167
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName);
dengyihao's avatar
dengyihao 已提交
168 169
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
#define CONN_SHOULD_RELEASE(conn, head)                                     \
  do {                                                                      \
    if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) {          \
      conn->status = ConnRelease;                                           \
      transClearBuffer(&conn->readBuf);                                     \
      transFreeMsg(transContFromHead((char*)head));                         \
      tTrace("server conn %p received release request", conn);              \
                                                                            \
      STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \
      SSrvMsg*  srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));              \
      srvMsg->msg = tmsg;                                                   \
      srvMsg->type = Release;                                               \
      srvMsg->pConn = conn;                                                 \
      if (!transQueuePush(&conn->srvMsgs, srvMsg)) {                        \
        return;                                                             \
      }                                                                     \
      uvStartSendRespInternal(srvMsg);                                      \
      return;                                                               \
    }                                                                       \
  } while (0)

#define SRV_RELEASE_UV(loop)       \
  do {                             \
    uv_walk(loop, uvWalkCb, NULL); \
    uv_run(loop, UV_RUN_DEFAULT);  \
    uv_loop_close(loop);           \
  } while (0);

dengyihao's avatar
dengyihao 已提交
198 199 200 201
#define ASYNC_ERR_JRET(thrd)                            \
  do {                                                  \
    if (thrd->quit) {                                   \
      tTrace("worker thread already quit, ignore msg"); \
dengyihao's avatar
dengyihao 已提交
202
      goto _return1;                                    \
dengyihao's avatar
dengyihao 已提交
203 204 205
    }                                                   \
  } while (0)

dengyihao's avatar
dengyihao 已提交
206 207 208 209 210
#define ASYNC_CHECK_HANDLE(exh1, refId)                                                                               \
  do {                                                                                                                \
    if (refId > 0) {                                                                                                  \
      tTrace("server handle step1");                                                                                  \
      SExHandle* exh2 = uvAcquireExHandle(refId);                                                                     \
dengyihao's avatar
dengyihao 已提交
211
      if (exh2 == NULL || refId != exh2->refId) {                                                                     \
dengyihao's avatar
dengyihao 已提交
212
        tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
dengyihao's avatar
dengyihao 已提交
213
               exh2 ? exh2->refId : 0, refId);                                                                        \
dengyihao's avatar
dengyihao 已提交
214 215 216
        goto _return1;                                                                                                \
      }                                                                                                               \
    } else if (refId == 0) {                                                                                          \
dengyihao's avatar
dengyihao 已提交
217
      tTrace("server handle step2");                                                                                  \
dengyihao's avatar
dengyihao 已提交
218
      SExHandle* exh2 = uvAcquireExHandle(refId);                                                                     \
dengyihao's avatar
dengyihao 已提交
219 220 221
      if (exh2 == NULL || refId != exh2->refId) {                                                                     \
        tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
               refId, exh2 ? exh2->refId : 0);                                                                        \
dengyihao's avatar
dengyihao 已提交
222 223 224 225 226
        goto _return1;                                                                                                \
      } else {                                                                                                        \
        refId = exh1->refId;                                                                                          \
      }                                                                                                               \
    } else if (refId == -1) {                                                                                         \
dengyihao's avatar
dengyihao 已提交
227
      tTrace("server handle step3");                                                                                  \
dengyihao's avatar
dengyihao 已提交
228 229
      goto _return2;                                                                                                  \
    }                                                                                                                 \
dengyihao's avatar
dengyihao 已提交
230 231
  } while (0)

dengyihao's avatar
dengyihao 已提交
232
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
233 234 235
  SSrvConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
236 237 238 239
}

// refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) {
dengyihao's avatar
dengyihao 已提交
240
  SSrvConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
241
  tDebug("%p timeout since no activity", conn);
dengyihao's avatar
dengyihao 已提交
242 243
}

dengyihao's avatar
dengyihao 已提交
244 245
static void uvHandleReq(SSrvConn* pConn) {
  SConnBuffer* pBuf = &pConn->readBuf;
dengyihao's avatar
dengyihao 已提交
246 247 248 249
  char*        msg = pBuf->buf;
  uint32_t     msgLen = pBuf->len;

  STransMsgHead* pHead = (STransMsgHead*)msg;
dengyihao's avatar
dengyihao 已提交
250
  pHead->code = htonl(pHead->code);
dengyihao's avatar
dengyihao 已提交
251
  pHead->msgLen = htonl(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
252
  memcpy(pConn->user, pHead->user, strlen(pHead->user));
dengyihao's avatar
dengyihao 已提交
253

dengyihao's avatar
dengyihao 已提交
254 255 256 257 258 259 260
  // TODO(dengyihao): time-consuming task throwed into BG Thread
  //  uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
  //  wreq->data = pConn;
  //  uv_read_stop((uv_stream_t*)pConn->pTcp);
  //  transRefSrvHandle(pConn);
  //  uv_queue_work(((SWorkThrdObj*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);

dengyihao's avatar
dengyihao 已提交
261
  CONN_SHOULD_RELEASE(pConn, pHead);
dengyihao's avatar
dengyihao 已提交
262

dengyihao's avatar
dengyihao 已提交
263 264 265 266 267
  STransMsg transMsg;
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = pHead->content;
  transMsg.msgType = pHead->msgType;
  transMsg.code = pHead->code;
dengyihao's avatar
dengyihao 已提交
268
  transMsg.ahandle = (void*)pHead->ahandle;
dengyihao's avatar
dengyihao 已提交
269
  transMsg.handle = NULL;
dengyihao's avatar
dengyihao 已提交
270

dengyihao's avatar
dengyihao 已提交
271
  // transDestroyBuffer(&pConn->readBuf);
dengyihao's avatar
dengyihao 已提交
272
  transClearBuffer(&pConn->readBuf);
dengyihao's avatar
dengyihao 已提交
273
  pConn->inType = pHead->msgType;
dengyihao's avatar
dengyihao 已提交
274 275 276 277
  if (pConn->status == ConnNormal) {
    if (pHead->persist == 1) {
      pConn->status = ConnAcquire;
      transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
278
      tDebug("server conn %p acquired by server app", pConn);
dengyihao's avatar
dengyihao 已提交
279 280 281
    }
  }
  if (pConn->status == ConnNormal && pHead->noResp == 0) {
dengyihao's avatar
dengyihao 已提交
282
    transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
283 284 285
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
           taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
           ntohs(pConn->locaddr.sin_port), transMsg.contLen);
dengyihao's avatar
dengyihao 已提交
286
  } else {
dengyihao's avatar
dengyihao 已提交
287 288 289
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
           TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
           taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
dengyihao's avatar
dengyihao 已提交
290 291 292
    // no ref here
  }

dengyihao's avatar
dengyihao 已提交
293 294 295 296
  // if pHead->noResp = 1,
  // 1. server application should not send resp on handle
  // 2. once send out data, cli conn released to conn pool immediately
  // 3. not mixed with persist
dengyihao's avatar
dengyihao 已提交
297

dengyihao's avatar
dengyihao 已提交
298
  transMsg.handle = (void*)uvAcquireExHandle(pConn->refId);
dengyihao's avatar
dengyihao 已提交
299 300 301
  tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.handle, pConn, pConn->refId);
  transMsg.refId = pConn->refId;
  assert(transMsg.handle != NULL);
dengyihao's avatar
dengyihao 已提交
302
  if (pHead->noResp == 1) {
dengyihao's avatar
dengyihao 已提交
303
    transMsg.refId = -1;
dengyihao's avatar
dengyihao 已提交
304 305
  }
  uvReleaseExHandle(pConn->refId);
dengyihao's avatar
dengyihao 已提交
306

dengyihao's avatar
dengyihao 已提交
307
  STrans* pTransInst = pConn->pTransInst;
dengyihao's avatar
dengyihao 已提交
308
  (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
dengyihao's avatar
dengyihao 已提交
309
  // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
dengyihao's avatar
dengyihao 已提交
310 311
}

dengyihao's avatar
dengyihao 已提交
312
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
313
  // opt
dengyihao's avatar
dengyihao 已提交
314 315
  SSrvConn*    conn = cli->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
316 317
  if (nread > 0) {
    pBuf->len += nread;
dengyihao's avatar
dengyihao 已提交
318
    tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
dengyihao's avatar
dengyihao 已提交
319
    if (transReadComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
320
      tTrace("server conn %p alread read complete packet", conn);
dengyihao's avatar
dengyihao 已提交
321
      uvHandleReq(conn);
dengyihao's avatar
dengyihao 已提交
322
    } else {
dengyihao's avatar
dengyihao 已提交
323
      tTrace("server %p read partial packet, continue to read", conn);
dengyihao's avatar
dengyihao 已提交
324 325 326
    }
    return;
  }
327 328 329
  if (nread == 0) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
330 331

  tError("server conn %p read error: %s", conn, uv_err_name(nread));
U
ubuntu 已提交
332
  if (nread < 0) {
dengyihao's avatar
dengyihao 已提交
333
    conn->broken = true;
dengyihao's avatar
dengyihao 已提交
334 335
    if (conn->status == ConnAcquire) {
      if (conn->regArg.init) {
dengyihao's avatar
dengyihao 已提交
336
        tTrace("server conn %p broken, notify server app", conn);
dengyihao's avatar
dengyihao 已提交
337 338 339 340 341
        STrans* pTransInst = conn->pTransInst;
        (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
        memset(&conn->regArg, 0, sizeof(conn->regArg));
      }
    }
dengyihao's avatar
dengyihao 已提交
342
    transUnrefSrvHandle(conn);
dengyihao's avatar
dengyihao 已提交
343 344 345 346
  }
}
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->len = 2;
wafwerar's avatar
wafwerar 已提交
347
  buf->base = taosMemoryCalloc(1, sizeof(char) * buf->len);
dengyihao's avatar
dengyihao 已提交
348 349 350 351
}

void uvOnTimeoutCb(uv_timer_t* handle) {
  // opt
dengyihao's avatar
dengyihao 已提交
352
  SSrvConn* pConn = handle->data;
dengyihao's avatar
dengyihao 已提交
353
  tError("server conn %p time out", pConn);
dengyihao's avatar
dengyihao 已提交
354 355
}

dengyihao's avatar
dengyihao 已提交
356
void uvOnSendCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
357
  SSrvConn* conn = req->data;
dengyihao's avatar
dengyihao 已提交
358
  // transClearBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
359
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
360
    tTrace("server conn %p data already was written on stream", conn);
dengyihao's avatar
dengyihao 已提交
361 362
    if (!transQueueEmpty(&conn->srvMsgs)) {
      SSrvMsg* msg = transQueuePop(&conn->srvMsgs);
dengyihao's avatar
dengyihao 已提交
363 364 365 366
      if (msg->type == Release && conn->status != ConnNormal) {
        conn->status = ConnNormal;
        transUnrefSrvHandle(conn);
      }
dengyihao's avatar
add UT  
dengyihao 已提交
367 368
      destroySmsg(msg);
      // send second data, just use for push
dengyihao's avatar
dengyihao 已提交
369
      if (!transQueueEmpty(&conn->srvMsgs)) {
dengyihao's avatar
dengyihao 已提交
370
        msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
dengyihao's avatar
dengyihao 已提交
371 372 373 374 375 376 377 378 379
        if (msg->type == Register && conn->status == ConnAcquire) {
          conn->regArg.notifyCount = 0;
          conn->regArg.init = 1;
          conn->regArg.msg = msg->msg;
          if (conn->broken) {
            STrans* pTransInst = conn->pTransInst;
            (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
            memset(&conn->regArg, 0, sizeof(conn->regArg));
          }
dengyihao's avatar
dengyihao 已提交
380
          transQueuePop(&conn->srvMsgs);
wafwerar's avatar
wafwerar 已提交
381
          taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
382 383

          msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
dengyihao's avatar
dengyihao 已提交
384 385 386
          if (msg != NULL) {
            uvStartSendRespInternal(msg);
          }
dengyihao's avatar
dengyihao 已提交
387 388 389
        } else {
          uvStartSendRespInternal(msg);
        }
dengyihao's avatar
add UT  
dengyihao 已提交
390
      }
dengyihao's avatar
dengyihao 已提交
391
    }
dengyihao's avatar
dengyihao 已提交
392
  } else {
dengyihao's avatar
dengyihao 已提交
393
    tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
394
    conn->broken = true;
dengyihao's avatar
dengyihao 已提交
395
    transUnrefSrvHandle(conn);
dengyihao's avatar
dengyihao 已提交
396 397
  }
}
dengyihao's avatar
dengyihao 已提交
398 399
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
400
    tTrace("success to dispatch conn to work thread");
dengyihao's avatar
dengyihao 已提交
401 402 403
  } else {
    tError("fail to dispatch conn to work thread");
  }
dengyihao's avatar
dengyihao 已提交
404 405
  uv_close((uv_handle_t*)req->data, uvFreeCb);
  // taosMemoryFree(req->data);
wafwerar's avatar
wafwerar 已提交
406
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
407
}
dengyihao's avatar
dengyihao 已提交
408

dengyihao's avatar
dengyihao 已提交
409
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
dengyihao's avatar
dengyihao 已提交
410
  tTrace("server conn %p prepare to send resp", smsg->pConn);
dengyihao's avatar
dengyihao 已提交
411

dengyihao's avatar
formate  
dengyihao 已提交
412 413
  SSrvConn*  pConn = smsg->pConn;
  STransMsg* pMsg = &smsg->msg;
dengyihao's avatar
dengyihao 已提交
414 415 416 417 418
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
dengyihao's avatar
dengyihao 已提交
419
  pHead->ahandle = (uint64_t)pMsg->ahandle;
dengyihao's avatar
dengyihao 已提交
420

dengyihao's avatar
dengyihao 已提交
421 422 423 424 425
  if (pConn->status == ConnNormal) {
    pHead->msgType = pConn->inType + 1;
  } else {
    pHead->msgType = smsg->type == Release ? 0 : pMsg->msgType;
  }
dengyihao's avatar
dengyihao 已提交
426
  pHead->release = smsg->type == Release ? 1 : 0;
dengyihao's avatar
dengyihao 已提交
427
  pHead->code = htonl(pMsg->code);
dengyihao's avatar
dengyihao 已提交
428

dengyihao's avatar
dengyihao 已提交
429 430
  char*   msg = (char*)pHead;
  int32_t len = transMsgLenFromCont(pMsg->contLen);
dengyihao's avatar
dengyihao 已提交
431 432 433
  tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
         taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
         ntohs(pConn->locaddr.sin_port));
dengyihao's avatar
dengyihao 已提交
434
  pHead->msgLen = htonl(len);
dengyihao's avatar
dengyihao 已提交
435

dengyihao's avatar
dengyihao 已提交
436 437 438
  wb->base = msg;
  wb->len = len;
}
dengyihao's avatar
dengyihao 已提交
439 440

static void uvStartSendRespInternal(SSrvMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
441 442 443 444
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
445
  // uv_timer_stop(&pConn->pTimer);
dengyihao's avatar
dengyihao 已提交
446
  uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
dengyihao's avatar
dengyihao 已提交
447 448 449 450
}
static void uvStartSendResp(SSrvMsg* smsg) {
  // impl
  SSrvConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
451 452

  if (pConn->broken == true) {
dengyihao's avatar
dengyihao 已提交
453
    // persist by
dengyihao's avatar
dengyihao 已提交
454 455 456
    transUnrefSrvHandle(pConn);
    return;
  }
dengyihao's avatar
dengyihao 已提交
457 458 459
  if (pConn->status == ConnNormal) {
    transUnrefSrvHandle(pConn);
  }
dengyihao's avatar
dengyihao 已提交
460

dengyihao's avatar
dengyihao 已提交
461
  if (!transQueuePush(&pConn->srvMsgs, smsg)) {
dengyihao's avatar
dengyihao 已提交
462 463 464
    return;
  }
  uvStartSendRespInternal(smsg);
dengyihao's avatar
dengyihao 已提交
465 466
  return;
}
dengyihao's avatar
dengyihao 已提交
467

dengyihao's avatar
dengyihao 已提交
468
static void destroySmsg(SSrvMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
469 470 471 472
  if (smsg == NULL) {
    return;
  }
  transFreeMsg(smsg->msg.pCont);
wafwerar's avatar
wafwerar 已提交
473
  taosMemoryFree(smsg);
dengyihao's avatar
dengyihao 已提交
474
}
dengyihao's avatar
fix bug  
dengyihao 已提交
475
static void destroyAllConn(SWorkThrdObj* pThrd) {
dengyihao's avatar
dengyihao 已提交
476
  tTrace("thread %p destroy all conn ", pThrd);
dengyihao's avatar
fix bug  
dengyihao 已提交
477 478 479 480 481 482
  while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
    queue* h = QUEUE_HEAD(&pThrd->conn);
    QUEUE_REMOVE(h);
    QUEUE_INIT(h);

    SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue);
dengyihao's avatar
dengyihao 已提交
483 484 485
    while (T_REF_VAL_GET(c) >= 2) {
      transUnrefSrvHandle(c);
    }
dengyihao's avatar
dengyihao 已提交
486
    transUnrefSrvHandle(c);
dengyihao's avatar
fix bug  
dengyihao 已提交
487 488
  }
}
dengyihao's avatar
dengyihao 已提交
489
void uvWorkerAsyncCb(uv_async_t* handle) {
dengyihao's avatar
dengyihao 已提交
490 491
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
dengyihao's avatar
dengyihao 已提交
492
  SSrvConn*     conn = NULL;
dengyihao's avatar
dengyihao 已提交
493
  queue         wq;
dengyihao's avatar
dengyihao 已提交
494

dengyihao's avatar
dengyihao 已提交
495
  // batch process to avoid to lock/unlock frequently
wafwerar's avatar
wafwerar 已提交
496
  taosThreadMutexLock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
497
  QUEUE_MOVE(&item->qmsg, &wq);
wafwerar's avatar
wafwerar 已提交
498
  taosThreadMutexUnlock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
499 500 501 502

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* head = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(head);
dengyihao's avatar
dengyihao 已提交
503 504 505

    SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
    if (msg == NULL) {
dengyihao's avatar
dengyihao 已提交
506
      tError("unexcept occurred, continue");
dengyihao's avatar
dengyihao 已提交
507
      continue;
dengyihao's avatar
dengyihao 已提交
508
    }
dengyihao's avatar
dengyihao 已提交
509
    // release handle to rpc init
dengyihao's avatar
dengyihao 已提交
510 511
    if (msg->type == Quit) {
      (*transAsyncHandle[msg->type])(msg, pThrd);
dengyihao's avatar
dengyihao 已提交
512
      continue;
dengyihao's avatar
dengyihao 已提交
513 514 515 516 517 518 519
    } else {
      STransMsg transMsg = msg->msg;

      SExHandle* exh1 = transMsg.handle;
      int64_t    refId = transMsg.refId;
      SExHandle* exh2 = uvAcquireExHandle(refId);
      if (exh2 == NULL || exh1 != exh2) {
dengyihao's avatar
dengyihao 已提交
520
        tTrace("server handle except msg %p, ignore it", exh1);
dengyihao's avatar
dengyihao 已提交
521 522 523 524 525
        uvReleaseExHandle(refId);
        destroySmsg(msg);
        continue;
      }
      msg->pConn = exh1->handle;
dengyihao's avatar
dengyihao 已提交
526
      uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
527
      (*transAsyncHandle[msg->type])(msg, pThrd);
dengyihao's avatar
dengyihao 已提交
528
    }
dengyihao's avatar
dengyihao 已提交
529 530
  }
}
dengyihao's avatar
dengyihao 已提交
531 532 533 534 535
static void uvWalkCb(uv_handle_t* handle, void* arg) {
  if (!uv_is_closing(handle)) {
    uv_close(handle, NULL);
  }
}
dengyihao's avatar
dengyihao 已提交
536 537 538 539
static void uvFreeCb(uv_handle_t* handle) {
  //
  taosMemoryFree(handle);
}
dengyihao's avatar
dengyihao 已提交
540

dengyihao's avatar
dengyihao 已提交
541 542
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* srv = async->data;
dengyihao's avatar
dengyihao 已提交
543
  tDebug("close server port %d", srv->port);
dengyihao's avatar
dengyihao 已提交
544
  uv_walk(srv->loop, uvWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
545
}
dengyihao's avatar
dengyihao 已提交
546

dengyihao's avatar
dengyihao 已提交
547
static void uvShutDownCb(uv_shutdown_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
548 549 550
  if (status != 0) {
    tDebug("conn failed to shut down: %s", uv_err_name(status));
  }
dengyihao's avatar
dengyihao 已提交
551
  uv_close((uv_handle_t*)req->handle, uvDestroyConn);
wafwerar's avatar
wafwerar 已提交
552
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
553 554
}

dengyihao's avatar
dengyihao 已提交
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572
static void uvWorkDoTask(uv_work_t* req) {
  // doing time-consumeing task
  // only auth conn currently, add more func later
  tTrace("server conn %p start to be processed in BG Thread", req->data);
  return;
}

static void uvWorkAfterTask(uv_work_t* req, int status) {
  if (status != 0) {
    tTrace("server conn %p failed to processed ", req->data);
  }
  // Done time-consumeing task
  // add more func later
  // this func called in main loop
  tTrace("server conn %p already processed ", req->data);
  taosMemoryFree(req);
}

dengyihao's avatar
dengyihao 已提交
573 574 575 576 577 578
void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status == -1) {
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

wafwerar's avatar
wafwerar 已提交
579
  uv_tcp_t* cli = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
dengyihao's avatar
dengyihao 已提交
580 581 582
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
wafwerar's avatar
wafwerar 已提交
583
    if (pObj->numOfWorkerReady < pObj->numOfThreads) {
dengyihao's avatar
dengyihao 已提交
584 585
      tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads,
             pObj->numOfWorkerReady);
wafwerar's avatar
wafwerar 已提交
586 587 588
      uv_close((uv_handle_t*)cli, NULL);
      return;
    }
dengyihao's avatar
dengyihao 已提交
589

wafwerar's avatar
wafwerar 已提交
590
    uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
dengyihao's avatar
dengyihao 已提交
591
    wr->data = cli;
dengyihao's avatar
dengyihao 已提交
592 593 594
    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

    pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
dengyihao's avatar
dengyihao 已提交
595

dengyihao's avatar
dengyihao 已提交
596
    tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
dengyihao's avatar
dengyihao 已提交
597

dengyihao's avatar
dengyihao 已提交
598
    uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
dengyihao's avatar
dengyihao 已提交
599 600 601 602 603
  } else {
    uv_close((uv_handle_t*)cli, NULL);
  }
}
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
604
  tTrace("server connection coming");
dengyihao's avatar
dengyihao 已提交
605 606 607 608 609
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
dengyihao's avatar
dengyihao 已提交
610 611 612 613
    tError("failed to create connect: %p", q);
    taosMemoryFree(buf->base);
    uv_close((uv_handle_t*)q, NULL);
    // taosMemoryFree(q);
dengyihao's avatar
dengyihao 已提交
614 615 616 617 618
    return;
  }
  // free memory allocated by
  assert(nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
wafwerar's avatar
wafwerar 已提交
619
  taosMemoryFree(buf->base);
dengyihao's avatar
dengyihao 已提交
620 621 622 623 624 625 626 627 628 629 630 631

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

dengyihao's avatar
fix bug  
dengyihao 已提交
632
  SSrvConn* pConn = createConn(pThrd);
633

dengyihao's avatar
dengyihao 已提交
634
  pConn->pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
635
  /* init conn timer*/
dengyihao's avatar
dengyihao 已提交
636 637
  // uv_timer_init(pThrd->loop, &pConn->pTimer);
  // pConn->pTimer.data = pConn;
dengyihao's avatar
dengyihao 已提交
638 639 640 641

  pConn->hostThrd = pThrd;

  // init client handle
wafwerar's avatar
wafwerar 已提交
642
  pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
dengyihao's avatar
dengyihao 已提交
643 644 645
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

dengyihao's avatar
dengyihao 已提交
646
  pConn->pWriter.data = pConn;
dengyihao's avatar
dengyihao 已提交
647

dengyihao's avatar
dengyihao 已提交
648 649
  transSetConnOption((uv_tcp_t*)pConn->pTcp);

dengyihao's avatar
dengyihao 已提交
650 651 652
  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
dengyihao's avatar
dengyihao 已提交
653
    tTrace("server conn %p created, fd: %d", pConn, fd);
dengyihao's avatar
dengyihao 已提交
654

dengyihao's avatar
dengyihao 已提交
655 656
    int addrlen = sizeof(pConn->addr);
    if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
dengyihao's avatar
dengyihao 已提交
657
      tError("server conn %p failed to get peer info", pConn);
dengyihao's avatar
dengyihao 已提交
658
      transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
659
      return;
dengyihao's avatar
dengyihao 已提交
660
    }
dengyihao's avatar
dengyihao 已提交
661 662 663 664

    addrlen = sizeof(pConn->locaddr);
    if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
      tError("server conn %p failed to get local info", pConn);
dengyihao's avatar
dengyihao 已提交
665
      transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
666 667 668
      return;
    }

dengyihao's avatar
dengyihao 已提交
669
    uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
dengyihao's avatar
dengyihao 已提交
670

dengyihao's avatar
dengyihao 已提交
671
  } else {
dengyihao's avatar
dengyihao 已提交
672
    tDebug("failed to create new connection");
dengyihao's avatar
dengyihao 已提交
673
    transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
674 675 676
  }
}

677
void* transAcceptThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
678
  // opt
dengyihao's avatar
dengyihao 已提交
679
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
680 681
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
682 683

  return NULL;
dengyihao's avatar
dengyihao 已提交
684
}
dengyihao's avatar
dengyihao 已提交
685
void uvOnPipeConnectionCb(uv_connect_t* connect, int status) {
wafwerar's avatar
wafwerar 已提交
686 687 688 689 690 691
  if (status != 0) {
    return;
  }
  SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req);
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
}
dengyihao's avatar
dengyihao 已提交
692
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) {
wafwerar's avatar
wafwerar 已提交
693
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
694 695 696
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
697

dengyihao's avatar
dengyihao 已提交
698
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
wafwerar's avatar
wafwerar 已提交
699
  // int r = uv_pipe_open(pThrd->pipe, pThrd->fd);
dengyihao's avatar
dengyihao 已提交
700 701 702

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
703
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
704
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
705

dengyihao's avatar
fix bug  
dengyihao 已提交
706 707 708
  // conn set
  QUEUE_INIT(&pThrd->conn);

dengyihao's avatar
dengyihao 已提交
709
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
wafwerar's avatar
wafwerar 已提交
710 711
  uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
  // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
712 713 714 715 716 717 718 719 720 721 722 723 724
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
725
  // register an async here to quit server gracefully
wafwerar's avatar
wafwerar 已提交
726
  srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t));
dengyihao's avatar
dengyihao 已提交
727 728
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
729

dengyihao's avatar
dengyihao 已提交
730
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
731 732 733 734 735
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
736
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
737
    tError("failed to listen: %s", uv_err_name(err));
dengyihao's avatar
dengyihao 已提交
738
    terrno = TSDB_CODE_RPC_PORT_EADDRINUSE;
dengyihao's avatar
dengyihao 已提交
739 740 741
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
742
}
743
void* transWorkerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
744
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
745
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
746
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
747 748

  return NULL;
dengyihao's avatar
dengyihao 已提交
749 750
}

dengyihao's avatar
fix bug  
dengyihao 已提交
751 752 753
static SSrvConn* createConn(void* hThrd) {
  SWorkThrdObj* pThrd = hThrd;

wafwerar's avatar
wafwerar 已提交
754
  SSrvConn* pConn = (SSrvConn*)taosMemoryCalloc(1, sizeof(SSrvConn));
dengyihao's avatar
fix bug  
dengyihao 已提交
755 756 757
  QUEUE_INIT(&pConn->queue);

  QUEUE_PUSH(&pThrd->conn, &pConn->queue);
dengyihao's avatar
dengyihao 已提交
758 759

  transQueueInit(&pConn->srvMsgs, NULL);
dengyihao's avatar
dengyihao 已提交
760

dengyihao's avatar
dengyihao 已提交
761
  memset(&pConn->regArg, 0, sizeof(pConn->regArg));
dengyihao's avatar
dengyihao 已提交
762
  pConn->broken = false;
dengyihao's avatar
dengyihao 已提交
763
  pConn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
764

dengyihao's avatar
dengyihao 已提交
765 766 767 768 769 770 771
  SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
  exh->handle = pConn;
  exh->pThrd = pThrd;
  exh->refId = uvAddExHandle(exh);
  uvAcquireExHandle(exh->refId);

  pConn->refId = exh->refId;
dengyihao's avatar
dengyihao 已提交
772
  transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
773
  tTrace("server handle %p, conn %p created, refId: %" PRId64 "", exh, pConn, pConn->refId);
dengyihao's avatar
dengyihao 已提交
774 775
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
776

dengyihao's avatar
dengyihao 已提交
777
static void destroyConn(SSrvConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
778 779 780
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
781

dengyihao's avatar
dengyihao 已提交
782
  transDestroyBuffer(&conn->readBuf);
783
  if (clear) {
dengyihao's avatar
dengyihao 已提交
784
    tTrace("server conn %p to be destroyed", conn);
785 786 787 788
    // uv_shutdown_t* req = taosMemoryMalloc(sizeof(uv_shutdown_t));
    uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
    // uv_close(conn->pTcp)
    // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
789
  }
dengyihao's avatar
dengyihao 已提交
790
}
dengyihao's avatar
dengyihao 已提交
791 792 793 794 795 796 797 798 799 800 801 802 803
static int reallocConnRefHandle(SSrvConn* conn) {
  uvReleaseExHandle(conn->refId);
  uvRemoveExHandle(conn->refId);
  // avoid app continue to send msg on invalid handle
  SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
  exh->refId = uvAddExHandle(exh);
  uvAcquireExHandle(exh->refId);
  conn->refId = exh->refId;

  return 0;
}
dengyihao's avatar
dengyihao 已提交
804
static void uvDestroyConn(uv_handle_t* handle) {
dengyihao's avatar
dengyihao 已提交
805 806 807 808
  SSrvConn* conn = handle->data;
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
809 810
  SWorkThrdObj* thrd = conn->hostThrd;

dengyihao's avatar
dengyihao 已提交
811 812 813
  uvReleaseExHandle(conn->refId);
  uvRemoveExHandle(conn->refId);

dengyihao's avatar
dengyihao 已提交
814
  tDebug("server conn %p destroy", conn);
dengyihao's avatar
dengyihao 已提交
815
  // uv_timer_stop(&conn->pTimer);
dengyihao's avatar
dengyihao 已提交
816
  transQueueDestroy(&conn->srvMsgs);
dengyihao's avatar
dengyihao 已提交
817 818 819 820 821

  if (conn->regArg.init == 1) {
    transFreeMsg(conn->regArg.msg.pCont);
    conn->regArg.init = 0;
  }
dengyihao's avatar
dengyihao 已提交
822
  QUEUE_REMOVE(&conn->queue);
wafwerar's avatar
wafwerar 已提交
823
  taosMemoryFree(conn->pTcp);
D
dapan1121 已提交
824 825 826 827
  if (conn->regArg.init == 1) {
    transFreeMsg(conn->regArg.msg.pCont);
    conn->regArg.init = 0;
  }
dengyihao's avatar
dengyihao 已提交
828
  taosMemoryFree(conn);
dengyihao's avatar
dengyihao 已提交
829

dengyihao's avatar
dengyihao 已提交
830
  if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
dengyihao's avatar
dengyihao 已提交
831
    tTrace("work thread quit");
dengyihao's avatar
dengyihao 已提交
832
    uv_walk(thrd->loop, uvWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
833
  }
dengyihao's avatar
dengyihao 已提交
834
}
wafwerar's avatar
wafwerar 已提交
835 836 837 838
static void uvPipeListenCb(uv_stream_t* handle, int status) {
  ASSERT(status == 0);

  SServerObj* srv = container_of(handle, SServerObj, pipeListen);
dengyihao's avatar
dengyihao 已提交
839
  uv_pipe_t*  pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
wafwerar's avatar
wafwerar 已提交
840 841 842 843 844 845 846 847 848 849 850 851 852 853
  ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1));
  ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe));

  ASSERT(1 == uv_is_readable((uv_stream_t*)pipe));
  ASSERT(1 == uv_is_writable((uv_stream_t*)pipe));
  ASSERT(0 == uv_is_closing((uv_handle_t*)pipe));

  srv->numOfWorkerReady++;

  // ASSERT(0 == uv_listen((uv_stream_t*)&ctx.send.tcp, 512, uvOnAcceptCb));

  // r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, read_cb);
  // ASSERT(r == 0);
}
dengyihao's avatar
dengyihao 已提交
854

U
ubuntu 已提交
855
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
856 857
  SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
858 859
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
wafwerar's avatar
wafwerar 已提交
860
  srv->numOfWorkerReady = 0;
wafwerar's avatar
wafwerar 已提交
861 862
  srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
dengyihao's avatar
dengyihao 已提交
863 864 865 866
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

dengyihao's avatar
dengyihao 已提交
867
  taosThreadOnce(&transModuleInit, uvInitEnv);
dengyihao's avatar
dengyihao 已提交
868
  transSrvInst++;
dengyihao's avatar
dengyihao 已提交
869

wafwerar's avatar
wafwerar 已提交
870 871
  assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0));
#ifdef WINDOWS
872 873
  char pipeName[64];
  snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId());
wafwerar's avatar
wafwerar 已提交
874
#else
875
  char pipeName[PATH_MAX] = {0};
dengyihao's avatar
dengyihao 已提交
876 877
  snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(),
           taosGetSelfPthreadId());
wafwerar's avatar
wafwerar 已提交
878 879 880 881
#endif
  assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName));
  assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));

dengyihao's avatar
dengyihao 已提交
882
  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
883
    SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
884
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
885
    thrd->quit = false;
dengyihao's avatar
dengyihao 已提交
886
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
887
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
888

wafwerar's avatar
wafwerar 已提交
889
    srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
dengyihao's avatar
dengyihao 已提交
890
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
891

dengyihao's avatar
dengyihao 已提交
892
    if (false == addHandleToWorkloop(thrd, pipeName)) {
dengyihao's avatar
dengyihao 已提交
893 894
      goto End;
    }
895
    int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
dengyihao's avatar
dengyihao 已提交
896 897 898 899 900 901
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
dengyihao's avatar
dengyihao 已提交
902
      goto End;
dengyihao's avatar
dengyihao 已提交
903 904
    }
  }
905
  if (false == taosValidIpAndPort(srv->ip, srv->port)) {
dengyihao's avatar
dengyihao 已提交
906 907
    terrno = TAOS_SYSTEM_ERROR(errno);
    tError("invalid ip/port, reason: %s", terrstr());
908 909
    goto End;
  }
dengyihao's avatar
dengyihao 已提交
910 911 912
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
913
  int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
dengyihao's avatar
dengyihao 已提交
914 915 916
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
dengyihao's avatar
dengyihao 已提交
917 918
    tError("failed  to create accept-thread");
    goto End;
dengyihao's avatar
dengyihao 已提交
919 920
    // clear all resource later
  }
dengyihao's avatar
dengyihao 已提交
921
  srv->inited = true;
dengyihao's avatar
dengyihao 已提交
922
  return srv;
dengyihao's avatar
dengyihao 已提交
923
End:
U
ubuntu 已提交
924
  transCloseServer(srv);
dengyihao's avatar
dengyihao 已提交
925 926
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
927

dengyihao's avatar
dengyihao 已提交
928 929
void uvInitEnv() {
  uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
dengyihao's avatar
dengyihao 已提交
930 931
  uvOpenExHandleMgt(10000);
}
dengyihao's avatar
dengyihao 已提交
932 933 934 935
void uvOpenExHandleMgt(int size) {
  // added into once later
  exHandlesMgt = taosOpenRef(size, uvDestoryExHandle);
}
dengyihao's avatar
dengyihao 已提交
936 937 938 939
void uvCloseExHandleMgt() {
  // close ref
  taosCloseRef(exHandlesMgt);
}
dengyihao's avatar
dengyihao 已提交
940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964
int64_t uvAddExHandle(void* p) {
  // acquire extern handle
  return taosAddRef(exHandlesMgt, p);
}
int32_t uvRemoveExHandle(int64_t refId) {
  // acquire extern handle
  return taosRemoveRef(exHandlesMgt, refId);
}

SExHandle* uvAcquireExHandle(int64_t refId) {
  // acquire extern handle
  return (SExHandle*)taosAcquireRef(exHandlesMgt, refId);
}

int32_t uvReleaseExHandle(int64_t refId) {
  // release extern handle
  return taosReleaseRef(exHandlesMgt, refId);
}
void uvDestoryExHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  taosMemoryFree(handle);
}

dengyihao's avatar
dengyihao 已提交
965
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
dengyihao's avatar
dengyihao 已提交
966
  thrd->quit = true;
dengyihao's avatar
dengyihao 已提交
967
  if (QUEUE_IS_EMPTY(&thrd->conn)) {
dengyihao's avatar
dengyihao 已提交
968
    uv_walk(thrd->loop, uvWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
969 970 971
  } else {
    destroyAllConn(thrd);
  }
wafwerar's avatar
wafwerar 已提交
972
  taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
973 974 975
}
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* conn = msg->pConn;
dengyihao's avatar
dengyihao 已提交
976
  reallocConnRefHandle(conn);
dengyihao's avatar
dengyihao 已提交
977
  if (conn->status == ConnAcquire) {
dengyihao's avatar
dengyihao 已提交
978
    if (!transQueuePush(&conn->srvMsgs, msg)) {
dengyihao's avatar
dengyihao 已提交
979
      return;
dengyihao's avatar
dengyihao 已提交
980 981 982
    }
    uvStartSendRespInternal(msg);
    return;
dengyihao's avatar
dengyihao 已提交
983 984
  } else if (conn->status == ConnRelease || conn->status == ConnNormal) {
    tDebug("server conn %p already released, ignore release-msg", conn);
dengyihao's avatar
dengyihao 已提交
985
  }
dengyihao's avatar
dengyihao 已提交
986
  destroySmsg(msg);
dengyihao's avatar
dengyihao 已提交
987
}
dengyihao's avatar
dengyihao 已提交
988
void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
dengyihao's avatar
dengyihao 已提交
989
  // send msg to client
dengyihao's avatar
dengyihao 已提交
990
  tDebug("server conn %p start to send resp (2/2)", msg->pConn);
dengyihao's avatar
dengyihao 已提交
991 992
  uvStartSendResp(msg);
}
dengyihao's avatar
dengyihao 已提交
993 994
void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* conn = msg->pConn;
dengyihao's avatar
dengyihao 已提交
995
  tDebug("server conn %p register brokenlink callback", conn);
dengyihao's avatar
dengyihao 已提交
996
  if (conn->status == ConnAcquire) {
dengyihao's avatar
dengyihao 已提交
997
    if (!transQueuePush(&conn->srvMsgs, msg)) {
dengyihao's avatar
dengyihao 已提交
998 999
      return;
    }
dengyihao's avatar
dengyihao 已提交
1000
    transQueuePop(&conn->srvMsgs);
dengyihao's avatar
dengyihao 已提交
1001 1002 1003
    conn->regArg.notifyCount = 0;
    conn->regArg.init = 1;
    conn->regArg.msg = msg->msg;
dengyihao's avatar
dengyihao 已提交
1004
    tDebug("server conn %p register brokenlink callback succ", conn);
dengyihao's avatar
dengyihao 已提交
1005 1006 1007 1008 1009 1010

    if (conn->broken) {
      STrans* pTransInst = conn->pTransInst;
      (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
      memset(&conn->regArg, 0, sizeof(conn->regArg));
    }
wafwerar's avatar
wafwerar 已提交
1011
    taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
1012 1013
  }
}
dengyihao's avatar
dengyihao 已提交
1014 1015 1016 1017
void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
1018
  taosThreadJoin(pThrd->thread, NULL);
dengyihao's avatar
dengyihao 已提交
1019
  SRV_RELEASE_UV(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
1020
  transDestroyAsyncPool(pThrd->asyncPool);
dengyihao's avatar
dengyihao 已提交
1021
  taosMemoryFree(pThrd->loop);
wafwerar's avatar
wafwerar 已提交
1022
  taosMemoryFree(pThrd);
dengyihao's avatar
dengyihao 已提交
1023
}
dengyihao's avatar
dengyihao 已提交
1024
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
wafwerar's avatar
wafwerar 已提交
1025
  SSrvMsg* msg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1026
  msg->type = Quit;
dengyihao's avatar
dengyihao 已提交
1027
  tDebug("server send quit msg to work thread");
dengyihao's avatar
dengyihao 已提交
1028
  transSendAsync(pThrd->asyncPool, &msg->q);
dengyihao's avatar
dengyihao 已提交
1029 1030
}

U
ubuntu 已提交
1031
void transCloseServer(void* arg) {
dengyihao's avatar
dengyihao 已提交
1032 1033
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
1034 1035

  tDebug("send quit msg to accept thread");
dengyihao's avatar
dengyihao 已提交
1036 1037 1038 1039
  if (srv->inited) {
    uv_async_send(srv->pAcceptAsync);
    taosThreadJoin(srv->thread, NULL);
  }
dengyihao's avatar
dengyihao 已提交
1040
  SRV_RELEASE_UV(srv->loop);
dengyihao's avatar
dengyihao 已提交
1041

dengyihao's avatar
dengyihao 已提交
1042 1043 1044 1045 1046
  for (int i = 0; i < srv->numOfThreads; i++) {
    sendQuitToWorkThrd(srv->pThreadObj[i]);
    destroyWorkThrd(srv->pThreadObj[i]);
  }

wafwerar's avatar
wafwerar 已提交
1047 1048 1049
  taosMemoryFree(srv->pThreadObj);
  taosMemoryFree(srv->pAcceptAsync);
  taosMemoryFree(srv->loop);
dengyihao's avatar
dengyihao 已提交
1050 1051

  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
1052
    taosMemoryFree(srv->pipe[i]);
dengyihao's avatar
dengyihao 已提交
1053
  }
wafwerar's avatar
wafwerar 已提交
1054
  taosMemoryFree(srv->pipe);
dengyihao's avatar
dengyihao 已提交
1055

wafwerar's avatar
wafwerar 已提交
1056
  taosMemoryFree(srv);
dengyihao's avatar
dengyihao 已提交
1057

dengyihao's avatar
dengyihao 已提交
1058 1059
  transSrvInst--;
  if (transSrvInst == 0) {
1060 1061
    TdThreadOnce tmpInit = PTHREAD_ONCE_INIT;
    memcpy(&transModuleInit, &tmpInit, sizeof(TdThreadOnce));
dengyihao's avatar
dengyihao 已提交
1062 1063
    uvCloseExHandleMgt();
  }
dengyihao's avatar
dengyihao 已提交
1064
}
dengyihao's avatar
dengyihao 已提交
1065

dengyihao's avatar
dengyihao 已提交
1066 1067 1068 1069 1070
void transRefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_INC((SSrvConn*)handle);
dengyihao's avatar
dengyihao 已提交
1071
  tDebug("server conn %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
1072 1073 1074 1075 1076 1077 1078
}

void transUnrefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SSrvConn*)handle);
dengyihao's avatar
dengyihao 已提交
1079
  tDebug("server conn %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
1080 1081 1082 1083
  if (ref == 0) {
    destroyConn((SSrvConn*)handle, true);
  }
}
dengyihao's avatar
dengyihao 已提交
1084 1085

void transReleaseSrvHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
1086
  SExHandle* exh = handle;
dengyihao's avatar
dengyihao 已提交
1087 1088
  int64_t    refId = exh->refId;

dengyihao's avatar
dengyihao 已提交
1089
  ASYNC_CHECK_HANDLE(exh, refId);
dengyihao's avatar
dengyihao 已提交
1090

dengyihao's avatar
dengyihao 已提交
1091 1092 1093
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);

dengyihao's avatar
dengyihao 已提交
1094
  STransMsg tmsg = {.code = 0, .handle = exh, .ahandle = NULL, .refId = refId};
dengyihao's avatar
dengyihao 已提交
1095

wafwerar's avatar
wafwerar 已提交
1096
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1097 1098 1099
  srvMsg->msg = tmsg;
  srvMsg->type = Release;

dengyihao's avatar
dengyihao 已提交
1100
  tTrace("server conn %p start to release", exh->handle);
dengyihao's avatar
dengyihao 已提交
1101
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
1102 1103
  uvReleaseExHandle(refId);
  return;
dengyihao's avatar
dengyihao 已提交
1104
_return1:
dengyihao's avatar
dengyihao 已提交
1105
  tTrace("server handle %p failed to send to release handle", exh);
dengyihao's avatar
dengyihao 已提交
1106
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
1107 1108
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1109
  tTrace("server handle %p failed to send to release handle", exh);
dengyihao's avatar
dengyihao 已提交
1110
  return;
dengyihao's avatar
dengyihao 已提交
1111
}
dengyihao's avatar
dengyihao 已提交
1112 1113 1114 1115
void transSendResponse(const STransMsg* msg) {
  SExHandle* exh = msg->handle;
  int64_t    refId = msg->refId;
  ASYNC_CHECK_HANDLE(exh, refId);
dengyihao's avatar
dengyihao 已提交
1116
  assert(refId != 0);
dengyihao's avatar
dengyihao 已提交
1117

dengyihao's avatar
dengyihao 已提交
1118 1119 1120
  STransMsg tmsg = *msg;
  tmsg.refId = refId;

dengyihao's avatar
dengyihao 已提交
1121 1122
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);
dengyihao's avatar
dengyihao 已提交
1123

wafwerar's avatar
wafwerar 已提交
1124
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1125
  srvMsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
1126
  srvMsg->type = Normal;
1127
  tDebug("server conn %p start to send resp (1/2)", exh->handle);
dengyihao's avatar
dengyihao 已提交
1128
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
1129 1130
  uvReleaseExHandle(refId);
  return;
dengyihao's avatar
dengyihao 已提交
1131
_return1:
dengyihao's avatar
dengyihao 已提交
1132
  tTrace("server handle %p failed to send resp", exh);
dengyihao's avatar
dengyihao 已提交
1133
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1134
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
1135 1136
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1137
  tTrace("server handle %p failed to send resp", exh);
dengyihao's avatar
dengyihao 已提交
1138 1139
  rpcFreeCont(msg->pCont);
  return;
dengyihao's avatar
dengyihao 已提交
1140
}
dengyihao's avatar
dengyihao 已提交
1141
void transRegisterMsg(const STransMsg* msg) {
dengyihao's avatar
dengyihao 已提交
1142
  SExHandle* exh = msg->handle;
dengyihao's avatar
dengyihao 已提交
1143 1144 1145
  int64_t    refId = msg->refId;
  ASYNC_CHECK_HANDLE(exh, refId);

dengyihao's avatar
dengyihao 已提交
1146 1147 1148
  STransMsg tmsg = *msg;
  tmsg.refId = refId;

dengyihao's avatar
dengyihao 已提交
1149 1150
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);
dengyihao's avatar
dengyihao 已提交
1151

wafwerar's avatar
wafwerar 已提交
1152
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1153
  srvMsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
1154
  srvMsg->type = Register;
dengyihao's avatar
dengyihao 已提交
1155
  tTrace("server conn %p start to register brokenlink callback", exh->handle);
dengyihao's avatar
dengyihao 已提交
1156
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
1157 1158
  uvReleaseExHandle(refId);
  return;
dengyihao's avatar
dengyihao 已提交
1159

dengyihao's avatar
dengyihao 已提交
1160
_return1:
dengyihao's avatar
dengyihao 已提交
1161
  tTrace("server handle %p failed to send to register brokenlink", exh);
dengyihao's avatar
dengyihao 已提交
1162
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1163
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
1164 1165
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1166
  tTrace("server handle %p failed to send to register brokenlink", exh);
dengyihao's avatar
dengyihao 已提交
1167
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1168
}
dengyihao's avatar
dengyihao 已提交
1169

dengyihao's avatar
formate  
dengyihao 已提交
1170
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
dengyihao's avatar
dengyihao 已提交
1171 1172 1173 1174 1175 1176
  if (thandle == NULL) {
    tTrace("invalid handle %p, failed to Get Conn info", thandle);
    return -1;
  }
  SExHandle* ex = thandle;
  SSrvConn*  pConn = ex->handle;
U
ubuntu 已提交
1177

dengyihao's avatar
dengyihao 已提交
1178
  struct sockaddr_in addr = pConn->addr;
dengyihao's avatar
dengyihao 已提交
1179 1180
  pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
  pInfo->clientPort = ntohs(addr.sin_port);
dengyihao's avatar
dengyihao 已提交
1181 1182 1183 1184
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
  return 0;
}

dengyihao's avatar
dengyihao 已提交
1185
#endif