transSrv.c 37.6 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20 21 22
// One-time initialisation guard for this module.
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;

// Payload written on the accept-thread -> worker pipe together with each
// dispatched connection handle (see uvOnAcceptCb).
static char* notify = "a";

// NOTE(review): presumably counts live server instances; not referenced in
// the part of the file reviewed here.
static int transSrvInst = 0;
dengyihao's avatar
dengyihao 已提交
24

dengyihao's avatar
dengyihao 已提交
25 26 27 28 29 30
// Per-connection registration record. When `init` is set and the link breaks,
// `msg` is the message delivered to the transport callback (see uvOnRecvCb and
// uvOnSendCb); the record is zeroed after it has been delivered.
typedef struct {
  int       notifyCount;  // reset to 0 when a Register message is recorded
  int       init;         // init or not
  STransMsg msg;          // copy of the Register message to hand back on link failure
} SSrvRegArg;

dengyihao's avatar
dengyihao 已提交
31
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
32
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
33 34 35
  uv_tcp_t*  pTcp;
  uv_write_t pWriter;
  uv_timer_t pTimer;
dengyihao's avatar
dengyihao 已提交
36 37

  queue       queue;
dengyihao's avatar
dengyihao 已提交
38
  int         persist;  // persist connection or not
dengyihao's avatar
dengyihao 已提交
39
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
40 41 42
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
43
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
44
  STransQueue srvMsgs;
dengyihao's avatar
dengyihao 已提交
45

dengyihao's avatar
dengyihao 已提交
46 47
  SSrvRegArg regArg;
  bool       broken;  // conn broken;
dengyihao's avatar
dengyihao 已提交
48

dengyihao's avatar
dengyihao 已提交
49
  ConnStatus         status;
dengyihao's avatar
dengyihao 已提交
50
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
51
  struct sockaddr_in locaddr;
U
ubuntu 已提交
52

dengyihao's avatar
dengyihao 已提交
53 54 55 56 57 58
  int64_t refId;
  int     spi;
  char    info[64];
  char    user[TSDB_UNI_LEN];  // user ID for the link
  char    secret[TSDB_PASSWORD_LEN];
  char    ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
59 60 61
} SSrvConn;

typedef struct SSrvMsg {
dengyihao's avatar
dengyihao 已提交
62 63 64 65
  SSrvConn*     pConn;
  STransMsg     msg;
  queue         q;
  STransMsgType type;
dengyihao's avatar
dengyihao 已提交
66
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
67 68

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
69
  TdThread      thread;
wafwerar's avatar
wafwerar 已提交
70
  uv_connect_t  connect_req;
dengyihao's avatar
dengyihao 已提交
71 72 73 74 75
  uv_pipe_t*    pipe;
  uv_os_fd_t    fd;
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  queue         msg;
wafwerar's avatar
wafwerar 已提交
76
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
77 78 79 80

  queue conn;
  void* pTransInst;
  bool  quit;
dengyihao's avatar
dengyihao 已提交
81 82 83
} SWorkThrdObj;

typedef struct SServerObj {
dengyihao's avatar
dengyihao 已提交
84
  TdThread   thread;
dengyihao's avatar
dengyihao 已提交
85 86 87 88
  uv_tcp_t   server;
  uv_loop_t* loop;

  // work thread info
dengyihao's avatar
dengyihao 已提交
89 90
  int            workerIdx;
  int            numOfThreads;
wafwerar's avatar
wafwerar 已提交
91
  int            numOfWorkerReady;
dengyihao's avatar
dengyihao 已提交
92
  SWorkThrdObj** pThreadObj;
dengyihao's avatar
dengyihao 已提交
93

wafwerar's avatar
wafwerar 已提交
94
  uv_pipe_t   pipeListen;
dengyihao's avatar
dengyihao 已提交
95 96 97 98
  uv_pipe_t** pipe;
  uint32_t    ip;
  uint32_t    port;
  uv_async_t* pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
99 100

  bool inited;
dengyihao's avatar
dengyihao 已提交
101 102
} SServerObj;

dengyihao's avatar
dengyihao 已提交
103 104 105 106 107 108 109
// handle
// Indirection record handed to the application instead of a raw connection
// pointer; it is looked up by refId via uvAcquireExHandle/uvReleaseExHandle.
typedef struct SExHandle {
  void*         handle;  // the connection; uvWorkerAsyncCb stores it into SSrvMsg.pConn
  int64_t       refId;   // id compared against the requested refId in ASYNC_CHECK_HANDLE
  SWorkThrdObj* pThrd;
} SExHandle;

dengyihao's avatar
dengyihao 已提交
110
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
111
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
112
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
113
static void uvOnTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
114
static void uvOnSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
115
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
116 117 118
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
119
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
120
static void uvShutDownCb(uv_shutdown_t* req, int status);
dengyihao's avatar
dengyihao 已提交
121 122 123 124 125 126 127

/*
 * time-consuming task throwed into BG work thread
 */
static void uvWorkDoTask(uv_work_t* req);
static void uvWorkAfterTask(uv_work_t* req, int status);

dengyihao's avatar
dengyihao 已提交
128 129
static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
130

dengyihao's avatar
dengyihao 已提交
131
static void uvStartSendRespInternal(SSrvMsg* smsg);
dengyihao's avatar
dengyihao 已提交
132 133
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
134

dengyihao's avatar
dengyihao 已提交
135 136
static void uvNotifyLinkBrokenToApp(SSrvConn* conn);

dengyihao's avatar
dengyihao 已提交
137
static void destroySmsg(SSrvMsg* smsg);
138
// check whether already read complete packet
dengyihao's avatar
fix bug  
dengyihao 已提交
139
static SSrvConn* createConn(void* hThrd);
dengyihao's avatar
dengyihao 已提交
140
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
141

dengyihao's avatar
dengyihao 已提交
142 143
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
144
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
145
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
146 147
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
                                                                       uvHandleRegister};
dengyihao's avatar
dengyihao 已提交
148

dengyihao's avatar
dengyihao 已提交
149
static int32_t exHandlesMgt;
dengyihao's avatar
dengyihao 已提交
150

dengyihao's avatar
dengyihao 已提交
151
void       uvInitEnv();
dengyihao's avatar
dengyihao 已提交
152
void       uvOpenExHandleMgt(int size);
dengyihao's avatar
dengyihao 已提交
153
void       uvCloseExHandleMgt();
dengyihao's avatar
dengyihao 已提交
154 155 156 157 158 159
int64_t    uvAddExHandle(void* p);
int32_t    uvRemoveExHandle(int64_t refId);
int32_t    uvReleaseExHandle(int64_t refId);
void       uvDestoryExHandle(void* handle);
SExHandle* uvAcquireExHandle(int64_t refId);

160
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
161

dengyihao's avatar
dengyihao 已提交
162
// server and worker thread
163 164
static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg);
dengyihao's avatar
dengyihao 已提交
165

dengyihao's avatar
dengyihao 已提交
166
// add handle loop
wafwerar's avatar
wafwerar 已提交
167
static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName);
dengyihao's avatar
dengyihao 已提交
168 169
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
// Handle a bare "release" request: a packet whose header has release == 1 and
// whose msgLen equals the header size. In that case the connection is marked
// ConnRelease, the read buffer is cleared, the packet is freed and a Release
// response is queued (and sent at once if transQueuePush() returns true).
//
// WARNING: this macro contains `return` statements, so it may only be used
// inside a function returning void, and control leaves that function whenever
// the packet is a release request. `conn` and `head` are evaluated more than
// once. The result of taosMemoryCalloc() is not checked.
#define CONN_SHOULD_RELEASE(conn, head)                                     \
  do {                                                                      \
    if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) {          \
      conn->status = ConnRelease;                                           \
      transClearBuffer(&conn->readBuf);                                     \
      transFreeMsg(transContFromHead((char*)head));                         \
      tTrace("server conn %p received release request", conn);              \
                                                                            \
      STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \
      SSrvMsg*  srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));              \
      srvMsg->msg = tmsg;                                                   \
      srvMsg->type = Release;                                               \
      srvMsg->pConn = conn;                                                 \
      if (!transQueuePush(&conn->srvMsgs, srvMsg)) {                        \
        return;                                                             \
      }                                                                     \
      uvStartSendRespInternal(srvMsg);                                      \
      return;                                                               \
    }                                                                       \
  } while (0)

// Tear down a libuv loop: request close on every handle that is not already
// closing (uvWalkCb), run the loop so the close requests are processed, then
// close the loop itself. `loop` is evaluated three times.
//
// NOTE(review): the macro body ends in `while (0);` - the trailing semicolon
// means `SRV_RELEASE_UV(x);` expands to two statements, which breaks use in an
// unbraced if/else. Left unchanged here because call sites are not visible.
#define SRV_RELEASE_UV(loop)       \
  do {                             \
    uv_walk(loop, uvWalkCb, NULL); \
    uv_run(loop, UV_RUN_DEFAULT);  \
    uv_loop_close(loop);           \
  } while (0);

dengyihao's avatar
dengyihao 已提交
198 199 200 201
// Bail out of an async-message handler when the worker thread is shutting
// down. The enclosing function must provide a `_return1` label.
#define ASYNC_ERR_JRET(thrd)                            \
  do {                                                  \
    if ((thrd)->quit) {                                 \
      tTrace("worker thread already quit, ignore msg"); \
      goto _return1;                                    \
    }                                                   \
  } while (0)

dengyihao's avatar
dengyihao 已提交
206 207 208 209 210
// Validate the ex-handle that an application message refers to.
//
//   refId  > 0 : look the handle up; jump to `_return1` if it is gone or its
//                stored refId differs from the requested one.
//   refId == 0 : same lookup, and on success `refId` is overwritten with
//                exh1->refId (so `refId` must be a modifiable lvalue).
//                NOTE(review): this looks up id 0 itself - confirm intent.
//   refId == -1: jump to `_return2`.
//
// The enclosing function must define the labels `_return1` and `_return2`.
// The reference taken by uvAcquireExHandle() is NOT released here.
//
// In both trace lines "ref1" is the requested refId and "ref2" is the refId
// stored in the looked-up handle (0 when the lookup failed); the refId > 0
// branch used to pass them the other way round.
#define ASYNC_CHECK_HANDLE(exh1, refId)                                                                               \
  do {                                                                                                                \
    if (refId > 0) {                                                                                                  \
      tTrace("server handle step1");                                                                                  \
      SExHandle* exh2 = uvAcquireExHandle(refId);                                                                     \
      if (exh2 == NULL || refId != exh2->refId) {                                                                     \
        tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
               refId, exh2 ? exh2->refId : 0);                                                                        \
        goto _return1;                                                                                                \
      }                                                                                                               \
    } else if (refId == 0) {                                                                                          \
      tTrace("server handle step2");                                                                                  \
      SExHandle* exh2 = uvAcquireExHandle(refId);                                                                     \
      if (exh2 == NULL || refId != exh2->refId) {                                                                     \
        tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
               refId, exh2 ? exh2->refId : 0);                                                                        \
        goto _return1;                                                                                                \
      } else {                                                                                                        \
        refId = exh1->refId;                                                                                          \
      }                                                                                                               \
    } else if (refId == -1) {                                                                                         \
      tTrace("server handle step3");                                                                                  \
      goto _return2;                                                                                                  \
    }                                                                                                                 \
  } while (0)

dengyihao's avatar
dengyihao 已提交
232
// libuv alloc callback for a client stream: the connection's own read buffer
// supplies the memory, so suggested_size is ignored.
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SSrvConn* pConn = handle->data;
  transAllocBuffer(&pConn->readBuf, buf);
}

// refers specifically to query or insert timeout
// Timer callback: only logs that the connection stored in handle->data has
// been idle; no other action is taken.
static void uvHandleActivityTimeout(uv_timer_t* handle) {
  SSrvConn* idleConn = handle->data;
  tDebug("%p timeout since no activity", idleConn);
}

dengyihao's avatar
dengyihao 已提交
244 245
/*
 * Dispatch the complete request held in pConn->readBuf to the application.
 *
 * Steps, in order:
 *   1. convert the header's code/msgLen from network to host byte order;
 *   2. record the sender's user name on the connection;
 *   3. let CONN_SHOULD_RELEASE() answer a bare release request and return;
 *   4. build an STransMsg, update connection status / reference count and
 *      invoke the transport callback (pTransInst->cfp).
 *
 * The read buffer is detached with transClearBuffer() before the callback;
 * pHead keeps pointing at the packet, which now belongs to the message.
 */
static void uvHandleReq(SSrvConn* pConn) {
  STransMsgHead* pHead = (STransMsgHead*)pConn->readBuf.buf;
  // Same operation as htonl(), spelled for the direction actually performed.
  pHead->code = ntohl(pHead->code);
  pHead->msgLen = ntohl(pHead->msgLen);

  // The user name comes straight off the wire, so never trust it to be
  // terminated: bound the copy by the destination size and always terminate,
  // so a shorter name does not keep the tail of a previous, longer one.
  const size_t userCap = sizeof(pConn->user) - 1;
  const char*  userEnd = memchr(pHead->user, '\0', userCap);
  const size_t userLen = (userEnd != NULL) ? (size_t)(userEnd - pHead->user) : userCap;
  memcpy(pConn->user, pHead->user, userLen);
  pConn->user[userLen] = '\0';

  // TODO(dengyihao): time-consuming task throwed into BG Thread
  // (uv_queue_work with uvWorkDoTask / uvWorkAfterTask)

  // May `return` from this function if the packet is a bare release request.
  CONN_SHOULD_RELEASE(pConn, pHead);

  // Zero-initialise so fields not assigned below never reach the callback
  // holding stack garbage.
  STransMsg transMsg = {0};
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = pHead->content;
  transMsg.msgType = pHead->msgType;
  transMsg.code = pHead->code;
  transMsg.ahandle = (void*)pHead->ahandle;
  transMsg.handle = NULL;

  transClearBuffer(&pConn->readBuf);
  pConn->inType = pHead->msgType;
  if (pConn->status == ConnNormal) {
    if (pHead->persist == 1) {
      // The application keeps the connection until it sends a release.
      pConn->status = ConnAcquire;
      transRefSrvHandle(pConn);
      tDebug("server conn %p acquired by server app", pConn);
    }
  }
  if (pConn->status == ConnNormal && pHead->noResp == 0) {
    // A response is expected: hold a reference until uvStartSendResp().
    transRefSrvHandle(pConn);
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
           taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
           ntohs(pConn->locaddr.sin_port), transMsg.contLen);
  } else {
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
           TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
           taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
    // no ref here
  }

  // if pHead->noResp = 1,
  // 1. server application should not send resp on handle
  // 2. once send out data, cli conn released to conn pool immediately
  // 3. not mixed with persist

  transMsg.handle = (void*)uvAcquireExHandle(pConn->refId);
  tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.handle, pConn, pConn->refId);
  transMsg.refId = pConn->refId;
  assert(transMsg.handle != NULL);
  if (pHead->noResp == 1) {
    // -1 tells the async path that no response may be sent on this handle.
    transMsg.refId = -1;
  }
  uvReleaseExHandle(pConn->refId);

  STrans* pTransInst = pConn->pTransInst;
  (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
  // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
}

dengyihao's avatar
dengyihao 已提交
312
// libuv read callback for a client stream.
//   nread  > 0 : bytes were appended to the connection's read buffer; handle
//                the request once a complete packet is available.
//   nread == 0 : nothing to do.
//   nread  < 0 : read error / EOF; mark the connection broken, notify the
//                application if it registered for that, and drop a reference.
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
  SSrvConn*    pConn = cli->data;
  SConnBuffer* pReadBuf = &pConn->readBuf;

  if (nread == 0) {
    return;
  }

  if (nread > 0) {
    pReadBuf->len += nread;
    tTrace("server conn %p read summary, total read: %d, current read: %d", pConn, pReadBuf->len, (int)nread);
    if (!transReadComplete(pReadBuf)) {
      tTrace("server %p read partial packet, continue to read", pConn);
      return;
    }
    tTrace("server conn %p alread read complete packet", pConn);
    uvHandleReq(pConn);
    return;
  }

  // nread < 0 from here on
  tError("server conn %p read error: %s", pConn, uv_err_name(nread));
  pConn->broken = true;
  if (pConn->status == ConnAcquire && pConn->regArg.init) {
    tTrace("server conn %p broken, notify server app", pConn);
    STrans* pTransInst = pConn->pTransInst;
    (*pTransInst->cfp)(pTransInst->parent, &(pConn->regArg.msg), NULL);
    memset(&pConn->regArg, 0, sizeof(pConn->regArg));
  }
  transUnrefSrvHandle(pConn);
}
// libuv alloc callback that hands out a fresh zeroed 2-byte buffer;
// suggested_size is ignored.
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  enum { kConnBufLen = 2 };
  buf->len = kConnBufLen;
  buf->base = taosMemoryCalloc(1, sizeof(char) * kConnBufLen);
}

// Timer callback: logs that the connection stored in handle->data timed out.
void uvOnTimeoutCb(uv_timer_t* handle) {
  SSrvConn* timedOut = handle->data;
  tError("server conn %p time out", timedOut);
}

dengyihao's avatar
dengyihao 已提交
356
// libuv write-completion callback for responses.
//
// On failure the connection is marked broken and one reference is dropped.
// On success the message at the head of conn->srvMsgs (the one just written)
// is popped and destroyed; a completed Release puts the connection back to
// ConnNormal. If more messages are queued, the next one is started - except
// that a Register message on an acquired connection is not written at all:
// it is recorded in conn->regArg (and delivered immediately if the link is
// already broken), and the message after it, if any, is started instead.
void uvOnSendCb(uv_write_t* req, int status) {
  SSrvConn* conn = req->data;

  if (status != 0) {
    tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
    conn->broken = true;
    transUnrefSrvHandle(conn);
    return;
  }

  tTrace("server conn %p data already was written on stream", conn);
  if (transQueueEmpty(&conn->srvMsgs)) {
    return;
  }

  SSrvMsg* sent = transQueuePop(&conn->srvMsgs);
  if (sent->type == Release && conn->status != ConnNormal) {
    conn->status = ConnNormal;
    transUnrefSrvHandle(conn);
  }
  destroySmsg(sent);

  // send second data, just use for push
  if (transQueueEmpty(&conn->srvMsgs)) {
    return;
  }

  SSrvMsg* next = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
  if (!(next->type == Register && conn->status == ConnAcquire)) {
    uvStartSendRespInternal(next);
    return;
  }

  conn->regArg.notifyCount = 0;
  conn->regArg.init = 1;
  conn->regArg.msg = next->msg;
  if (conn->broken) {
    STrans* pTransInst = conn->pTransInst;
    (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
    memset(&conn->regArg, 0, sizeof(conn->regArg));
  }
  // Only the wrapper is freed: its payload was copied into regArg.msg above.
  transQueuePop(&conn->srvMsgs);
  taosMemoryFree(next);

  next = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
  if (next != NULL) {
    uvStartSendRespInternal(next);
  }
}
dengyihao's avatar
dengyihao 已提交
398 399
// Completion callback of the uv_write2() that passes an accepted connection
// to a worker. Whatever the outcome, the handle stored in req->data is closed
// (and freed by uvFreeCb) and the write request itself is freed.
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  uv_handle_t* dispatched = (uv_handle_t*)req->data;

  if (status != 0) {
    tError("fail to dispatch conn to work thread");
  } else {
    tTrace("success to dispatch conn to work thread");
  }

  uv_close(dispatched, uvFreeCb);
  taosMemoryFree(req);
}
dengyihao's avatar
dengyihao 已提交
408

dengyihao's avatar
dengyihao 已提交
409
// Fill in the transport header in front of smsg's payload and point `wb` at
// the header so that header + payload can be written in one go. A message
// without payload gets an empty one allocated first.
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
  tTrace("server conn %p prepare to send resp", smsg->pConn);

  SSrvConn*  conn = smsg->pConn;
  STransMsg* resp = &smsg->msg;
  const bool isRelease = (smsg->type == Release);

  if (resp->pCont == NULL) {
    resp->pCont = (void*)rpcMallocCont(0);
    resp->contLen = 0;
  }

  STransMsgHead* head = transHeadFromCont(resp->pCont);
  head->ahandle = (uint64_t)resp->ahandle;

  // On a normal connection the reply type is derived from the request type;
  // otherwise the message carries its own type (0 for a release).
  if (conn->status == ConnNormal) {
    head->msgType = conn->inType + 1;
  } else if (isRelease) {
    head->msgType = 0;
  } else {
    head->msgType = resp->msgType;
  }
  head->release = isRelease ? 1 : 0;
  head->code = htonl(resp->code);

  int32_t totalLen = transMsgLenFromCont(resp->contLen);
  tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", conn, TMSG_INFO(head->msgType),
         taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr),
         ntohs(conn->locaddr.sin_port));
  head->msgLen = htonl(totalLen);

  wb->base = (char*)head;
  wb->len = totalLen;
}
dengyihao's avatar
dengyihao 已提交
439 440

// Serialise smsg's header and start writing it on its connection; completion
// is reported to uvOnSendCb. The message must already be queued in
// pConn->srvMsgs, because uvOnSendCb pops and destroys the queue head.
static void uvStartSendRespInternal(SSrvMsg* smsg) {
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
  // uv_timer_stop(&pConn->pTimer);
  int ret = uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
  if (ret != 0) {
    // The request was rejected up front, so uvOnSendCb will not run for it;
    // at least make the failure visible instead of dropping it silently.
    tError("server conn %p failed to start write, %s", pConn, uv_err_name(ret));
  }
}
// Queue an application response on its connection and start writing it if
// transQueuePush() reports that it may be sent right away.
//
// Ownership: this function always takes ownership of smsg. On the normal
// path the message lives in pConn->srvMsgs until uvOnSendCb destroys it. On a
// broken connection it is never queued, so it is destroyed here - the caller
// cannot do that, since the void return gives no way to tell the paths apart.
static void uvStartSendResp(SSrvMsg* smsg) {
  // impl
  SSrvConn* pConn = smsg->pConn;

  if (pConn->broken == true) {
    // persist by
    destroySmsg(smsg);
    transUnrefSrvHandle(pConn);
    return;
  }
  if (pConn->status == ConnNormal) {
    // Pairs with the reference taken in uvHandleReq for a request that
    // expects a response.
    transUnrefSrvHandle(pConn);
  }

  if (!transQueuePush(&pConn->srvMsgs, smsg)) {
    return;
  }
  uvStartSendRespInternal(smsg);
  return;
}
dengyihao's avatar
dengyihao 已提交
467

dengyihao's avatar
dengyihao 已提交
468
// Free a server message together with its payload; NULL is accepted.
static void destroySmsg(SSrvMsg* smsg) {
  if (smsg != NULL) {
    transFreeMsg(smsg->msg.pCont);
    taosMemoryFree(smsg);
  }
}
dengyihao's avatar
fix bug  
dengyihao 已提交
475
// Unlink every connection hosted by pThrd and drop all references to it.
static void destroyAllConn(SWorkThrdObj* pThrd) {
  tTrace("thread %p destroy all conn ", pThrd);
  while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
    queue* node = QUEUE_HEAD(&pThrd->conn);
    QUEUE_REMOVE(node);
    QUEUE_INIT(node);

    SSrvConn* pConn = QUEUE_DATA(node, SSrvConn, queue);
    // Drop the extra references first, then the last remaining one.
    for (; T_REF_VAL_GET(pConn) >= 2;) {
      transUnrefSrvHandle(pConn);
    }
    transUnrefSrvHandle(pConn);
  }
}
dengyihao's avatar
dengyihao 已提交
489
// Async callback of a worker thread: drain the messages posted to this async
// item and dispatch each through transAsyncHandle[] according to its type.
//
// Quit messages are dispatched unconditionally. Every other message must
// refer to a live ex-handle: the handle stored in the message has to be the
// one currently registered under the message's refId, otherwise the message
// is destroyed and skipped.
void uvWorkerAsyncCb(uv_async_t* handle) {
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
  queue         wq;

  // batch process to avoid to lock/unlock frequently
  taosThreadMutexLock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  taosThreadMutexUnlock(&item->mtx);

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* head = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(head);

    SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
    if (msg == NULL) {
      tError("unexcept occurred, continue");
      continue;
    }

    // release handle to rpc init
    if (msg->type == Quit) {
      (*transAsyncHandle[msg->type])(msg, pThrd);
      continue;
    }

    SExHandle* exh1 = msg->msg.handle;
    int64_t    refId = msg->msg.refId;
    SExHandle* exh2 = uvAcquireExHandle(refId);
    if (exh2 == NULL || exh1 != exh2) {
      tTrace("server handle %p except msg, ignore it", exh1);
      // Only give back a reference that was actually obtained; releasing
      // after a failed acquire would unbalance the handle's count.
      if (exh2 != NULL) {
        uvReleaseExHandle(refId);
      }
      destroySmsg(msg);
      continue;
    }

    msg->pConn = exh1->handle;
    uvReleaseExHandle(refId);
    (*transAsyncHandle[msg->type])(msg, pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
531 532 533 534 535
// uv_walk() visitor: request close (without a close callback) on every handle
// that is not already closing. `arg` is unused.
static void uvWalkCb(uv_handle_t* handle, void* arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}
dengyihao's avatar
dengyihao 已提交
536 537 538 539
// uv_close() callback for handles that were allocated with taosMemoryMalloc:
// releases the handle's memory once libuv has finished closing it.
static void uvFreeCb(uv_handle_t* handle) {
  // the handle itself is the allocation
  taosMemoryFree(handle);
}
dengyihao's avatar
dengyihao 已提交
540

dengyihao's avatar
dengyihao 已提交
541 542
// Async callback on the accept loop: closes every handle on the server's
// loop via uvWalkCb.
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* pServer = async->data;
  tDebug("close server port %d", pServer->port);
  uv_walk(pServer->loop, uvWalkCb, NULL);
}
dengyihao's avatar
dengyihao 已提交
546

dengyihao's avatar
dengyihao 已提交
547
// uv_shutdown() completion: log a failure, then close the stream (its
// connection is destroyed in uvDestroyConn) and free the shutdown request.
static void uvShutDownCb(uv_shutdown_t* req, int status) {
  uv_handle_t* stream = (uv_handle_t*)req->handle;

  if (status != 0) {
    tDebug("conn failed to shut down: %s", uv_err_name(status));
  }

  uv_close(stream, uvDestroyConn);
  taosMemoryFree(req);
}

dengyihao's avatar
dengyihao 已提交
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572
// Body of a background work item. Currently a placeholder that only traces;
// intended for time-consuming per-connection work (only auth for now).
static void uvWorkDoTask(uv_work_t* req) {
  void* pConn = req->data;
  tTrace("server conn %p start to be processed in BG Thread", pConn);
}

// Completion of a background work item; called in the main loop. Traces the
// outcome and frees the work request.
static void uvWorkAfterTask(uv_work_t* req, int status) {
  void* pConn = req->data;

  if (status != 0) {
    tTrace("server conn %p failed to processed ", pConn);
  }
  // Nothing else to do yet once the time-consuming task is done.
  tTrace("server conn %p already processed ", pConn);
  taosMemoryFree(req);
}

dengyihao's avatar
dengyihao 已提交
573 574 575 576 577 578
// Connection callback of the listening TCP socket (runs on the accept loop).
//
// Accepts the pending client into a freshly allocated uv_tcp_t and hands it to
// the next worker (round-robin over pObj->pipe) by sending the one-byte
// `notify` payload together with the socket through uv_write2.
//
// The client handle is heap allocated, so every path that closes it passes
// uvFreeCb as the close callback; closing with a NULL callback would leak it.
void uvOnAcceptCb(uv_stream_t* stream, int status) {
  // libuv reports failures as negative error codes, not only -1
  if (status < 0) {
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

  uv_tcp_t* cli = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
  if (cli == NULL) {
    tError("failed to allocate handle for new connection");
    return;
  }
  if (uv_tcp_init(pObj->loop, cli) != 0) {
    // the handle was not initialized, so there is nothing to close
    taosMemoryFree(cli);
    return;
  }

  if (uv_accept(stream, (uv_stream_t*)cli) != 0) {
    uv_close((uv_handle_t*)cli, uvFreeCb);
    return;
  }

  if (pObj->numOfWorkerReady < pObj->numOfThreads) {
    tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady);
    uv_close((uv_handle_t*)cli, uvFreeCb);
    return;
  }

  uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
  if (wr == NULL) {
    tError("failed to allocate write request for new connection");
    uv_close((uv_handle_t*)cli, uvFreeCb);
    return;
  }
  wr->data = cli;
  uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

  pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;

  tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);

  int err = uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
  if (err != 0) {
    // the write was not queued, so uvOnPipeWriteCb will never run for `wr`
    tError("failed to dispatch connection to worker-thread: %s", uv_err_name(err));
    taosMemoryFree(wr);
    uv_close((uv_handle_t*)cli, uvFreeCb);
  }
}
// Read callback on a worker's pipe: each `notify` byte announces a TCP handle
// passed over the pipe by the accept loop. Creates an SSrvConn for it, accepts
// the pending handle into the connection's uv_tcp_t, records peer/local
// addresses and starts reading requests.
//
// `buf->base` is released here on every path that returns before or after the
// payload check.
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
  tTrace("server connection coming");
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
    tError("failed to create connect: %p", q);
    taosMemoryFree(buf->base);
    uv_close((uv_handle_t*)q, NULL);
    return;
  }
  if (nread == 0) {
    // libuv: nread == 0 is neither an error nor EOF (EAGAIN-like); nothing was
    // delivered, so only the buffer has to be released
    taosMemoryFree(buf->base);
    return;
  }

  assert((size_t)nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
  taosMemoryFree(buf->base);

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

  SSrvConn* pConn = createConn(pThrd);

  pConn->pTransInst = pThrd->pTransInst;
  pConn->hostThrd = pThrd;

  // init client handle
  pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

  pConn->pWriter.data = pConn;

  transSetConnOption((uv_tcp_t*)pConn->pTcp);

  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) != 0) {
    tDebug("failed to create new connection");
    transUnrefSrvHandle(pConn);
    return;
  }

  uv_os_fd_t fd;
  uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
  tTrace("server conn %p created, fd: %d", pConn, fd);

  int addrlen = sizeof(pConn->addr);
  if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
    tError("server conn %p failed to get peer info", pConn);
    transUnrefSrvHandle(pConn);
    return;
  }

  addrlen = sizeof(pConn->locaddr);
  if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
    tError("server conn %p failed to get local info", pConn);
    transUnrefSrvHandle(pConn);
    return;
  }

  uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
}

676
void* transAcceptThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
677
  // opt
dengyihao's avatar
dengyihao 已提交
678
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
679 680
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
681 682

  return NULL;
dengyihao's avatar
dengyihao 已提交
683
}
wafwerar's avatar
wafwerar 已提交
684 685 686 687 688 689 690 691
// Connect callback for a worker's pipe: once the connection succeeded, start
// reading connection notifications from it. A failed connect is ignored.
void uvOnPipeConnectionCb(uv_connect_t *connect, int status) {
  if (status == 0) {
    SWorkThrdObj* pWorker = container_of(connect, SWorkThrdObj, connect_req);
    uv_read_start((uv_stream_t*)pWorker->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
  }
}
// Prepares a worker's event loop: allocates and initializes the loop, the
// worker pipe, the message queue with its mutex, the connection list and the
// async pool, then starts connecting the pipe to `pipeName`.
//
// Returns false when the loop cannot be allocated or initialized, true
// otherwise.
static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName) {
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
  if (pThrd->loop == NULL) {
    // uv_loop_init must not be handed a NULL loop
    return false;
  }
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }

  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
  pThrd->pipe->data = pThrd;

  QUEUE_INIT(&pThrd->msg);
  taosThreadMutexInit(&pThrd->msgMtx, NULL);

  // conn set
  QUEUE_INIT(&pThrd->conn);

  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
  // reading from the pipe starts in uvOnPipeConnectionCb once connected
  uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
724
  // register an async here to quit server gracefully
wafwerar's avatar
wafwerar 已提交
725
  srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t));
dengyihao's avatar
dengyihao 已提交
726 727
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
728

dengyihao's avatar
dengyihao 已提交
729
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
730 731 732 733 734
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
735
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
736
    tError("failed to listen: %s", uv_err_name(err));
dengyihao's avatar
dengyihao 已提交
737
    terrno = TSDB_CODE_RPC_PORT_EADDRINUSE;
dengyihao's avatar
dengyihao 已提交
738 739 740
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
741
}
742
void* transWorkerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
743
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
744
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
745
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
746 747

  return NULL;
dengyihao's avatar
dengyihao 已提交
748 749
}

dengyihao's avatar
fix bug  
dengyihao 已提交
750 751 752
static SSrvConn* createConn(void* hThrd) {
  SWorkThrdObj* pThrd = hThrd;

wafwerar's avatar
wafwerar 已提交
753
  SSrvConn* pConn = (SSrvConn*)taosMemoryCalloc(1, sizeof(SSrvConn));
dengyihao's avatar
fix bug  
dengyihao 已提交
754 755 756
  QUEUE_INIT(&pConn->queue);

  QUEUE_PUSH(&pThrd->conn, &pConn->queue);
dengyihao's avatar
dengyihao 已提交
757 758

  transQueueInit(&pConn->srvMsgs, NULL);
dengyihao's avatar
dengyihao 已提交
759

dengyihao's avatar
dengyihao 已提交
760
  memset(&pConn->regArg, 0, sizeof(pConn->regArg));
dengyihao's avatar
dengyihao 已提交
761
  pConn->broken = false;
dengyihao's avatar
dengyihao 已提交
762
  pConn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
763

dengyihao's avatar
dengyihao 已提交
764 765 766 767 768 769 770
  SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
  exh->handle = pConn;
  exh->pThrd = pThrd;
  exh->refId = uvAddExHandle(exh);
  uvAcquireExHandle(exh->refId);

  pConn->refId = exh->refId;
dengyihao's avatar
dengyihao 已提交
771
  transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
772
  tTrace("server handle %p, conn %p created, refId: %" PRId64 "", exh, pConn, pConn->refId);
dengyihao's avatar
dengyihao 已提交
773 774
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
775

dengyihao's avatar
dengyihao 已提交
776
// Releases the connection's read buffer and, when `clear` is set, closes its
// TCP handle; the rest of the teardown happens in the close callback
// uvDestroyConn. A NULL `conn` is ignored.
static void destroyConn(SSrvConn* conn, bool clear) {
  if (NULL == conn) {
    return;
  }

  transDestroyBuffer(&conn->readBuf);
  if (!clear) {
    return;
  }

  tTrace("server conn %p to be destroyed", conn);
  uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
}
// Close callback of a connection's TCP handle: performs the final teardown of
// the SSrvConn stored in handle->data.
//
// Drops the external handle reference and registration, destroys the pending
// message queue, frees a registered broken-link message (if any), unlinks the
// connection from its worker, and frees the TCP handle and the connection.
// If the worker is quitting and this was its last connection, the remaining
// handles of the worker loop are closed as well.
static void uvDestroyConn(uv_handle_t* handle) {
  SSrvConn* conn = handle->data;
  if (conn == NULL) {
    return;
  }
  // read before `conn` is freed; needed for the quit check below
  SWorkThrdObj* thrd = conn->hostThrd;

  uvReleaseExHandle(conn->refId);
  uvRemoveExHandle(conn->refId);

  tDebug("server conn %p destroy", conn);
  transQueueDestroy(&conn->srvMsgs);

  // the original contained this cleanup twice; the second copy could never run
  // because `init` is reset here
  if (conn->regArg.init == 1) {
    transFreeMsg(conn->regArg.msg.pCont);
    conn->regArg.init = 0;
  }

  QUEUE_REMOVE(&conn->queue);
  taosMemoryFree(conn->pTcp);
  taosMemoryFree(conn);

  if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
    tTrace("work thread quit");
    uv_walk(thrd->loop, uvWalkCb, NULL);
  }
}
wafwerar's avatar
wafwerar 已提交
821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839
// Connection callback of the listening pipe: accepts one worker's pipe
// connection into the next free write-end slot (srv->pipe[n][0]) and counts
// the worker as ready.
//
// The side-effecting libuv calls are made outside ASSERT so they still run if
// assertions are compiled out.
static void uvPipeListenCb(uv_stream_t* handle, int status) {
  ASSERT(status == 0);

  SServerObj* srv = container_of(handle, SServerObj, pipeListen);
  if (srv->numOfWorkerReady >= srv->numOfThreads) {
    // srv->pipe only has numOfThreads slots
    tError("unexpected pipe connection, all %d worker-threads are already ready", srv->numOfThreads);
    return;
  }

  uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]);

  int ret = uv_pipe_init(srv->loop, pipe, 1);
  ASSERT(0 == ret);
  ret = uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe);
  ASSERT(0 == ret);
  (void)ret;

  // pure state queries, safe to keep inside assertions
  ASSERT(1 == uv_is_readable((uv_stream_t*)pipe));
  ASSERT(1 == uv_is_writable((uv_stream_t*)pipe));
  ASSERT(0 == uv_is_closing((uv_handle_t*)pipe));

  srv->numOfWorkerReady++;
}
dengyihao's avatar
dengyihao 已提交
840

U
ubuntu 已提交
841
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
842 843
  SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
844 845
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
wafwerar's avatar
wafwerar 已提交
846
  srv->numOfWorkerReady = 0;
wafwerar's avatar
wafwerar 已提交
847 848
  srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
dengyihao's avatar
dengyihao 已提交
849 850 851 852
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

dengyihao's avatar
dengyihao 已提交
853
  taosThreadOnce(&transModuleInit, uvInitEnv);
dengyihao's avatar
dengyihao 已提交
854
  transSrvInst++;
dengyihao's avatar
dengyihao 已提交
855

wafwerar's avatar
wafwerar 已提交
856 857 858 859 860
  char pipeName[64];
  assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0));
#ifdef WINDOWS
  snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc\\%p-%lu", taosSafeRand(), GetCurrentProcessId());
#else
wafwerar's avatar
wafwerar 已提交
861
  snprintf(pipeName, sizeof(pipeName), ".trans.rpc\\%08X-%lu", taosSafeRand(), taosGetSelfPthreadId());
wafwerar's avatar
wafwerar 已提交
862 863 864 865
#endif
  assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName));
  assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));

dengyihao's avatar
dengyihao 已提交
866
  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
867
    SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
868
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
869
    thrd->quit = false;
dengyihao's avatar
dengyihao 已提交
870
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
871
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
872

wafwerar's avatar
wafwerar 已提交
873
    srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
dengyihao's avatar
dengyihao 已提交
874

wafwerar's avatar
wafwerar 已提交
875 876 877 878 879 880 881 882 883 884 885 886 887
  // #ifdef WINDOWS
  //   uv_file fds[2];
  //   if (uv_pipe(fds, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE) != 0) {
  // #else
  //   uv_os_sock_t fds[2];
  //   if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
  // #endif
  //     goto End;
  //   }
    // uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    // uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write

    // thrd->fd = fds[0];
dengyihao's avatar
dengyihao 已提交
888
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
889

wafwerar's avatar
wafwerar 已提交
890
    if (false == addHandleToWorkloop(thrd,pipeName)) {
dengyihao's avatar
dengyihao 已提交
891 892
      goto End;
    }
893
    int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
dengyihao's avatar
dengyihao 已提交
894 895 896 897 898 899
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
dengyihao's avatar
dengyihao 已提交
900
      goto End;
dengyihao's avatar
dengyihao 已提交
901 902
    }
  }
903
  if (false == taosValidIpAndPort(srv->ip, srv->port)) {
dengyihao's avatar
dengyihao 已提交
904 905
    terrno = TAOS_SYSTEM_ERROR(errno);
    tError("invalid ip/port, reason: %s", terrstr());
906 907
    goto End;
  }
dengyihao's avatar
dengyihao 已提交
908 909 910
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
911
  int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
dengyihao's avatar
dengyihao 已提交
912 913 914
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
dengyihao's avatar
dengyihao 已提交
915 916
    tError("failed  to create accept-thread");
    goto End;
dengyihao's avatar
dengyihao 已提交
917 918
    // clear all resource later
  }
dengyihao's avatar
dengyihao 已提交
919
  srv->inited = true;
dengyihao's avatar
dengyihao 已提交
920
  return srv;
dengyihao's avatar
dengyihao 已提交
921
End:
U
ubuntu 已提交
922
  transCloseServer(srv);
dengyihao's avatar
dengyihao 已提交
923 924
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
925

dengyihao's avatar
dengyihao 已提交
926 927
// One-time module initialization (passed to taosThreadOnce by
// transInitServer): sets UV_TCP_SINGLE_ACCEPT=1 in the environment and opens
// the external-handle reference set with 10000 entries.
void uvInitEnv() {
  enum { EX_HANDLE_SET_SIZE = 10000 };

  uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
  uvOpenExHandleMgt(EX_HANDLE_SET_SIZE);
}
dengyihao's avatar
dengyihao 已提交
930 931 932 933
// Opens the reference set that tracks external handles and stores its id in
// the file-level `exHandlesMgt`. `size` is forwarded to taosOpenRef;
// uvDestoryExHandle is registered as the per-entry destructor.
void uvOpenExHandleMgt(int size) {
  // added into once later
  exHandlesMgt = taosOpenRef(size, uvDestoryExHandle);
}
dengyihao's avatar
dengyihao 已提交
934 935 936 937
// Closes the external-handle reference set opened by uvOpenExHandleMgt.
void uvCloseExHandleMgt() {
  // close ref
  taosCloseRef(exHandlesMgt);
}
dengyihao's avatar
dengyihao 已提交
938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962
// Registers `p` in the external-handle reference set and returns the id
// assigned by taosAddRef.
int64_t uvAddExHandle(void* p) {
  int64_t refId = taosAddRef(exHandlesMgt, p);
  return refId;
}
// Removes the entry `refId` from the external-handle reference set; returns
// the result of taosRemoveRef.
int32_t uvRemoveExHandle(int64_t refId) {
  int32_t code = taosRemoveRef(exHandlesMgt, refId);
  return code;
}

SExHandle* uvAcquireExHandle(int64_t refId) {
  // acquire extern handle
  return (SExHandle*)taosAcquireRef(exHandlesMgt, refId);
}

// Releases a previously acquired external handle `refId`; returns the result
// of taosReleaseRef.
int32_t uvReleaseExHandle(int64_t refId) {
  int32_t code = taosReleaseRef(exHandlesMgt, refId);
  return code;
}
// Destructor registered with the external-handle reference set: frees the
// entry's memory. NULL is ignored.
void uvDestoryExHandle(void* handle) {
  if (handle != NULL) {
    taosMemoryFree(handle);
  }
}

dengyihao's avatar
dengyihao 已提交
963
// Handles a Quit message on a worker: marks the worker as quitting, then
// either closes all loop handles right away (no connections left) or starts
// destroying the remaining connections. The message is freed here.
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
  thrd->quit = true;

  if (!QUEUE_IS_EMPTY(&thrd->conn)) {
    destroyAllConn(thrd);
  } else {
    uv_walk(thrd->loop, uvWalkCb, NULL);
  }

  taosMemoryFree(msg);
}
// Handles a Release message on a worker.
// For a connection in ConnAcquire state the message is pushed onto the
// connection's queue and, when transQueuePush returns true, sent right away.
// In any other state the message is destroyed (with a debug log when the
// connection is already released or normal).
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* pConn = msg->pConn;

  if (pConn->status == ConnAcquire) {
    if (transQueuePush(&pConn->srvMsgs, msg)) {
      uvStartSendRespInternal(msg);
    }
    return;
  }

  if (pConn->status == ConnRelease || pConn->status == ConnNormal) {
    tDebug("server conn %p already released, ignore release-msg", pConn);
  }
  destroySmsg(msg);
}
dengyihao's avatar
dengyihao 已提交
985
// Handles a response message on a worker (step 2/2 of sending a response):
// logs and hands the message to uvStartSendResp.
void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* pConn = msg->pConn;

  tDebug("server conn %p start to send resp (2/2)", pConn);
  uvStartSendResp(msg);
}
dengyihao's avatar
dengyihao 已提交
990 991
// Handles a Register message on a worker: stores msg->msg as the connection's
// broken-link notification. If the connection is already broken, the
// callback is invoked immediately and the registration is cleared.
// NOTE(review): when the connection is not in ConnAcquire state, `msg` is not
// freed in this function — confirm whether that is intended.
void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* pConn = msg->pConn;
  tDebug("server conn %p register brokenlink callback", pConn);

  if (pConn->status != ConnAcquire) {
    return;
  }
  if (!transQueuePush(&pConn->srvMsgs, msg)) {
    return;
  }
  transQueuePop(&pConn->srvMsgs);

  pConn->regArg.notifyCount = 0;
  pConn->regArg.init = 1;
  pConn->regArg.msg = msg->msg;
  tDebug("server conn %p register brokenlink callback succ", pConn);

  if (pConn->broken) {
    STrans* pInst = pConn->pTransInst;
    (*pInst->cfp)(pInst->parent, &(pConn->regArg.msg), NULL);
    memset(&pConn->regArg, 0, sizeof(pConn->regArg));
  }
  taosMemoryFree(msg);
}
dengyihao's avatar
dengyihao 已提交
1011 1012 1013 1014
// Joins a worker thread and releases its resources: the uv loop (via
// SRV_RELEASE_UV), the async pool, the loop memory and the worker object.
// NULL is ignored.
void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (NULL == pThrd) {
    return;
  }

  // wait for the worker's loop to finish before tearing it down
  taosThreadJoin(pThrd->thread, NULL);

  SRV_RELEASE_UV(pThrd->loop);
  transDestroyAsyncPool(pThrd->asyncPool);

  taosMemoryFree(pThrd->loop);
  taosMemoryFree(pThrd);
}
dengyihao's avatar
dengyihao 已提交
1021
// Posts a Quit message to a worker through its async pool.
//
// A NULL worker is ignored, matching destroyWorkThrd: transCloseServer calls
// both for every slot of pThreadObj, and slots stay NULL when server
// initialization failed before the worker was created.
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd == NULL) {
    return;
  }

  SSrvMsg* msg = taosMemoryCalloc(1, sizeof(SSrvMsg));
  if (msg == NULL) {
    tError("failed to allocate quit msg for work thread");
    return;
  }
  msg->type = Quit;
  tDebug("server send quit msg to work thread");
  transSendAsync(pThrd->asyncPool, &msg->q);
}

U
ubuntu 已提交
1028
void transCloseServer(void* arg) {
dengyihao's avatar
dengyihao 已提交
1029 1030
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
1031 1032

  tDebug("send quit msg to accept thread");
dengyihao's avatar
dengyihao 已提交
1033 1034 1035 1036
  if (srv->inited) {
    uv_async_send(srv->pAcceptAsync);
    taosThreadJoin(srv->thread, NULL);
  }
dengyihao's avatar
dengyihao 已提交
1037
  SRV_RELEASE_UV(srv->loop);
dengyihao's avatar
dengyihao 已提交
1038

dengyihao's avatar
dengyihao 已提交
1039 1040 1041 1042 1043
  for (int i = 0; i < srv->numOfThreads; i++) {
    sendQuitToWorkThrd(srv->pThreadObj[i]);
    destroyWorkThrd(srv->pThreadObj[i]);
  }

wafwerar's avatar
wafwerar 已提交
1044 1045 1046
  taosMemoryFree(srv->pThreadObj);
  taosMemoryFree(srv->pAcceptAsync);
  taosMemoryFree(srv->loop);
dengyihao's avatar
dengyihao 已提交
1047 1048

  for (int i = 0; i < srv->numOfThreads; i++) {
wafwerar's avatar
wafwerar 已提交
1049
    taosMemoryFree(srv->pipe[i]);
dengyihao's avatar
dengyihao 已提交
1050
  }
wafwerar's avatar
wafwerar 已提交
1051
  taosMemoryFree(srv->pipe);
dengyihao's avatar
dengyihao 已提交
1052

wafwerar's avatar
wafwerar 已提交
1053
  taosMemoryFree(srv);
dengyihao's avatar
dengyihao 已提交
1054

dengyihao's avatar
dengyihao 已提交
1055 1056
  transSrvInst--;
  if (transSrvInst == 0) {
1057 1058
    TdThreadOnce tmpInit = PTHREAD_ONCE_INIT;
    memcpy(&transModuleInit, &tmpInit, sizeof(TdThreadOnce));
dengyihao's avatar
dengyihao 已提交
1059 1060
    uvCloseExHandleMgt();
  }
dengyihao's avatar
dengyihao 已提交
1061
}
dengyihao's avatar
dengyihao 已提交
1062

dengyihao's avatar
dengyihao 已提交
1063 1064 1065 1066 1067
void transRefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_INC((SSrvConn*)handle);
dengyihao's avatar
dengyihao 已提交
1068
  tDebug("server conn %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
1069 1070 1071 1072 1073 1074 1075
}

void transUnrefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SSrvConn*)handle);
dengyihao's avatar
dengyihao 已提交
1076
  tDebug("server conn %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
1077 1078 1079 1080
  if (ref == 0) {
    destroyConn((SSrvConn*)handle, true);
  }
}
dengyihao's avatar
dengyihao 已提交
1081 1082

void transReleaseSrvHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
1083
  SExHandle* exh = handle;
dengyihao's avatar
dengyihao 已提交
1084 1085
  int64_t    refId = exh->refId;

dengyihao's avatar
dengyihao 已提交
1086
  ASYNC_CHECK_HANDLE(exh, refId);
dengyihao's avatar
dengyihao 已提交
1087

dengyihao's avatar
dengyihao 已提交
1088 1089 1090
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);

dengyihao's avatar
dengyihao 已提交
1091
  STransMsg tmsg = {.code = 0, .handle = exh, .ahandle = NULL, .refId = refId};
dengyihao's avatar
dengyihao 已提交
1092

wafwerar's avatar
wafwerar 已提交
1093
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1094 1095 1096
  srvMsg->msg = tmsg;
  srvMsg->type = Release;

dengyihao's avatar
dengyihao 已提交
1097
  tTrace("server conn %p start to release", exh->handle);
dengyihao's avatar
dengyihao 已提交
1098
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
1099 1100
  uvReleaseExHandle(refId);
  return;
dengyihao's avatar
dengyihao 已提交
1101
_return1:
dengyihao's avatar
dengyihao 已提交
1102
  tTrace("server handle %p failed to send to release handle", exh);
dengyihao's avatar
dengyihao 已提交
1103
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
1104 1105
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1106
  tTrace("server handle %p failed to send to release handle", exh);
dengyihao's avatar
dengyihao 已提交
1107
  return;
dengyihao's avatar
dengyihao 已提交
1108
}
dengyihao's avatar
dengyihao 已提交
1109 1110 1111 1112
void transSendResponse(const STransMsg* msg) {
  SExHandle* exh = msg->handle;
  int64_t    refId = msg->refId;
  ASYNC_CHECK_HANDLE(exh, refId);
dengyihao's avatar
dengyihao 已提交
1113
  assert(refId != 0);
dengyihao's avatar
dengyihao 已提交
1114

dengyihao's avatar
dengyihao 已提交
1115 1116 1117
  STransMsg tmsg = *msg;
  tmsg.refId = refId;

dengyihao's avatar
dengyihao 已提交
1118 1119
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);
dengyihao's avatar
dengyihao 已提交
1120

wafwerar's avatar
wafwerar 已提交
1121
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1122
  srvMsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
1123
  srvMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
1124
  tTrace("server conn %p start to send resp (1/2)", exh->handle);
dengyihao's avatar
dengyihao 已提交
1125
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
1126 1127
  uvReleaseExHandle(refId);
  return;
dengyihao's avatar
dengyihao 已提交
1128
_return1:
dengyihao's avatar
dengyihao 已提交
1129
  tTrace("server handle %p failed to send resp", exh);
dengyihao's avatar
dengyihao 已提交
1130
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1131
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
1132 1133
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1134
  tTrace("server handle %p failed to send resp", exh);
dengyihao's avatar
dengyihao 已提交
1135 1136
  rpcFreeCont(msg->pCont);
  return;
dengyihao's avatar
dengyihao 已提交
1137
}
dengyihao's avatar
dengyihao 已提交
1138
void transRegisterMsg(const STransMsg* msg) {
dengyihao's avatar
dengyihao 已提交
1139
  SExHandle* exh = msg->handle;
dengyihao's avatar
dengyihao 已提交
1140 1141 1142
  int64_t    refId = msg->refId;
  ASYNC_CHECK_HANDLE(exh, refId);

dengyihao's avatar
dengyihao 已提交
1143 1144 1145
  STransMsg tmsg = *msg;
  tmsg.refId = refId;

dengyihao's avatar
dengyihao 已提交
1146 1147
  SWorkThrdObj* pThrd = exh->pThrd;
  ASYNC_ERR_JRET(pThrd);
dengyihao's avatar
dengyihao 已提交
1148

wafwerar's avatar
wafwerar 已提交
1149
  SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
dengyihao's avatar
dengyihao 已提交
1150
  srvMsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
1151
  srvMsg->type = Register;
dengyihao's avatar
dengyihao 已提交
1152
  tTrace("server conn %p start to register brokenlink callback", exh->handle);
dengyihao's avatar
dengyihao 已提交
1153
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
1154 1155
  uvReleaseExHandle(refId);
  return;
dengyihao's avatar
dengyihao 已提交
1156

dengyihao's avatar
dengyihao 已提交
1157
_return1:
dengyihao's avatar
dengyihao 已提交
1158
  tTrace("server handle %p failed to send to register brokenlink", exh);
dengyihao's avatar
dengyihao 已提交
1159
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1160
  uvReleaseExHandle(refId);
dengyihao's avatar
dengyihao 已提交
1161 1162
  return;
_return2:
dengyihao's avatar
dengyihao 已提交
1163
  tTrace("server handle %p failed to send to register brokenlink", exh);
dengyihao's avatar
dengyihao 已提交
1164
  rpcFreeCont(msg->pCont);
dengyihao's avatar
dengyihao 已提交
1165
}
dengyihao's avatar
dengyihao 已提交
1166

dengyihao's avatar
formate  
dengyihao 已提交
1167
// Fills `pInfo` with the client's IP, port (converted with ntohs) and user
// name taken from the connection behind the external handle `thandle`
// (SExHandle*).
// Returns 0 on success, -1 when `thandle` is NULL.
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
  if (NULL == thandle) {
    tTrace("invalid handle %p, failed to Get Conn info", thandle);
    return -1;
  }

  SSrvConn*          pConn = ((SExHandle*)thandle)->handle;
  struct sockaddr_in peer = pConn->addr;

  pInfo->clientIp = (uint32_t)(peer.sin_addr.s_addr);
  pInfo->clientPort = ntohs(peer.sin_port);
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
  return 0;
}

dengyihao's avatar
dengyihao 已提交
1182
#endif