transSrv.c 37.9 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20 21 22
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;

static char* notify = "a";
dengyihao's avatar
dengyihao 已提交
23
static int   transSrvInst = 0;
dengyihao's avatar
dengyihao 已提交
24

dengyihao's avatar
dengyihao 已提交
25 26 27 28 29 30
typedef struct {
  int       notifyCount;  //
  int       init;         // init or not
  STransMsg msg;
} SSrvRegArg;

dengyihao's avatar
dengyihao 已提交
31
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
32
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
33 34 35
  uv_tcp_t*  pTcp;
  uv_write_t pWriter;
  uv_timer_t pTimer;
dengyihao's avatar
dengyihao 已提交
36 37

  queue       queue;
dengyihao's avatar
dengyihao 已提交
38
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
39 40 41
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
42
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
43
  STransQueue srvMsgs;
dengyihao's avatar
dengyihao 已提交
44

dengyihao's avatar
dengyihao 已提交
45 46
  SSrvRegArg regArg;
  bool       broken;  // conn broken;
dengyihao's avatar
dengyihao 已提交
47

dengyihao's avatar
dengyihao 已提交
48
  ConnStatus         status;
dengyihao's avatar
dengyihao 已提交
49
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
50
  struct sockaddr_in locaddr;
U
ubuntu 已提交
51

dengyihao's avatar
dengyihao 已提交
52 53 54 55 56 57
  int64_t refId;
  int     spi;
  char    info[64];
  char    user[TSDB_UNI_LEN];  // user ID for the link
  char    secret[TSDB_PASSWORD_LEN];
  char    ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
58 59 60
} SSrvConn;

typedef struct SSrvMsg {
dengyihao's avatar
dengyihao 已提交
61 62 63 64
  SSrvConn*     pConn;
  STransMsg     msg;
  queue         q;
  STransMsgType type;
dengyihao's avatar
dengyihao 已提交
65
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
66 67

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
68
  TdThread      thread;
wafwerar's avatar
wafwerar 已提交
69
  uv_connect_t  connect_req;
dengyihao's avatar
dengyihao 已提交
70 71 72 73 74
  uv_pipe_t*    pipe;
  uv_os_fd_t    fd;
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  queue         msg;
wafwerar's avatar
wafwerar 已提交
75
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
76 77 78 79

  queue conn;
  void* pTransInst;
  bool  quit;
dengyihao's avatar
dengyihao 已提交
80 81 82
} SWorkThrdObj;

typedef struct SServerObj {
dengyihao's avatar
dengyihao 已提交
83
  TdThread   thread;
dengyihao's avatar
dengyihao 已提交
84 85 86 87
  uv_tcp_t   server;
  uv_loop_t* loop;

  // work thread info
dengyihao's avatar
dengyihao 已提交
88 89
  int            workerIdx;
  int            numOfThreads;
wafwerar's avatar
wafwerar 已提交
90
  int            numOfWorkerReady;
dengyihao's avatar
dengyihao 已提交
91
  SWorkThrdObj** pThreadObj;
dengyihao's avatar
dengyihao 已提交
92

wafwerar's avatar
wafwerar 已提交
93
  uv_pipe_t   pipeListen;
dengyihao's avatar
dengyihao 已提交
94 95 96 97
  uv_pipe_t** pipe;
  uint32_t    ip;
  uint32_t    port;
  uv_async_t* pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
98 99

  bool inited;
dengyihao's avatar
dengyihao 已提交
100 101
} SServerObj;

dengyihao's avatar
dengyihao 已提交
102 103 104 105 106 107 108
// handle
typedef struct SExHandle {
  void*         handle;
  int64_t       refId;
  SWorkThrdObj* pThrd;
} SExHandle;

dengyihao's avatar
dengyihao 已提交
109
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
110
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
111
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
112
static void uvOnTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
113
static void uvOnSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
114
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
115 116 117
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
118
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
119
static void uvShutDownCb(uv_shutdown_t* req, int status);
dengyihao's avatar
dengyihao 已提交
120 121 122 123 124 125 126

/*
 * time-consuming task throwed into BG work thread
 */
static void uvWorkDoTask(uv_work_t* req);
static void uvWorkAfterTask(uv_work_t* req, int status);

dengyihao's avatar
dengyihao 已提交
127 128
static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
129

dengyihao's avatar
dengyihao 已提交
130
static void uvStartSendRespInternal(SSrvMsg* smsg);
dengyihao's avatar
dengyihao 已提交
131 132
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
133

dengyihao's avatar
dengyihao 已提交
134 135
static void uvNotifyLinkBrokenToApp(SSrvConn* conn);

dengyihao's avatar
dengyihao 已提交
136
static void destroySmsg(SSrvMsg* smsg);
137
// check whether already read complete packet
dengyihao's avatar
fix bug  
dengyihao 已提交
138
static SSrvConn* createConn(void* hThrd);
dengyihao's avatar
dengyihao 已提交
139
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
140
static int       reallocConnRefHandle(SSrvConn* conn);
dengyihao's avatar
dengyihao 已提交
141

dengyihao's avatar
dengyihao 已提交
142 143
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
144
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
145
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
146 147
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
                                                                       uvHandleRegister};
dengyihao's avatar
dengyihao 已提交
148

dengyihao's avatar
dengyihao 已提交
149
static int32_t exHandlesMgt;
dengyihao's avatar
dengyihao 已提交
150

dengyihao's avatar
dengyihao 已提交
151
void       uvInitEnv();
dengyihao's avatar
dengyihao 已提交
152
void       uvOpenExHandleMgt(int size);
dengyihao's avatar
dengyihao 已提交
153
void       uvCloseExHandleMgt();
dengyihao's avatar
dengyihao 已提交
154 155 156 157 158 159
int64_t    uvAddExHandle(void* p);
int32_t    uvRemoveExHandle(int64_t refId);
int32_t    uvReleaseExHandle(int64_t refId);
void       uvDestoryExHandle(void* handle);
SExHandle* uvAcquireExHandle(int64_t refId);

160
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
161

dengyihao's avatar
dengyihao 已提交
162
// server and worker thread
163 164
static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg);
dengyihao's avatar
dengyihao 已提交
165

dengyihao's avatar
dengyihao 已提交
166
// add handle loop
dengyihao's avatar
dengyihao 已提交
167
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName);
dengyihao's avatar
dengyihao 已提交
168 169
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
#define CONN_SHOULD_RELEASE(conn, head)                                     \
  do {                                                                      \
    if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) {          \
      conn->status = ConnRelease;                                           \
      transClearBuffer(&conn->readBuf);                                     \
      transFreeMsg(transContFromHead((char*)head));                         \
      tTrace("server conn %p received release request", conn);              \
                                                                            \
      STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \
      SSrvMsg*  srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));              \
      srvMsg->msg = tmsg;                                                   \
      srvMsg->type = Release;                                               \
      srvMsg->pConn = conn;                                                 \
      if (!transQueuePush(&conn->srvMsgs, srvMsg)) {                        \
        return;                                                             \
      }                                                                     \
      uvStartSendRespInternal(srvMsg);                                      \
      return;                                                               \
    }                                                                       \
  } while (0)

#define SRV_RELEASE_UV(loop)       \
  do {                             \
    uv_walk(loop, uvWalkCb, NULL); \
    uv_run(loop, UV_RUN_DEFAULT);  \
    uv_loop_close(loop);           \
  } while (0);

dengyihao's avatar
dengyihao 已提交
198 199 200 201
#define ASYNC_ERR_JRET(thrd)                            \
  do {                                                  \
    if (thrd->quit) {                                   \
      tTrace("worker thread already quit, ignore msg"); \
dengyihao's avatar
dengyihao 已提交
202
      goto _return1;                                    \
dengyihao's avatar
dengyihao 已提交
203 204 205
    }                                                   \
  } while (0)

dengyihao's avatar
dengyihao 已提交
206 207 208 209 210
#define ASYNC_CHECK_HANDLE(exh1, refId)                                                                               \
  do {                                                                                                                \
    if (refId > 0) {                                                                                                  \
      tTrace("server handle step1");                                                                                  \
      SExHandle* exh2 = uvAcquireExHandle(refId);                                                                     \
dengyihao's avatar
dengyihao 已提交
211
      if (exh2 == NULL || refId != exh2->refId) {                                                                     \
dengyihao's avatar
dengyihao 已提交
212
        tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
dengyihao's avatar
dengyihao 已提交
213
               exh2 ? exh2->refId : 0, refId);                                                                        \
dengyihao's avatar
dengyihao 已提交
214 215 216
        goto _return1;                                                                                                \
      }                                                                                                               \
    } else if (refId == 0) {                                                                                          \
dengyihao's avatar
dengyihao 已提交
217
      tTrace("server handle step2");                                                                                  \
dengyihao's avatar
dengyihao 已提交
218
      SExHandle* exh2 = uvAcquireExHandle(refId);                                                                     \
dengyihao's avatar
dengyihao 已提交
219 220 221
      if (exh2 == NULL || refId != exh2->refId) {                                                                     \
        tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
               refId, exh2 ? exh2->refId : 0);                                                                        \
dengyihao's avatar
dengyihao 已提交
222 223 224 225 226
        goto _return1;                                                                                                \
      } else {                                                                                                        \
        refId = exh1->refId;                                                                                          \
      }                                                                                                               \
    } else if (refId == -1) {                                                                                         \
dengyihao's avatar
dengyihao 已提交
227
      tTrace("server handle step3");                                                                                  \
dengyihao's avatar
dengyihao 已提交
228 229
      goto _return2;                                                                                                  \
    }                                                                                                                 \
dengyihao's avatar
dengyihao 已提交
230 231
  } while (0)

dengyihao's avatar
dengyihao 已提交
232
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
233 234 235
  SSrvConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
236 237 238 239
}

// refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) {
dengyihao's avatar
dengyihao 已提交
240
  SSrvConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
241
  tDebug("%p timeout since no activity", conn);
dengyihao's avatar
dengyihao 已提交
242 243
}

dengyihao's avatar
dengyihao 已提交
244 245
static void uvHandleReq(SSrvConn* pConn) {
  SConnBuffer* pBuf = &pConn->readBuf;
dengyihao's avatar
dengyihao 已提交
246 247 248 249
  char*        msg = pBuf->buf;
  uint32_t     msgLen = pBuf->len;

  STransMsgHead* pHead = (STransMsgHead*)msg;
dengyihao's avatar
dengyihao 已提交
250
  pHead->code = htonl(pHead->code);
dengyihao's avatar
dengyihao 已提交
251
  pHead->msgLen = htonl(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
252
  memcpy(pConn->user, pHead->user, strlen(pHead->user));
dengyihao's avatar
dengyihao 已提交
253

dengyihao's avatar
dengyihao 已提交
254 255 256 257 258 259 260
  // TODO(dengyihao): time-consuming task throwed into BG Thread
  //  uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
  //  wreq->data = pConn;
  //  uv_read_stop((uv_stream_t*)pConn->pTcp);
  //  transRefSrvHandle(pConn);
  //  uv_queue_work(((SWorkThrdObj*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);

dengyihao's avatar
dengyihao 已提交
261
  CONN_SHOULD_RELEASE(pConn, pHead);
dengyihao's avatar
dengyihao 已提交
262

dengyihao's avatar
dengyihao 已提交
263 264 265 266 267
  STransMsg transMsg;
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = pHead->content;
  transMsg.msgType = pHead->msgType;
  transMsg.code = pHead->code;
dengyihao's avatar
dengyihao 已提交
268
  transMsg.ahandle = (void*)pHead->ahandle;
dengyihao's avatar
dengyihao 已提交
269
  transMsg.handle = NULL;
dengyihao's avatar
dengyihao 已提交
270

dengyihao's avatar
dengyihao 已提交
271
  // transDestroyBuffer(&pConn->readBuf);
dengyihao's avatar
dengyihao 已提交
272
  transClearBuffer(&pConn->readBuf);
dengyihao's avatar
dengyihao 已提交
273
  pConn->inType = pHead->msgType;
dengyihao's avatar
dengyihao 已提交
274 275 276 277
  if (pConn->status == ConnNormal) {
    if (pHead->persist == 1) {
      pConn->status = ConnAcquire;
      transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
278
      tDebug("server conn %p acquired by server app", pConn);
dengyihao's avatar
dengyihao 已提交
279 280 281
    }
  }
  if (pConn->status == ConnNormal && pHead->noResp == 0) {
dengyihao's avatar
dengyihao 已提交
282
    transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
283 284 285
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
           taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
           ntohs(pConn->locaddr.sin_port), transMsg.contLen);
dengyihao's avatar
dengyihao 已提交
286
  } else {
dengyihao's avatar
dengyihao 已提交
287 288 289
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
           TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
           taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
dengyihao's avatar
dengyihao 已提交
290 291 292
    // no ref here
  }

dengyihao's avatar
dengyihao 已提交
293 294 295 296
  // if pHead->noResp = 1,
  // 1. server application should not send resp on handle
  // 2. once send out data, cli conn released to conn pool immediately
  // 3. not mixed with persist
dengyihao's avatar
dengyihao 已提交
297

dengyihao's avatar
dengyihao 已提交
298
  transMsg.handle = (void*)uvAcquireExHandle(pConn->refId);
dengyihao's avatar
dengyihao 已提交
299 300 301
  tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.handle, pConn, pConn->refId);
  transMsg.refId = pConn->refId;
  assert(transMsg.handle != NULL);
dengyihao's avatar
dengyihao 已提交
302
  if (pHead->noResp == 1) {
dengyihao's avatar
dengyihao 已提交
303
    transMsg.refId = -1;
dengyihao's avatar
dengyihao 已提交
304 305
  }
  uvReleaseExHandle(pConn->refId);
dengyihao's avatar
dengyihao 已提交
306

dengyihao's avatar
dengyihao 已提交
307
  STrans* pTransInst = pConn->pTransInst;
dengyihao's avatar
dengyihao 已提交
308
  (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
dengyihao's avatar
dengyihao 已提交
309
  // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
dengyihao's avatar
dengyihao 已提交
310 311
}

dengyihao's avatar
dengyihao 已提交
312
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
313
  // opt
dengyihao's avatar
dengyihao 已提交
314 315
  SSrvConn*    conn = cli->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
316 317
  if (nread > 0) {
    pBuf->len += nread;
dengyihao's avatar
dengyihao 已提交
318
    tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
dengyihao's avatar
dengyihao 已提交
319
    if (transReadComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
320
      tTrace("server conn %p alread read complete packet", conn);
dengyihao's avatar
dengyihao 已提交
321
      uvHandleReq(conn);
dengyihao's avatar
dengyihao 已提交
322
    } else {
dengyihao's avatar
dengyihao 已提交
323
      tTrace("server %p read partial packet, continue to read", conn);
dengyihao's avatar
dengyihao 已提交
324 325 326
    }
    return;
  }
327 328 329
  if (nread == 0) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
330 331

  tError("server conn %p read error: %s", conn, uv_err_name(nread));
U
ubuntu 已提交
332
  if (nread < 0) {
dengyihao's avatar
dengyihao 已提交
333
    conn->broken = true;
dengyihao's avatar
dengyihao 已提交
334 335
    if (conn->status == ConnAcquire) {
      if (conn->regArg.init) {
dengyihao's avatar
dengyihao 已提交
336
        tTrace("server conn %p broken, notify server app", conn);
dengyihao's avatar
dengyihao 已提交
337 338 339 340 341
        STrans* pTransInst = conn->pTransInst;
        (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
        memset(&conn->regArg, 0, sizeof(conn->regArg));
      }
    }
dengyihao's avatar
dengyihao 已提交
342
    transUnrefSrvHandle(conn);
dengyihao's avatar
dengyihao 已提交
343 344 345 346
  }
}
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->len = 2;
wafwerar's avatar
wafwerar 已提交
347
  buf->base = taosMemoryCalloc(1, sizeof(char) * buf->len);
dengyihao's avatar
dengyihao 已提交
348 349 350 351
}

void uvOnTimeoutCb(uv_timer_t* handle) {
  // opt
dengyihao's avatar
dengyihao 已提交
352
  SSrvConn* pConn = handle->data;
dengyihao's avatar
dengyihao 已提交
353
  tError("server conn %p time out", pConn);
dengyihao's avatar
dengyihao 已提交
354 355
}

dengyihao's avatar
dengyihao 已提交
356
void uvOnSendCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
357
  SSrvConn* conn = req->data;
dengyihao's avatar
dengyihao 已提交
358
  // transClearBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
359
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
360
    tTrace("server conn %p data already was written on stream", conn);
dengyihao's avatar
dengyihao 已提交
361 362
    if (!transQueueEmpty(&conn->srvMsgs)) {
      SSrvMsg* msg = transQueuePop(&conn->srvMsgs);
dengyihao's avatar
dengyihao 已提交
363 364 365 366 367 368 369 370
      // if (msg->type == Release && conn->status != ConnNormal) {
      //  conn->status = ConnNormal;
      //  transUnrefSrvHandle(conn);
      //  reallocConnRefHandle(conn);
      //  destroySmsg(msg);
      //  transQueueClear(&conn->srvMsgs);
      //  return;
      //}
dengyihao's avatar
add UT  
dengyihao 已提交
371 372
      destroySmsg(msg);
      // send second data, just use for push
dengyihao's avatar
dengyihao 已提交
373
      if (!transQueueEmpty(&conn->srvMsgs)) {
dengyihao's avatar
dengyihao 已提交
374
        msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
dengyihao's avatar
dengyihao 已提交
375 376 377 378 379 380 381 382 383
        if (msg->type == Register && conn->status == ConnAcquire) {
          conn->regArg.notifyCount = 0;
          conn->regArg.init = 1;
          conn->regArg.msg = msg->msg;
          if (conn->broken) {
            STrans* pTransInst = conn->pTransInst;
            (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
            memset(&conn->regArg, 0, sizeof(conn->regArg));
          }
dengyihao's avatar
dengyihao 已提交
384
          transQueuePop(&conn->srvMsgs);
wafwerar's avatar
wafwerar 已提交
385
          taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
386 387

          msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
dengyihao's avatar
dengyihao 已提交
388 389 390
          if (msg != NULL) {
            uvStartSendRespInternal(msg);
          }
dengyihao's avatar
dengyihao 已提交
391 392 393
        } else {
          uvStartSendRespInternal(msg);
        }
dengyihao's avatar
add UT  
dengyihao 已提交
394
      }
dengyihao's avatar
dengyihao 已提交
395
    }
dengyihao's avatar
dengyihao 已提交
396
  } else {
dengyihao's avatar
dengyihao 已提交
397
    tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
398
    conn->broken = true;
dengyihao's avatar
dengyihao 已提交
399
    transUnrefSrvHandle(conn);
dengyihao's avatar
dengyihao 已提交
400 401
  }
}
dengyihao's avatar
dengyihao 已提交
402 403
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
404
    tTrace("success to dispatch conn to work thread");
dengyihao's avatar
dengyihao 已提交
405 406 407
  } else {
    tError("fail to dispatch conn to work thread");
  }
dengyihao's avatar
dengyihao 已提交
408 409
  uv_close((uv_handle_t*)req->data, uvFreeCb);
  // taosMemoryFree(req->data);
wafwerar's avatar
wafwerar 已提交
410
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
411
}
dengyihao's avatar
dengyihao 已提交
412

dengyihao's avatar
dengyihao 已提交
413
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
dengyihao's avatar
dengyihao 已提交
414
  tTrace("server conn %p prepare to send resp", smsg->pConn);
dengyihao's avatar
dengyihao 已提交
415

dengyihao's avatar
formate  
dengyihao 已提交
416 417
  SSrvConn*  pConn = smsg->pConn;
  STransMsg* pMsg = &smsg->msg;
dengyihao's avatar
dengyihao 已提交
418 419 420 421 422
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
dengyihao's avatar
dengyihao 已提交
423
  pHead->ahandle = (uint64_t)pMsg->ahandle;
dengyihao's avatar
dengyihao 已提交
424

dengyihao's avatar
dengyihao 已提交
425 426 427
  if (pConn->status == ConnNormal) {
    pHead->msgType = pConn->inType + 1;
  } else {
dengyihao's avatar
dengyihao 已提交
428 429 430 431 432 433 434
    if (smsg->type == Release) {
      pHead->msgType = 0;
      pConn->status = ConnNormal;
      transUnrefSrvHandle(pConn);
    } else {
      pHead->msgType = pMsg->msgType;
    }
dengyihao's avatar
dengyihao 已提交
435
  }
dengyihao's avatar
dengyihao 已提交
436

dengyihao's avatar
dengyihao 已提交
437
  pHead->release = smsg->type == Release ? 1 : 0;
dengyihao's avatar
dengyihao 已提交
438
  pHead->code = htonl(pMsg->code);
dengyihao's avatar
dengyihao 已提交
439

dengyihao's avatar
dengyihao 已提交
440 441
  char*   msg = (char*)pHead;
  int32_t len = transMsgLenFromCont(pMsg->contLen);
dengyihao's avatar
dengyihao 已提交
442 443 444
  tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
         taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
         ntohs(pConn->locaddr.sin_port));
dengyihao's avatar
dengyihao 已提交
445
  pHead->msgLen = htonl(len);
dengyihao's avatar
dengyihao 已提交
446

dengyihao's avatar
dengyihao 已提交
447 448 449
  wb->base = msg;
  wb->len = len;
}
dengyihao's avatar
dengyihao 已提交
450 451

static void uvStartSendRespInternal(SSrvMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
452 453 454 455
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
456
  // uv_timer_stop(&pConn->pTimer);
dengyihao's avatar
dengyihao 已提交
457
  uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
dengyihao's avatar
dengyihao 已提交
458 459 460 461
}
static void uvStartSendResp(SSrvMsg* smsg) {
  // impl
  SSrvConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
462 463

  if (pConn->broken == true) {
dengyihao's avatar
dengyihao 已提交
464
    // persist by
dengyihao's avatar
dengyihao 已提交
465 466 467
    transUnrefSrvHandle(pConn);
    return;
  }
dengyihao's avatar
dengyihao 已提交
468 469 470
  if (pConn->status == ConnNormal) {
    transUnrefSrvHandle(pConn);
  }
dengyihao's avatar
dengyihao 已提交
471

dengyihao's avatar
dengyihao 已提交
472
  if (!transQueuePush(&pConn->srvMsgs, smsg)) {
dengyihao's avatar
dengyihao 已提交
473 474 475
    return;
  }
  uvStartSendRespInternal(smsg);
dengyihao's avatar
dengyihao 已提交
476 477
  return;
}
dengyihao's avatar
dengyihao 已提交
478

dengyihao's avatar
dengyihao 已提交
479
static void destroySmsg(SSrvMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
480 481 482 483
  if (smsg == NULL) {
    return;
  }
  transFreeMsg(smsg->msg.pCont);
wafwerar's avatar
wafwerar 已提交
484
  taosMemoryFree(smsg);
dengyihao's avatar
dengyihao 已提交
485
}
dengyihao's avatar
fix bug  
dengyihao 已提交
486
static void destroyAllConn(SWorkThrdObj* pThrd) {
dengyihao's avatar
dengyihao 已提交
487
  tTrace("thread %p destroy all conn ", pThrd);
dengyihao's avatar
fix bug  
dengyihao 已提交
488 489 490 491 492 493
  while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
    queue* h = QUEUE_HEAD(&pThrd->conn);
    QUEUE_REMOVE(h);
    QUEUE_INIT(h);

    SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue);
dengyihao's avatar
dengyihao 已提交
494 495 496
    while (T_REF_VAL_GET(c) >= 2) {
      transUnrefSrvHandle(c);
    }
dengyihao's avatar
dengyihao 已提交
497
    transUnrefSrvHandle(c);
dengyihao's avatar
fix bug  
dengyihao 已提交
498 499
  }
}
dengyihao's avatar
dengyihao 已提交
500
void uvWorkerAsyncCb(uv_async_t* handle) {
dengyihao's avatar
dengyihao 已提交
501 502
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
dengyihao's avatar
dengyihao 已提交
503
  SSrvConn*     conn = NULL;
dengyihao's avatar
dengyihao 已提交
504
  queue         wq;
dengyihao's avatar
dengyihao 已提交
505

dengyihao's avatar
dengyihao 已提交
506
  // batch process to avoid to lock/unlock frequently
wafwerar's avatar
wafwerar 已提交
507
  taosThreadMutexLock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
508
  QUEUE_MOVE(&item->qmsg, &wq);
wafwerar's avatar
wafwerar 已提交
509
  taosThreadMutexUnlock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
510 511 512 513

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* head = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(head);
dengyihao's avatar
dengyihao 已提交
514 515 516

    SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
    if (msg == NULL) {
dengyihao's avatar
dengyihao 已提交
517
      tError("unexcept occurred, continue");
dengyihao's avatar
dengyihao 已提交
518
      continue;
dengyihao's avatar
dengyihao 已提交
519
    }
dengyihao's avatar
dengyihao 已提交
520
    // release handle to rpc init
dengyihao's avatar
dengyihao 已提交
521 522
    if (msg->type == Quit) {
      (*transAsyncHandle[msg->type])(msg, pThrd);
dengyihao's avatar
dengyihao 已提交
523
      continue;
dengyihao's avatar
dengyihao 已提交
524 525 526 527 528 529 530
    } else {
      STransMsg transMsg = msg->msg;

      SExHandle* exh1 = transMsg.handle;
      int64_t    refId = transMsg.refId;
      SExHandle* exh2 = uvAcquireExHandle(refId);
      if (exh2 == NULL || exh1 != exh2) {
dengyihao's avatar
dengyihao 已提交
531
        tTrace("server handle except msg %p, ignore it", exh1);
dengyihao's avatar
dengyihao 已提交
532 533 534 535 536
        uvReleaseExHandle(refId);
        destroySmsg(msg);
        continue;
      }
      msg->pConn = exh1->handle;
dengyihao's avatar
dengyihao 已提交
537
      uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
538
      (*transAsyncHandle[msg->type])(msg, pThrd);
dengyihao's avatar
dengyihao 已提交
539
    }
dengyihao's avatar
dengyihao 已提交
540 541
  }
}
dengyihao's avatar
dengyihao 已提交
542 543 544 545 546
static void uvWalkCb(uv_handle_t* handle, void* arg) {
  if (!uv_is_closing(handle)) {
    uv_close(handle, NULL);
  }
}
dengyihao's avatar
dengyihao 已提交
547 548 549 550
static void uvFreeCb(uv_handle_t* handle) {
  //
  taosMemoryFree(handle);
}
dengyihao's avatar
dengyihao 已提交
551

dengyihao's avatar
dengyihao 已提交
552 553
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* srv = async->data;
dengyihao's avatar
dengyihao 已提交
554
  tDebug("close server port %d", srv->port);
dengyihao's avatar
dengyihao 已提交
555
  uv_walk(srv->loop, uvWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
556
}
dengyihao's avatar
dengyihao 已提交
557

dengyihao's avatar
dengyihao 已提交
558
static void uvShutDownCb(uv_shutdown_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
559 560 561
  if (status != 0) {
    tDebug("conn failed to shut down: %s", uv_err_name(status));
  }
dengyihao's avatar
dengyihao 已提交
562
  uv_close((uv_handle_t*)req->handle, uvDestroyConn);
wafwerar's avatar
wafwerar 已提交
563
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
564 565
}

dengyihao's avatar
dengyihao 已提交
566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583
static void uvWorkDoTask(uv_work_t* req) {
  // doing time-consumeing task
  // only auth conn currently, add more func later
  tTrace("server conn %p start to be processed in BG Thread", req->data);
  return;
}

static void uvWorkAfterTask(uv_work_t* req, int status) {
  if (status != 0) {
    tTrace("server conn %p failed to processed ", req->data);
  }
  // Done time-consumeing task
  // add more func later
  // this func called in main loop
  tTrace("server conn %p already processed ", req->data);
  taosMemoryFree(req);
}

dengyihao's avatar
dengyihao 已提交
584 585 586 587 588 589
void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status == -1) {
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

wafwerar's avatar
wafwerar 已提交
590
  uv_tcp_t* cli = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
dengyihao's avatar
dengyihao 已提交
591 592 593
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
wafwerar's avatar
wafwerar 已提交
594
    if (pObj->numOfWorkerReady < pObj->numOfThreads) {
dengyihao's avatar
dengyihao 已提交
595 596
      tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads,
             pObj->numOfWorkerReady);
wafwerar's avatar
wafwerar 已提交
597 598 599
      uv_close((uv_handle_t*)cli, NULL);
      return;
    }
dengyihao's avatar
dengyihao 已提交
600

wafwerar's avatar
wafwerar 已提交
601
    uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
dengyihao's avatar
dengyihao 已提交
602
    wr->data = cli;
dengyihao's avatar
dengyihao 已提交
603 604 605
    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

    pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
dengyihao's avatar
dengyihao 已提交
606

dengyihao's avatar
dengyihao 已提交
607
    tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
dengyihao's avatar
dengyihao 已提交
608

dengyihao's avatar
dengyihao 已提交
609
    uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
dengyihao's avatar
dengyihao 已提交
610 611 612 613 614
  } else {
    uv_close((uv_handle_t*)cli, NULL);
  }
}
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
615
  tTrace("server connection coming");
dengyihao's avatar
dengyihao 已提交
616 617 618 619 620
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
dengyihao's avatar
dengyihao 已提交
621 622 623 624
    tError("failed to create connect: %p", q);
    taosMemoryFree(buf->base);
    uv_close((uv_handle_t*)q, NULL);
    // taosMemoryFree(q);
dengyihao's avatar
dengyihao 已提交
625 626 627 628 629
    return;
  }
  // free memory allocated by
  assert(nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
wafwerar's avatar
wafwerar 已提交
630
  taosMemoryFree(buf->base);
dengyihao's avatar
dengyihao 已提交
631 632 633 634 635 636 637 638 639 640 641 642

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

dengyihao's avatar
fix bug  
dengyihao 已提交
643
  SSrvConn* pConn = createConn(pThrd);
644

dengyihao's avatar
dengyihao 已提交
645
  pConn->pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
646
  /* init conn timer*/
dengyihao's avatar
dengyihao 已提交
647 648
  // uv_timer_init(pThrd->loop, &pConn->pTimer);
  // pConn->pTimer.data = pConn;
dengyihao's avatar
dengyihao 已提交
649 650 651 652

  pConn->hostThrd = pThrd;

  // init client handle
wafwerar's avatar
wafwerar 已提交
653
  pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
dengyihao's avatar
dengyihao 已提交
654 655 656
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

dengyihao's avatar
dengyihao 已提交
657
  pConn->pWriter.data = pConn;
dengyihao's avatar
dengyihao 已提交
658

dengyihao's avatar
dengyihao 已提交
659 660
  transSetConnOption((uv_tcp_t*)pConn->pTcp);

dengyihao's avatar
dengyihao 已提交
661 662 663
  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
dengyihao's avatar
dengyihao 已提交
664
    tTrace("server conn %p created, fd: %d", pConn, fd);
dengyihao's avatar
dengyihao 已提交
665

dengyihao's avatar
dengyihao 已提交
666 667
    int addrlen = sizeof(pConn->addr);
    if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
dengyihao's avatar
dengyihao 已提交
668
      tError("server conn %p failed to get peer info", pConn);
dengyihao's avatar
dengyihao 已提交
669
      transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
670
      return;
dengyihao's avatar
dengyihao 已提交
671
    }
dengyihao's avatar
dengyihao 已提交
672 673 674 675

    addrlen = sizeof(pConn->locaddr);
    if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
      tError("server conn %p failed to get local info", pConn);
dengyihao's avatar
dengyihao 已提交
676
      transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
677 678 679
      return;
    }

dengyihao's avatar
dengyihao 已提交
680
    uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
dengyihao's avatar
dengyihao 已提交
681

dengyihao's avatar
dengyihao 已提交
682
  } else {
dengyihao's avatar
dengyihao 已提交
683
    tDebug("failed to create new connection");
dengyihao's avatar
dengyihao 已提交
684
    transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
685 686 687
  }
}

688
void* transAcceptThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
689
  // opt
dengyihao's avatar
dengyihao 已提交
690
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
691 692
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
693 694

  return NULL;
dengyihao's avatar
dengyihao 已提交
695
}
dengyihao's avatar
dengyihao 已提交
696
void uvOnPipeConnectionCb(uv_connect_t* connect, int status) {
wafwerar's avatar
wafwerar 已提交
697 698 699 700 701 702
  if (status != 0) {
    return;
  }
  SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req);
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
}
dengyihao's avatar
dengyihao 已提交
703
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) {
wafwerar's avatar
wafwerar 已提交
704
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
705 706 707
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
708

dengyihao's avatar
dengyihao 已提交
709
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
wafwerar's avatar
wafwerar 已提交
710
  // int r = uv_pipe_open(pThrd->pipe, pThrd->fd);
dengyihao's avatar
dengyihao 已提交
711 712 713

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
714
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
715
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
716

dengyihao's avatar
fix bug  
dengyihao 已提交
717 718 719
  // conn set
  QUEUE_INIT(&pThrd->conn);

dengyihao's avatar
dengyihao 已提交
720
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
wafwerar's avatar
wafwerar 已提交
721 722
  uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
  // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
723 724 725 726 727 728 729 730 731 732 733 734 735
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
736
  // register an async here to quit server gracefully
wafwerar's avatar
wafwerar 已提交
737
  srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t));
dengyihao's avatar
dengyihao 已提交
738 739
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
740

dengyihao's avatar
dengyihao 已提交
741
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
742 743 744 745 746
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
747
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
748
    tError("failed to listen: %s", uv_err_name(err));
dengyihao's avatar
dengyihao 已提交
749
    terrno = TSDB_CODE_RPC_PORT_EADDRINUSE;
dengyihao's avatar
dengyihao 已提交
750 751 752
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
753
}
754
void* transWorkerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
755
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
756
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
757
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
758 759

  return NULL;
dengyihao's avatar
dengyihao 已提交
760 761
}

dengyihao's avatar
fix bug  
dengyihao 已提交
762 763 764
static SSrvConn* createConn(void* hThrd) {
  SWorkThrdObj* pThrd = hThrd;

wafwerar's avatar
wafwerar 已提交
765
  SSrvConn* pConn = (SSrvConn*)taosMemoryCalloc(1, sizeof(SSrvConn));
dengyihao's avatar
fix bug  
dengyihao 已提交
766 767 768
  QUEUE_INIT(&pConn->queue);

  QUEUE_PUSH(&pThrd->conn, &pConn->queue);
dengyihao's avatar
dengyihao 已提交
769 770

  transQueueInit(&pConn->srvMsgs, NULL);
dengyihao's avatar
dengyihao 已提交
771

dengyihao's avatar
dengyihao 已提交
772
  memset(&pConn->regArg, 0, sizeof(pConn->regArg));
dengyihao's avatar
dengyihao 已提交
773
  pConn->broken = false;
dengyihao's avatar
dengyihao 已提交
774
  pConn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
775

dengyihao's avatar
dengyihao 已提交
776 777 778 779 780 781 782
  SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
  exh->handle = pConn;
  exh->pThrd = pThrd;
  exh->refId = uvAddExHandle(exh);
  uvAcquireExHandle(exh->refId);

  pConn->refId = exh->refId;
dengyihao's avatar
dengyihao 已提交
783
  transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
784
  tTrace("server handle %p, conn %p created, refId: %" PRId64 "", exh, pConn, pConn->refId);
dengyihao's avatar
dengyihao 已提交
785 786
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
787

dengyihao's avatar
dengyihao 已提交
788
static void destroyConn(SSrvConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
789 790 791
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
792

dengyihao's avatar
dengyihao 已提交
793
  transDestroyBuffer(&conn->readBuf);
794
  if (clear) {
dengyihao's avatar
dengyihao 已提交
795
    tTrace("server conn %p to be destroyed", conn);
796 797 798 799
    // uv_shutdown_t* req = taosMemoryMalloc(sizeof(uv_shutdown_t));
    uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
    // uv_close(conn->pTcp)
    // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
800
  }
dengyihao's avatar
dengyihao 已提交
801
}
dengyihao's avatar
dengyihao 已提交
802 803 804 805 806 807 808 809 810 811 812 813 814
static int reallocConnRefHandle(SSrvConn* conn) {
  uvReleaseExHandle(conn->refId);
  uvRemoveExHandle(conn->refId);
  // avoid app continue to send msg on invalid handle
  SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
  exh->refId = uvAddExHandle(exh);
  uvAcquireExHandle(exh->refId);
  conn->refId = exh->refId;

  return 0;
}
dengyihao's avatar
dengyihao 已提交
815
static void uvDestroyConn(uv_handle_t* handle) {
dengyihao's avatar
dengyihao 已提交
816 817 818 819
  SSrvConn* conn = handle->data;
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
820 821
  SWorkThrdObj* thrd = conn->hostThrd;

dengyihao's avatar
dengyihao 已提交
822 823 824
  uvReleaseExHandle(conn->refId);
  uvRemoveExHandle(conn->refId);

dengyihao's avatar
dengyihao 已提交
825
  tDebug("server conn %p destroy", conn);
dengyihao's avatar
dengyihao 已提交
826
  // uv_timer_stop(&conn->pTimer);
dengyihao's avatar
dengyihao 已提交
827
  transQueueDestroy(&conn->srvMsgs);
dengyihao's avatar
dengyihao 已提交
828 829 830 831 832

  if (conn->regArg.init == 1) {
    transFreeMsg(conn->regArg.msg.pCont);
    conn->regArg.init = 0;
  }
dengyihao's avatar
dengyihao 已提交
833
  QUEUE_REMOVE(&conn->queue);
wafwerar's avatar
wafwerar 已提交
834
  taosMemoryFree(conn->pTcp);
D
dapan1121 已提交
835 836 837 838
  if (conn->regArg.init == 1) {
    transFreeMsg(conn->regArg.msg.pCont);
    conn->regArg.init = 0;
  }
dengyihao's avatar
dengyihao 已提交
839
  taosMemoryFree(conn);
dengyihao's avatar
dengyihao 已提交
840

dengyihao's avatar
dengyihao 已提交
841
  if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
dengyihao's avatar
dengyihao 已提交
842
    tTrace("work thread quit");
dengyihao's avatar
dengyihao 已提交
843
    uv_walk(thrd->loop, uvWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
844
  }
dengyihao's avatar
dengyihao 已提交
845
}
wafwerar's avatar
wafwerar 已提交
846 847 848 849
static void uvPipeListenCb(uv_stream_t* handle, int status) {
  ASSERT(status == 0);

  SServerObj* srv = container_of(handle, SServerObj, pipeListen);
dengyihao's avatar
dengyihao 已提交
850
  uv_pipe_t*  pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
wafwerar's avatar
wafwerar 已提交
851 852 853 854 855 856 857 858 859 860 861 862 863 864
  ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1));
  ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe));

  ASSERT(1 == uv_is_readable((uv_stream_t*)pipe));
  ASSERT(1 == uv_is_writable((uv_stream_t*)pipe));
  ASSERT(0 == uv_is_closing((uv_handle_t*)pipe));

  srv->numOfWorkerReady++;

  // ASSERT(0 == uv_listen((uv_stream_t*)&ctx.send.tcp, 512, uvOnAcceptCb));

  // r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, read_cb);
  // ASSERT(r == 0);
}
dengyihao's avatar
dengyihao 已提交
865

U
ubuntu 已提交
866
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
867 868
  SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
869 870
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
wafwerar's avatar
wafwerar 已提交
871
  srv->numOfWorkerReady = 0;
wafwerar's avatar
wafwerar 已提交
872 873
  srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
dengyihao's avatar
dengyihao 已提交
874 875 876 877
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

dengyihao's avatar
dengyihao 已提交
878
  taosThreadOnce(&transModuleInit, uvInitEnv);
dengyihao's avatar
dengyihao 已提交
879
  transSrvInst++;
dengyihao's avatar
dengyihao 已提交
880

wafwerar's avatar
wafwerar 已提交
881 882
  assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0));
#ifdef WINDOWS
883 884
  char pipeName[64];
  snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId());
wafwerar's avatar
wafwerar 已提交
885
#else
886
  char pipeName[PATH_MAX] = {0};
dengyihao's avatar
dengyihao 已提交
887 888
  snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(),
           taosGetSelfPthreadId());
wafwerar's avatar
wafwerar 已提交
889 890 891 892
#endif
  assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName));
  assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));

dengyihao's avatar
dengyihao 已提交
893
  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
894
    SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
895
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
896
    thrd->quit = false;
dengyihao's avatar
dengyihao 已提交
897
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
898
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
899

wafwerar's avatar
wafwerar 已提交
900
    srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
dengyihao's avatar
dengyihao 已提交
901
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
902

dengyihao's avatar
dengyihao 已提交
903
    if (false == addHandleToWorkloop(thrd, pipeName)) {
dengyihao's avatar
dengyihao 已提交
904 905
      goto End;
    }
906
    int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
dengyihao's avatar
dengyihao 已提交
907 908 909 910 911 912
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
dengyihao's avatar
dengyihao 已提交
913
      goto End;
dengyihao's avatar
dengyihao 已提交
914 915
    }
  }
916
  if (false == taosValidIpAndPort(srv->ip, srv->port)) {
dengyihao's avatar
dengyihao 已提交
917 918
    terrno = TAOS_SYSTEM_ERROR(errno);
    tError("invalid ip/port, reason: %s", terrstr());
919 920
    goto End;
  }
dengyihao's avatar
dengyihao 已提交
921 922 923
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
924
  int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
dengyihao's avatar
dengyihao 已提交
925 926 927
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
dengyihao's avatar
dengyihao 已提交
928 929
    tError("failed  to create accept-thread");
    goto End;
dengyihao's avatar
dengyihao 已提交
930 931
    // clear all resource later
  }
dengyihao's avatar
dengyihao 已提交
932
  srv->inited = true;
dengyihao's avatar
dengyihao 已提交
933
  return srv;
dengyihao's avatar
dengyihao 已提交
934
End:
U
ubuntu 已提交
935
  transCloseServer(srv);
dengyihao's avatar
dengyihao 已提交
936 937
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
938

dengyihao's avatar
dengyihao 已提交
939 940
void uvInitEnv() {
  uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
dengyihao's avatar
dengyihao 已提交
941 942
  uvOpenExHandleMgt(10000);
}
dengyihao's avatar
dengyihao 已提交
943 944 945 946
void uvOpenExHandleMgt(int size) {
  // added into once later
  exHandlesMgt = taosOpenRef(size, uvDestoryExHandle);
}
dengyihao's avatar
dengyihao 已提交
947 948 949 950
void uvCloseExHandleMgt() {
  // close ref
  taosCloseRef(exHandlesMgt);
}
dengyihao's avatar
dengyihao 已提交
951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975
int64_t uvAddExHandle(void* p) {
  // acquire extern handle
  return taosAddRef(exHandlesMgt, p);
}
int32_t uvRemoveExHandle(int64_t refId) {
  // acquire extern handle
  return taosRemoveRef(exHandlesMgt, refId);
}

SExHandle* uvAcquireExHandle(int64_t refId) {
  // acquire extern handle
  return (SExHandle*)taosAcquireRef(exHandlesMgt, refId);
}

int32_t uvReleaseExHandle(int64_t refId) {
  // release extern handle
  return taosReleaseRef(exHandlesMgt, refId);
}
void uvDestoryExHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  taosMemoryFree(handle);
}

dengyihao's avatar
dengyihao 已提交
976
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
dengyihao's avatar
dengyihao 已提交
977
  thrd->quit = true;
dengyihao's avatar
dengyihao 已提交
978
  if (QUEUE_IS_EMPTY(&thrd->conn)) {
dengyihao's avatar
dengyihao 已提交
979
    uv_walk(thrd->loop, uvWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
980 981 982
  } else {
    destroyAllConn(thrd);
  }
wafwerar's avatar
wafwerar 已提交
983
  taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
984 985 986 987
}
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* conn = msg->pConn;
  if (conn->status == ConnAcquire) {
dengyihao's avatar
dengyihao 已提交
988
    reallocConnRefHandle(conn);
dengyihao's avatar
dengyihao 已提交
989
    if (!transQueuePush(&conn->srvMsgs, msg)) {
dengyihao's avatar
dengyihao 已提交
990
      return;
dengyihao's avatar
dengyihao 已提交
991 992 993
    }
    uvStartSendRespInternal(msg);
    return;
dengyihao's avatar
dengyihao 已提交
994 995
  } else if (conn->status == ConnRelease || conn->status == ConnNormal) {
    tDebug("server conn %p already released, ignore release-msg", conn);
dengyihao's avatar
dengyihao 已提交
996
  }
dengyihao's avatar
dengyihao 已提交
997
  destroySmsg(msg);
dengyihao's avatar
dengyihao 已提交
998
}
dengyihao's avatar
dengyihao 已提交
999
void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
dengyihao's avatar
dengyihao 已提交
1000
  // send msg to client
dengyihao's avatar
dengyihao 已提交
1001
  tDebug("server conn %p start to send resp (2/2)", msg->pConn);
dengyihao's avatar
dengyihao 已提交
1002 1003
  uvStartSendResp(msg);
}
dengyihao's avatar
dengyihao 已提交
1004 1005
void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* conn = msg->pConn;
dengyihao's avatar
dengyihao 已提交
1006
  tDebug("server conn %p register brokenlink callback", conn);
dengyihao's avatar
dengyihao 已提交
1007
  if (conn->status == ConnAcquire) {
dengyihao's avatar
dengyihao 已提交
1008
    if (!transQueuePush(&conn->srvMsgs, msg)) {
dengyihao's avatar
dengyihao 已提交
1009 1010
      return;
    }
dengyihao's avatar
dengyihao 已提交
1011
    transQueuePop(&conn->srvMsgs);
dengyihao's avatar
dengyihao 已提交
1012 1013 1014
    conn->regArg.notifyCount = 0;
    conn->regArg.init = 1;
    conn->regArg.msg = msg->msg;
dengyihao's avatar
dengyihao 已提交
1015
    tDebug("server conn %p register brokenlink callback succ", conn);
dengyihao's avatar
dengyihao 已提交
1016 1017 1018 1019 1020 1021

    if (conn->broken) {
      STrans* pTransInst = conn->pTransInst;
      (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
      memset(&conn->regArg, 0, sizeof(conn->regArg));
    }
wafwerar's avatar
wafwerar 已提交
1022
    taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
1023 1024
  }
}
dengyihao's avatar
dengyihao 已提交
1025 1026 1027 1028
void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
1029
  taosThreadJoin(pThrd->thread, NULL);
dengyihao's avatar
dengyihao 已提交
1030
  SRV_RELEASE_UV(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
1031
  transDestroyAsyncPool(pThrd->asyncPool);
dengyihao's avatar
dengyihao 已提交
1032
  taosMemoryFree(pThrd->loop);
wafwerar's avatar
wafwerar 已提交
1033
  taosMemoryFree(pThrd);
dengyihao's avatar
dengyihao 已提交
1034
}
dengyihao's avatar
dengyihao 已提交
1035
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
wafwerar's avatar
wafwerar 已提交
1036
  SSrvMsg* msg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1037
  msg->type = Quit;
dengyihao's avatar
dengyihao 已提交
1038
  tDebug("server send quit msg to work thread");
dengyihao's avatar
dengyihao 已提交
1039
  transSendAsync(pThrd->asyncPool, &msg->q);
dengyihao's avatar
dengyihao 已提交
1040 1041
}

U
ubuntu 已提交
1042
void transCloseServer(void* arg) {
dengyihao's avatar
dengyihao 已提交
1043 1044
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
1045 1046

  tDebug("send quit msg to accept thread");
dengyihao's avatar
dengyihao 已提交
1047 1048 1049 1050
  if (srv->inited) {
    uv_async_send(srv->pAcceptAsync);
    taosThreadJoin(srv->thread, NULL);
  }
dengyihao's avatar
dengyihao 已提交
1051
  SRV_RELEASE_UV(srv->loop);
dengyihao's avatar
dengyihao 已提交
1052

dengyihao's avatar
dengyihao 已提交
1053 1054 1055 1056 1057
  for (int i = 0; i < srv->numOfThreads; i++) {
    sendQuitToWorkThrd(srv->pThreadObj[i]);
    destroyWorkThrd(srv->pThreadObj[i]);
  }

wafwerar's avatar
wafwerar 已提交
1058 1059 1060
  taosMemoryFree(srv->pThreadObj);
  taosMemoryFree(srv->pAcceptAsync);
  taosMemoryFree(srv->loop);
dengyihao's avatar
dengyihao 已提交
1061 1062

  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
1063
    taosMemoryFree(srv->pipe[i]);
dengyihao's avatar
dengyihao 已提交
1064
  }
wafwerar's avatar
wafwerar 已提交
1065
  taosMemoryFree(srv->pipe);
dengyihao's avatar
dengyihao 已提交
1066

wafwerar's avatar
wafwerar 已提交
1067
  taosMemoryFree(srv);
dengyihao's avatar
dengyihao 已提交
1068

dengyihao's avatar
dengyihao 已提交
1069 1070
  transSrvInst--;
  if (transSrvInst == 0) {
1071 1072
    TdThreadOnce tmpInit = PTHREAD_ONCE_INIT;
    memcpy(&transModuleInit, &tmpInit, sizeof(TdThreadOnce));
dengyihao's avatar
dengyihao 已提交
1073 1074
    uvCloseExHandleMgt();
  }
dengyihao's avatar
dengyihao 已提交
1075
}
dengyihao's avatar
dengyihao 已提交
1076

dengyihao's avatar
dengyihao 已提交
1077 1078 1079 1080 1081
void transRefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_INC((SSrvConn*)handle);
dengyihao's avatar
dengyihao 已提交
1082
  tDebug("server conn %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
1083 1084 1085 1086 1087 1088 1089
}

void transUnrefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SSrvConn*)handle);
dengyihao's avatar
dengyihao 已提交
1090
  tDebug("server conn %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
1091 1092 1093 1094
  if (ref == 0) {
    destroyConn((SSrvConn*)handle, true);
  }
}
dengyihao's avatar
dengyihao 已提交
1095 1096

void transReleaseSrvHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
1097
  SExHandle* exh = handle;
dengyihao's avatar
dengyihao 已提交
1098 1099
  int64_t    refId = exh->refId;

dengyihao's avatar
dengyihao 已提交
1100
  ASYNC_CHECK_HANDLE(exh, refId);
dengyihao's avatar
dengyihao 已提交
1101

dengyihao's avatar
dengyihao 已提交
1102 1103 1104
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);

dengyihao's avatar
dengyihao 已提交
1105
  STransMsg tmsg = {.code = 0, .handle = exh, .ahandle = NULL, .refId = refId};
dengyihao's avatar
dengyihao 已提交
1106

wafwerar's avatar
wafwerar 已提交
1107
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1108 1109 1110
  srvMsg->msg = tmsg;
  srvMsg->type = Release;

dengyihao's avatar
dengyihao 已提交
1111
  tTrace("server conn %p start to release", exh->handle);
dengyihao's avatar
dengyihao 已提交
1112
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
1113 1114
  uvReleaseExHandle(refId);
  return;
dengyihao's avatar
dengyihao 已提交
1115
_return1:
dengyihao's avatar
dengyihao 已提交
1116
  tTrace("server handle %p failed to send to release handle", exh);
dengyihao's avatar
dengyihao 已提交
1117
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
1118 1119
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1120
  tTrace("server handle %p failed to send to release handle", exh);
dengyihao's avatar
dengyihao 已提交
1121
  return;
dengyihao's avatar
dengyihao 已提交
1122
}
dengyihao's avatar
dengyihao 已提交
1123 1124 1125 1126
void transSendResponse(const STransMsg* msg) {
  SExHandle* exh = msg->handle;
  int64_t    refId = msg->refId;
  ASYNC_CHECK_HANDLE(exh, refId);
dengyihao's avatar
dengyihao 已提交
1127
  assert(refId != 0);
dengyihao's avatar
dengyihao 已提交
1128

dengyihao's avatar
dengyihao 已提交
1129 1130 1131
  STransMsg tmsg = *msg;
  tmsg.refId = refId;

dengyihao's avatar
dengyihao 已提交
1132 1133
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);
dengyihao's avatar
dengyihao 已提交
1134

wafwerar's avatar
wafwerar 已提交
1135
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1136
  srvMsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
1137
  srvMsg->type = Normal;
1138
  tDebug("server conn %p start to send resp (1/2)", exh->handle);
dengyihao's avatar
dengyihao 已提交
1139
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
1140 1141
  uvReleaseExHandle(refId);
  return;
dengyihao's avatar
dengyihao 已提交
1142
_return1:
dengyihao's avatar
dengyihao 已提交
1143
  tTrace("server handle %p failed to send resp", exh);
dengyihao's avatar
dengyihao 已提交
1144
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1145
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
1146 1147
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1148
  tTrace("server handle %p failed to send resp", exh);
dengyihao's avatar
dengyihao 已提交
1149 1150
  rpcFreeCont(msg->pCont);
  return;
dengyihao's avatar
dengyihao 已提交
1151
}
dengyihao's avatar
dengyihao 已提交
1152
void transRegisterMsg(const STransMsg* msg) {
dengyihao's avatar
dengyihao 已提交
1153
  SExHandle* exh = msg->handle;
dengyihao's avatar
dengyihao 已提交
1154 1155 1156
  int64_t    refId = msg->refId;
  ASYNC_CHECK_HANDLE(exh, refId);

dengyihao's avatar
dengyihao 已提交
1157 1158 1159
  STransMsg tmsg = *msg;
  tmsg.refId = refId;

dengyihao's avatar
dengyihao 已提交
1160 1161
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);
dengyihao's avatar
dengyihao 已提交
1162

wafwerar's avatar
wafwerar 已提交
1163
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1164
  srvMsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
1165
  srvMsg->type = Register;
dengyihao's avatar
dengyihao 已提交
1166
  tTrace("server conn %p start to register brokenlink callback", exh->handle);
dengyihao's avatar
dengyihao 已提交
1167
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
1168 1169
  uvReleaseExHandle(refId);
  return;
dengyihao's avatar
dengyihao 已提交
1170

dengyihao's avatar
dengyihao 已提交
1171
_return1:
dengyihao's avatar
dengyihao 已提交
1172
  tTrace("server handle %p failed to send to register brokenlink", exh);
dengyihao's avatar
dengyihao 已提交
1173
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1174
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
1175 1176
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1177
  tTrace("server handle %p failed to send to register brokenlink", exh);
dengyihao's avatar
dengyihao 已提交
1178
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1179
}
dengyihao's avatar
dengyihao 已提交
1180

dengyihao's avatar
formate  
dengyihao 已提交
1181
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
dengyihao's avatar
dengyihao 已提交
1182 1183 1184 1185 1186 1187
  if (thandle == NULL) {
    tTrace("invalid handle %p, failed to Get Conn info", thandle);
    return -1;
  }
  SExHandle* ex = thandle;
  SSrvConn*  pConn = ex->handle;
U
ubuntu 已提交
1188

dengyihao's avatar
dengyihao 已提交
1189
  struct sockaddr_in addr = pConn->addr;
dengyihao's avatar
dengyihao 已提交
1190 1191
  pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
  pInfo->clientPort = ntohs(addr.sin_port);
dengyihao's avatar
dengyihao 已提交
1192 1193 1194 1195
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
  return 0;
}

dengyihao's avatar
dengyihao 已提交
1196
#endif