transSrv.c 28.3 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20 21 22 23 24 25
typedef struct {
  int       notifyCount;  //
  int       init;         // init or not
  STransMsg msg;
} SSrvRegArg;

dengyihao's avatar
dengyihao 已提交
26
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
27
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
28 29 30
  uv_tcp_t*  pTcp;
  uv_write_t pWriter;
  uv_timer_t pTimer;
dengyihao's avatar
dengyihao 已提交
31 32 33

  queue       queue;
  int         ref;
dengyihao's avatar
dengyihao 已提交
34
  int         persist;  // persist connection or not
dengyihao's avatar
dengyihao 已提交
35
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
36 37 38
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
39
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
40
  SArray*     srvMsgs;
dengyihao's avatar
dengyihao 已提交
41

dengyihao's avatar
dengyihao 已提交
42 43
  SSrvRegArg regArg;
  bool       broken;  // conn broken;
dengyihao's avatar
dengyihao 已提交
44

dengyihao's avatar
dengyihao 已提交
45
  ConnStatus         status;
dengyihao's avatar
dengyihao 已提交
46
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
47
  struct sockaddr_in locaddr;
U
ubuntu 已提交
48

dengyihao's avatar
dengyihao 已提交
49 50 51 52 53 54
  char secured;
  int  spi;
  char info[64];
  char user[TSDB_UNI_LEN];  // user ID for the link
  char secret[TSDB_PASSWORD_LEN];
  char ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
55 56 57
} SSrvConn;

typedef struct SSrvMsg {
dengyihao's avatar
dengyihao 已提交
58 59 60 61
  SSrvConn*     pConn;
  STransMsg     msg;
  queue         q;
  STransMsgType type;
dengyihao's avatar
dengyihao 已提交
62
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
63 64

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
65 66 67 68 69
  pthread_t       thread;
  uv_pipe_t*      pipe;
  uv_os_fd_t      fd;
  uv_loop_t*      loop;
  SAsyncPool*     asyncPool;
dengyihao's avatar
dengyihao 已提交
70 71
  queue           msg;
  pthread_mutex_t msgMtx;
dengyihao's avatar
dengyihao 已提交
72 73 74 75

  queue conn;
  void* pTransInst;
  bool  quit;
dengyihao's avatar
dengyihao 已提交
76 77 78
} SWorkThrdObj;

typedef struct SServerObj {
dengyihao's avatar
dengyihao 已提交
79 80 81 82 83
  pthread_t  thread;
  uv_tcp_t   server;
  uv_loop_t* loop;

  // work thread info
dengyihao's avatar
dengyihao 已提交
84 85 86
  int            workerIdx;
  int            numOfThreads;
  SWorkThrdObj** pThreadObj;
dengyihao's avatar
dengyihao 已提交
87 88 89 90 91

  uv_pipe_t** pipe;
  uint32_t    ip;
  uint32_t    port;
  uv_async_t* pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
92 93 94 95
} SServerObj;

static const char* notify = "a";

dengyihao's avatar
dengyihao 已提交
96 97 98 99 100 101 102 103 104
#define CONN_SHOULD_RELEASE(conn, head)                            \
  do {                                                             \
    if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
      conn->status = ConnRelease;                                  \
      transClearBuffer(&conn->readBuf);                            \
      transFreeMsg(transContFromHead((char*)head));                \
      goto _RETURE;                                                \
    }                                                              \
  } while (0)
dengyihao's avatar
dengyihao 已提交
105
// refactor later
dengyihao's avatar
dengyihao 已提交
106
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
107

dengyihao's avatar
dengyihao 已提交
108
static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
109 110

static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
111
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
112
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
113
static void uvOnTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
114
static void uvOnSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
115
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
116 117 118
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
119
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
120
static void uvShutDownCb(uv_shutdown_t* req, int status);
dengyihao's avatar
dengyihao 已提交
121

dengyihao's avatar
dengyihao 已提交
122
static void uvStartSendRespInternal(SSrvMsg* smsg);
dengyihao's avatar
dengyihao 已提交
123 124
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
125

dengyihao's avatar
dengyihao 已提交
126 127
static void uvNotifyLinkBrokenToApp(SSrvConn* conn);

dengyihao's avatar
dengyihao 已提交
128
static void destroySmsg(SSrvMsg* smsg);
129
// check whether already read complete packet
dengyihao's avatar
fix bug  
dengyihao 已提交
130
static SSrvConn* createConn(void* hThrd);
dengyihao's avatar
dengyihao 已提交
131
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
132

dengyihao's avatar
dengyihao 已提交
133 134 135
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd);
dengyihao's avatar
dengyihao 已提交
136 137 138
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleSendResp, uvHandleQuit, uvHandleRelease,
                                                                       uvHandleRegister};
dengyihao's avatar
dengyihao 已提交
139

140
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
141

dengyihao's avatar
dengyihao 已提交
142
// server and worker thread
dengyihao's avatar
dengyihao 已提交
143 144 145
static void* workerThread(void* arg);
static void* acceptThread(void* arg);

dengyihao's avatar
dengyihao 已提交
146 147 148 149
// add handle loop
static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
150
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
151 152 153
  SSrvConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
154 155
}

dengyihao's avatar
dengyihao 已提交
156
static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) {
dengyihao's avatar
dengyihao 已提交
157 158 159
  STransMsgHead* pHead = (STransMsgHead*)msg;

  int code = 0;
dengyihao's avatar
dengyihao 已提交
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191

  if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
    // secured link, or no authentication
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
    // tTrace("%s, secured link, no auth is required", pConn->info);
    return 0;
  }

  if (!rpcIsReq(pHead->msgType)) {
    // for response, if code is auth failure, it shall bypass the auth process
    code = htonl(pHead->code);
    if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
        code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
        code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
      // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
      return 0;
    }
  }

  code = 0;
  if (pHead->spi == pConn->spi) {
    // authentication
    SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest));

    int32_t delta;
    delta = (int32_t)htonl(pDigest->timeStamp);
    delta -= (int32_t)taosGetTimestampSec();
    if (abs(delta) > 900) {
      tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
      code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
    } else {
dengyihao's avatar
dengyihao 已提交
192
      if (transAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
dengyihao's avatar
dengyihao 已提交
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
        // tDebug("%s, authentication failed, msg discarded", pConn->info);
        code = TSDB_CODE_RPC_AUTH_FAILURE;
      } else {
        pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
        if (!rpcIsReq(pHead->msgType)) pConn->secured = 1;  // link is secured for client
        // tTrace("%s, message is authenticated", pConn->info);
      }
    }
  } else {
    tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
    code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
  }

  return code;
}

// refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) {
dengyihao's avatar
dengyihao 已提交
211
  SSrvConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
212
  tDebug("%p timeout since no activity", conn);
dengyihao's avatar
dengyihao 已提交
213 214
}

dengyihao's avatar
dengyihao 已提交
215
static void uvHandleReq(SSrvConn* pConn) {
dengyihao's avatar
dengyihao 已提交
216 217
  SRecvInfo    info;
  SRecvInfo*   p = &info;
dengyihao's avatar
dengyihao 已提交
218
  SConnBuffer* pBuf = &pConn->readBuf;
dengyihao's avatar
dengyihao 已提交
219
  p->msg = pBuf->buf;
dengyihao's avatar
dengyihao 已提交
220 221 222
  p->msgLen = pBuf->len;
  p->ip = 0;
  p->port = 0;
dengyihao's avatar
dengyihao 已提交
223
  p->shandle = pConn->pTransInst;  //
dengyihao's avatar
dengyihao 已提交
224 225 226
  p->thandle = pConn;
  p->chandle = NULL;

dengyihao's avatar
dengyihao 已提交
227
  STransMsgHead* pHead = (STransMsgHead*)p->msg;
dengyihao's avatar
dengyihao 已提交
228
  if (pHead->secured == 1) {
229
    STransUserMsg* uMsg = (STransUserMsg*)((char*)p->msg + p->msgLen - sizeof(STransUserMsg));
dengyihao's avatar
dengyihao 已提交
230
    memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
dengyihao's avatar
dengyihao 已提交
231
    memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret));
dengyihao's avatar
dengyihao 已提交
232
  }
dengyihao's avatar
dengyihao 已提交
233 234
  pHead->code = htonl(pHead->code);

dengyihao's avatar
dengyihao 已提交
235 236 237
  int32_t dlen = 0;
  if (transDecompressMsg(NULL, 0, NULL)) {
    // add compress later
U
ubuntu 已提交
238
    // pHead = rpcDecompresSTransMsg(pHead);
dengyihao's avatar
dengyihao 已提交
239
  } else {
dengyihao's avatar
dengyihao 已提交
240
    pHead->msgLen = htonl(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
241 242 243
    if (pHead->secured == 1) {
      pHead->msgLen -= sizeof(STransUserMsg);
    }
dengyihao's avatar
dengyihao 已提交
244
  }
dengyihao's avatar
dengyihao 已提交
245
  CONN_SHOULD_RELEASE(pConn, pHead);
dengyihao's avatar
dengyihao 已提交
246

dengyihao's avatar
dengyihao 已提交
247 248 249 250 251 252
  STransMsg transMsg;
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = pHead->content;
  transMsg.msgType = pHead->msgType;
  transMsg.code = pHead->code;
  transMsg.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
253
  transMsg.handle = NULL;
dengyihao's avatar
dengyihao 已提交
254

dengyihao's avatar
dengyihao 已提交
255
  transClearBuffer(&pConn->readBuf);
dengyihao's avatar
dengyihao 已提交
256
  pConn->inType = pHead->msgType;
dengyihao's avatar
dengyihao 已提交
257 258 259 260 261 262 263
  if (pConn->status == ConnNormal) {
    if (pHead->persist == 1) {
      pConn->status = ConnAcquire;
      transRefSrvHandle(pConn);
    }
  }
  if (pConn->status == ConnNormal && pHead->noResp == 0) {
dengyihao's avatar
dengyihao 已提交
264 265
    transRefSrvHandle(pConn);
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
266
           taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
dengyihao's avatar
dengyihao 已提交
267 268
           ntohs(pConn->locaddr.sin_port), transMsg.contLen);
  } else {
dengyihao's avatar
dengyihao 已提交
269
    tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
dengyihao's avatar
dengyihao 已提交
270 271
           TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
           taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
dengyihao's avatar
dengyihao 已提交
272 273 274 275 276
    // no ref here
  }

  if (pHead->noResp == 0) {
    transMsg.handle = pConn;
dengyihao's avatar
dengyihao 已提交
277
  }
dengyihao's avatar
dengyihao 已提交
278 279

  STrans* pTransInst = (STrans*)p->shandle;
dengyihao's avatar
dengyihao 已提交
280
  (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
dengyihao's avatar
dengyihao 已提交
281
  // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
dengyihao's avatar
dengyihao 已提交
282
  // auth
dengyihao's avatar
dengyihao 已提交
283 284
_RETURE:
  return;
dengyihao's avatar
dengyihao 已提交
285 286
}

dengyihao's avatar
dengyihao 已提交
287
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
288
  // opt
dengyihao's avatar
dengyihao 已提交
289 290
  SSrvConn*    conn = cli->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
291 292
  if (nread > 0) {
    pBuf->len += nread;
dengyihao's avatar
dengyihao 已提交
293
    tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
dengyihao's avatar
dengyihao 已提交
294
    if (transReadComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
295
      tTrace("server conn %p alread read complete packet", conn);
dengyihao's avatar
dengyihao 已提交
296
      uvHandleReq(conn);
dengyihao's avatar
dengyihao 已提交
297
    } else {
dengyihao's avatar
dengyihao 已提交
298
      tTrace("server %p read partial packet, continue to read", conn);
dengyihao's avatar
dengyihao 已提交
299 300 301
    }
    return;
  }
302 303 304
  if (nread == 0) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
305 306

  tError("server conn %p read error: %s", conn, uv_err_name(nread));
U
ubuntu 已提交
307
  if (nread < 0) {
dengyihao's avatar
dengyihao 已提交
308
    conn->broken = true;
dengyihao's avatar
dengyihao 已提交
309 310 311 312 313 314 315
    if (conn->status == ConnAcquire) {
      if (conn->regArg.init) {
        STrans* pTransInst = conn->pTransInst;
        (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
        memset(&conn->regArg, 0, sizeof(conn->regArg));
      }
    }
dengyihao's avatar
dengyihao 已提交
316
    transUnrefSrvHandle(conn);
dengyihao's avatar
dengyihao 已提交
317 318 319 320
  }
}
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->len = 2;
dengyihao's avatar
fix bug  
dengyihao 已提交
321
  buf->base = calloc(1, sizeof(char) * buf->len);
dengyihao's avatar
dengyihao 已提交
322 323 324 325
}

void uvOnTimeoutCb(uv_timer_t* handle) {
  // opt
dengyihao's avatar
dengyihao 已提交
326
  SSrvConn* pConn = handle->data;
dengyihao's avatar
dengyihao 已提交
327
  tError("server conn %p time out", pConn);
dengyihao's avatar
dengyihao 已提交
328 329
}

dengyihao's avatar
dengyihao 已提交
330
void uvOnSendCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
331 332
  SSrvConn* conn = req->data;
  transClearBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
333
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
334
    tTrace("server conn %p data already was written on stream", conn);
dengyihao's avatar
add UT  
dengyihao 已提交
335 336 337
    if (conn->srvMsgs != NULL) {
      assert(taosArrayGetSize(conn->srvMsgs) >= 1);
      SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
dengyihao's avatar
dengyihao 已提交
338
      tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
dengyihao's avatar
add UT  
dengyihao 已提交
339
      taosArrayRemove(conn->srvMsgs, 0);
dengyihao's avatar
dengyihao 已提交
340 341 342
      if (msg->type == Release && conn->status != ConnNormal) {
        conn->status = ConnNormal;
        transUnrefSrvHandle(conn);
dengyihao's avatar
dengyihao 已提交
343 344 345 346 347 348 349 350 351 352 353
      } else if (msg->type == Register && conn->status == ConnAcquire) {
        conn->regArg.notifyCount = 0;
        conn->regArg.init = 1;
        conn->regArg.msg = msg->msg;
        if (conn->broken) {
          STrans* pTransInst = conn->pTransInst;
          (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
          memset(&conn->regArg, 0, sizeof(conn->regArg));
        }
        free(msg);
        return;
dengyihao's avatar
dengyihao 已提交
354
      }
dengyihao's avatar
add UT  
dengyihao 已提交
355 356 357
      destroySmsg(msg);
      // send second data, just use for push
      if (taosArrayGetSize(conn->srvMsgs) > 0) {
dengyihao's avatar
dengyihao 已提交
358
        tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
dengyihao's avatar
add UT  
dengyihao 已提交
359 360 361
        msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
        uvStartSendRespInternal(msg);
      }
dengyihao's avatar
dengyihao 已提交
362
    }
dengyihao's avatar
dengyihao 已提交
363
  } else {
dengyihao's avatar
dengyihao 已提交
364
    tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
365
    conn->broken = true;
dengyihao's avatar
dengyihao 已提交
366
    transUnrefSrvHandle(conn);
dengyihao's avatar
dengyihao 已提交
367 368
  }
}
dengyihao's avatar
dengyihao 已提交
369 370
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
371
    tTrace("success to dispatch conn to work thread");
dengyihao's avatar
dengyihao 已提交
372 373 374
  } else {
    tError("fail to dispatch conn to work thread");
  }
dengyihao's avatar
dengyihao 已提交
375
  free(req);
dengyihao's avatar
dengyihao 已提交
376
}
dengyihao's avatar
dengyihao 已提交
377

dengyihao's avatar
dengyihao 已提交
378
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
379
  // impl later;
dengyihao's avatar
dengyihao 已提交
380
  tTrace("server conn %p prepare to send resp", smsg->pConn);
dengyihao's avatar
dengyihao 已提交
381

dengyihao's avatar
formate  
dengyihao 已提交
382 383
  SSrvConn*  pConn = smsg->pConn;
  STransMsg* pMsg = &smsg->msg;
dengyihao's avatar
dengyihao 已提交
384 385 386 387 388
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
dengyihao's avatar
dengyihao 已提交
389 390

  pHead->secured = pMsg->code == 0 ? 1 : 0;  //
dengyihao's avatar
dengyihao 已提交
391
  pHead->msgType = smsg->pConn->inType + 1;
dengyihao's avatar
dengyihao 已提交
392
  pHead->release = smsg->type == Release ? 1 : 0;
dengyihao's avatar
dengyihao 已提交
393
  pHead->code = htonl(pMsg->code);
dengyihao's avatar
dengyihao 已提交
394 395 396 397 398 399
  // add more info
  char*   msg = (char*)pHead;
  int32_t len = transMsgLenFromCont(pMsg->contLen);
  if (transCompressMsg(msg, len, NULL)) {
    // impl later
  }
dengyihao's avatar
dengyihao 已提交
400
  tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
401
         taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
dengyihao's avatar
dengyihao 已提交
402
         ntohs(pConn->locaddr.sin_port));
dengyihao's avatar
dengyihao 已提交
403

dengyihao's avatar
dengyihao 已提交
404 405 406 407
  pHead->msgLen = htonl(len);
  wb->base = msg;
  wb->len = len;
}
dengyihao's avatar
dengyihao 已提交
408 409

static void uvStartSendRespInternal(SSrvMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
410 411 412 413
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
414
  uv_timer_stop(&pConn->pTimer);
dengyihao's avatar
dengyihao 已提交
415
  uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
dengyihao's avatar
dengyihao 已提交
416 417 418 419
}
static void uvStartSendResp(SSrvMsg* smsg) {
  // impl
  SSrvConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
420 421

  if (pConn->broken == true) {
dengyihao's avatar
dengyihao 已提交
422
    // persist by
dengyihao's avatar
dengyihao 已提交
423 424 425
    transUnrefSrvHandle(pConn);
    return;
  }
dengyihao's avatar
dengyihao 已提交
426 427 428
  if (pConn->status == ConnNormal) {
    transUnrefSrvHandle(pConn);
  }
dengyihao's avatar
dengyihao 已提交
429

dengyihao's avatar
dengyihao 已提交
430
  if (taosArrayGetSize(pConn->srvMsgs) > 0) {
dengyihao's avatar
dengyihao 已提交
431 432
    tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr),
           ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
dengyihao's avatar
dengyihao 已提交
433 434 435 436 437
    taosArrayPush(pConn->srvMsgs, &smsg);
    return;
  }
  taosArrayPush(pConn->srvMsgs, &smsg);
  uvStartSendRespInternal(smsg);
dengyihao's avatar
dengyihao 已提交
438 439
  return;
}
dengyihao's avatar
dengyihao 已提交
440

dengyihao's avatar
dengyihao 已提交
441
static void destroySmsg(SSrvMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
442 443 444 445
  if (smsg == NULL) {
    return;
  }
  transFreeMsg(smsg->msg.pCont);
dengyihao's avatar
dengyihao 已提交
446
  free(smsg);
dengyihao's avatar
dengyihao 已提交
447
}
dengyihao's avatar
fix bug  
dengyihao 已提交
448 449 450 451 452 453 454
static void destroyAllConn(SWorkThrdObj* pThrd) {
  while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
    queue* h = QUEUE_HEAD(&pThrd->conn);
    QUEUE_REMOVE(h);
    QUEUE_INIT(h);

    SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue);
dengyihao's avatar
dengyihao 已提交
455 456 457
    while (T_REF_VAL_GET(c) >= 2) {
      transUnrefSrvHandle(c);
    }
dengyihao's avatar
dengyihao 已提交
458
    transUnrefSrvHandle(c);
dengyihao's avatar
fix bug  
dengyihao 已提交
459 460
  }
}
dengyihao's avatar
dengyihao 已提交
461
void uvWorkerAsyncCb(uv_async_t* handle) {
dengyihao's avatar
dengyihao 已提交
462 463
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
dengyihao's avatar
dengyihao 已提交
464
  SSrvConn*     conn = NULL;
dengyihao's avatar
dengyihao 已提交
465
  queue         wq;
dengyihao's avatar
dengyihao 已提交
466

dengyihao's avatar
dengyihao 已提交
467
  // batch process to avoid to lock/unlock frequently
dengyihao's avatar
dengyihao 已提交
468 469 470
  pthread_mutex_lock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  pthread_mutex_unlock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
471 472 473 474

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* head = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(head);
dengyihao's avatar
dengyihao 已提交
475 476 477

    SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
    if (msg == NULL) {
dengyihao's avatar
dengyihao 已提交
478
      tError("unexcept occurred, continue");
dengyihao's avatar
dengyihao 已提交
479
      continue;
dengyihao's avatar
dengyihao 已提交
480
    }
dengyihao's avatar
dengyihao 已提交
481
    (*transAsyncHandle[msg->type])(msg, pThrd);
dengyihao's avatar
dengyihao 已提交
482 483
  }
}
dengyihao's avatar
dengyihao 已提交
484 485
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* srv = async->data;
dengyihao's avatar
dengyihao 已提交
486
  tDebug("close server port %d", srv->port);
dengyihao's avatar
fix bug  
dengyihao 已提交
487
  uv_close((uv_handle_t*)&srv->server, NULL);
dengyihao's avatar
dengyihao 已提交
488 489
  uv_stop(srv->loop);
}
dengyihao's avatar
dengyihao 已提交
490

dengyihao's avatar
dengyihao 已提交
491
static void uvShutDownCb(uv_shutdown_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
492 493 494
  if (status != 0) {
    tDebug("conn failed to shut down: %s", uv_err_name(status));
  }
dengyihao's avatar
dengyihao 已提交
495 496 497 498
  uv_close((uv_handle_t*)req->handle, uvDestroyConn);
  free(req);
}

dengyihao's avatar
dengyihao 已提交
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513
void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status == -1) {
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

  uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
    uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));

    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

    pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
dengyihao's avatar
dengyihao 已提交
514

dengyihao's avatar
dengyihao 已提交
515
    tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
dengyihao's avatar
dengyihao 已提交
516
    uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
dengyihao's avatar
dengyihao 已提交
517 518
  } else {
    uv_close((uv_handle_t*)cli, NULL);
519
    free(cli);
dengyihao's avatar
dengyihao 已提交
520 521 522
  }
}
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
523
  tTrace("server connection coming");
dengyihao's avatar
dengyihao 已提交
524 525 526 527 528
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
529
    // uv_close((uv_handle_t*)q, NULL);
dengyihao's avatar
dengyihao 已提交
530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547
    return;
  }
  // free memory allocated by
  assert(nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
  free(buf->base);

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

dengyihao's avatar
fix bug  
dengyihao 已提交
548
  SSrvConn* pConn = createConn(pThrd);
549

dengyihao's avatar
dengyihao 已提交
550
  pConn->pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
551
  /* init conn timer*/
dengyihao's avatar
dengyihao 已提交
552 553
  uv_timer_init(pThrd->loop, &pConn->pTimer);
  pConn->pTimer.data = pConn;
dengyihao's avatar
dengyihao 已提交
554 555 556 557 558 559 560 561

  pConn->hostThrd = pThrd;

  // init client handle
  pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

dengyihao's avatar
dengyihao 已提交
562
  pConn->pWriter.data = pConn;
dengyihao's avatar
dengyihao 已提交
563

dengyihao's avatar
dengyihao 已提交
564 565
  transSetConnOption((uv_tcp_t*)pConn->pTcp);

dengyihao's avatar
dengyihao 已提交
566 567 568
  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
dengyihao's avatar
dengyihao 已提交
569
    tTrace("server conn %p created, fd: %d", pConn, fd);
dengyihao's avatar
dengyihao 已提交
570

dengyihao's avatar
dengyihao 已提交
571 572
    int addrlen = sizeof(pConn->addr);
    if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
dengyihao's avatar
dengyihao 已提交
573
      tError("server conn %p failed to get peer info", pConn);
dengyihao's avatar
dengyihao 已提交
574
      transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
575
      return;
dengyihao's avatar
dengyihao 已提交
576
    }
dengyihao's avatar
dengyihao 已提交
577 578 579 580

    addrlen = sizeof(pConn->locaddr);
    if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
      tError("server conn %p failed to get local info", pConn);
dengyihao's avatar
dengyihao 已提交
581
      transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
582 583 584
      return;
    }

dengyihao's avatar
dengyihao 已提交
585
    uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
dengyihao's avatar
dengyihao 已提交
586

dengyihao's avatar
dengyihao 已提交
587
  } else {
dengyihao's avatar
dengyihao 已提交
588
    tDebug("failed to create new connection");
dengyihao's avatar
dengyihao 已提交
589
    transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
590 591 592 593 594
  }
}

void* acceptThread(void* arg) {
  // opt
dengyihao's avatar
dengyihao 已提交
595
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
596 597
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
598 599

  return NULL;
dengyihao's avatar
dengyihao 已提交
600
}
dengyihao's avatar
dengyihao 已提交
601 602
static bool addHandleToWorkloop(void* arg) {
  SWorkThrdObj* pThrd = arg;
dengyihao's avatar
dengyihao 已提交
603
  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
604 605 606
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
607

dengyihao's avatar
dengyihao 已提交
608
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
dengyihao's avatar
dengyihao 已提交
609 610 611 612
  uv_pipe_open(pThrd->pipe, pThrd->fd);

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
613 614
  QUEUE_INIT(&pThrd->msg);
  pthread_mutex_init(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
615

dengyihao's avatar
fix bug  
dengyihao 已提交
616 617 618
  // conn set
  QUEUE_INIT(&pThrd->conn);

dengyihao's avatar
dengyihao 已提交
619
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 4, pThrd, uvWorkerAsyncCb);
dengyihao's avatar
dengyihao 已提交
620
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
621 622 623 624 625 626 627 628 629 630 631 632 633
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
634 635 636 637
  // register an async here to quit server gracefully
  srv->pAcceptAsync = calloc(1, sizeof(uv_async_t));
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
638

dengyihao's avatar
dengyihao 已提交
639
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
640 641 642 643 644
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
645
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
646 647 648 649
    tError("failed to listen: %s", uv_err_name(err));
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
650 651
}
void* workerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
652
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
653
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
654
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
655 656

  return NULL;
dengyihao's avatar
dengyihao 已提交
657 658
}

dengyihao's avatar
fix bug  
dengyihao 已提交
659 660 661
static SSrvConn* createConn(void* hThrd) {
  SWorkThrdObj* pThrd = hThrd;

dengyihao's avatar
dengyihao 已提交
662
  SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
dengyihao's avatar
fix bug  
dengyihao 已提交
663 664 665
  QUEUE_INIT(&pConn->queue);

  QUEUE_PUSH(&pThrd->conn, &pConn->queue);
dengyihao's avatar
dengyihao 已提交
666
  pConn->srvMsgs = taosArrayInit(2, sizeof(void*));  //
dengyihao's avatar
dengyihao 已提交
667
  tTrace("conn %p created", pConn);
dengyihao's avatar
dengyihao 已提交
668

dengyihao's avatar
dengyihao 已提交
669
  memset(&pConn->regArg, 0, sizeof(pConn->regArg));
dengyihao's avatar
dengyihao 已提交
670
  pConn->broken = false;
dengyihao's avatar
dengyihao 已提交
671
  pConn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
672

dengyihao's avatar
dengyihao 已提交
673
  transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
674 675
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
676

dengyihao's avatar
dengyihao 已提交
677
static void destroyConn(SSrvConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
678 679 680
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
681
  transDestroyBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
682 683 684 685 686

  for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) {
    SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i);
    destroySmsg(msg);
  }
dengyihao's avatar
add UT  
dengyihao 已提交
687
  conn->srvMsgs = taosArrayDestroy(conn->srvMsgs);
688
  if (clear) {
dengyihao's avatar
dengyihao 已提交
689
    tTrace("try to destroy conn %p", conn);
dengyihao's avatar
dengyihao 已提交
690 691
    uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
    uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
692
  }
dengyihao's avatar
dengyihao 已提交
693 694
}
static void uvDestroyConn(uv_handle_t* handle) {
dengyihao's avatar
dengyihao 已提交
695 696 697 698
  SSrvConn* conn = handle->data;
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
699 700
  SWorkThrdObj* thrd = conn->hostThrd;

dengyihao's avatar
dengyihao 已提交
701
  tDebug("server conn %p destroy", conn);
dengyihao's avatar
dengyihao 已提交
702
  uv_timer_stop(&conn->pTimer);
dengyihao's avatar
dengyihao 已提交
703
  QUEUE_REMOVE(&conn->queue);
dengyihao's avatar
dengyihao 已提交
704
  free(conn->pTcp);
dengyihao's avatar
dengyihao 已提交
705
  // free(conn);
dengyihao's avatar
dengyihao 已提交
706

dengyihao's avatar
dengyihao 已提交
707 708
  if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
    uv_loop_close(thrd->loop);
dengyihao's avatar
dengyihao 已提交
709 710
    uv_stop(thrd->loop);
  }
dengyihao's avatar
dengyihao 已提交
711
}
dengyihao's avatar
dengyihao 已提交
712
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
dengyihao's avatar
dengyihao 已提交
713
  STransMsgHead* pHead = (STransMsgHead*)msg;
dengyihao's avatar
dengyihao 已提交
714 715 716 717

  if (pConn->spi && pConn->secured == 0) {
    // add auth part
    pHead->spi = pConn->spi;
dengyihao's avatar
dengyihao 已提交
718
    STransDigestMsg* pDigest = (STransDigestMsg*)(msg + msgLen);
dengyihao's avatar
dengyihao 已提交
719 720 721
    pDigest->timeStamp = htonl(taosGetTimestampSec());
    msgLen += sizeof(SRpcDigest);
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
dengyihao's avatar
dengyihao 已提交
722 723
    // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
    // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
dengyihao's avatar
dengyihao 已提交
724 725 726 727 728 729 730 731
  } else {
    pHead->spi = 0;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
  }

  return msgLen;
}

U
ubuntu 已提交
732
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
dengyihao's avatar
dengyihao 已提交
733 734 735 736 737 738 739 740 741 742 743 744
  SServerObj* srv = calloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
  srv->pThreadObj = (SWorkThrdObj**)calloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*));
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

  for (int i = 0; i < srv->numOfThreads; i++) {
    SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
745
    thrd->quit = false;
dengyihao's avatar
dengyihao 已提交
746
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
747

dengyihao's avatar
dengyihao 已提交
748 749 750
    srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
    int fds[2];
    if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
dengyihao's avatar
dengyihao 已提交
751
      goto End;
dengyihao's avatar
dengyihao 已提交
752 753 754 755
    }
    uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write

dengyihao's avatar
dengyihao 已提交
756
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
757 758
    thrd->fd = fds[0];
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
759

dengyihao's avatar
dengyihao 已提交
760 761 762
    if (false == addHandleToWorkloop(thrd)) {
      goto End;
    }
dengyihao's avatar
dengyihao 已提交
763 764 765 766 767 768 769 770 771
    int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
    }
  }
dengyihao's avatar
dengyihao 已提交
772 773 774
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
dengyihao's avatar
dengyihao 已提交
775 776 777 778 779 780 781 782
  int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
    // clear all resource later
  }

  return srv;
dengyihao's avatar
dengyihao 已提交
783
End:
U
ubuntu 已提交
784
  transCloseServer(srv);
dengyihao's avatar
dengyihao 已提交
785 786
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
  if (QUEUE_IS_EMPTY(&thrd->conn)) {
    uv_loop_close(thrd->loop);
    uv_stop(thrd->loop);
  } else {
    destroyAllConn(thrd);
    thrd->quit = true;
  }
  free(msg);
}
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
  // release handle to rpc init
  SSrvConn* conn = msg->pConn;
  if (conn->status == ConnAcquire) {
    if (taosArrayGetSize(conn->srvMsgs) > 0) {
      taosArrayPush(conn->srvMsgs, &msg);
dengyihao's avatar
dengyihao 已提交
803
      return;
dengyihao's avatar
dengyihao 已提交
804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819
    }
    taosArrayPush(conn->srvMsgs, &msg);
    uvStartSendRespInternal(msg);
    return;
  } else if (conn->status == ConnRelease) {
    // already release by server app, do nothing
  } else if (conn->status == ConnNormal) {
    // no nothing
    // user should not call this rpcRelease handle;
  }
  free(msg);
}
void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
  // send msg to client
  uvStartSendResp(msg);
}
dengyihao's avatar
dengyihao 已提交
820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838
void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
  SSrvConn* conn = msg->pConn;
  if (conn->status == ConnAcquire) {
    if (taosArrayGetSize(conn->srvMsgs) > 0) {
      taosArrayPush(conn->srvMsgs, &msg);
      return;
    }
    conn->regArg.notifyCount = 0;
    conn->regArg.init = 1;
    conn->regArg.msg = msg->msg;

    if (conn->broken) {
      STrans* pTransInst = conn->pTransInst;
      (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
      memset(&conn->regArg, 0, sizeof(conn->regArg));
    }
    free(msg);
  }
}
dengyihao's avatar
dengyihao 已提交
839 840 841 842 843 844
void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd == NULL) {
    return;
  }
  pthread_join(pThrd->thread, NULL);
  free(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
845
  transDestroyAsyncPool(pThrd->asyncPool);
dengyihao's avatar
dengyihao 已提交
846
  free(pThrd);
dengyihao's avatar
dengyihao 已提交
847
}
dengyihao's avatar
dengyihao 已提交
848
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
dengyihao's avatar
dengyihao 已提交
849 850
  SSrvMsg* msg = calloc(1, sizeof(SSrvMsg));
  msg->type = Quit;
dengyihao's avatar
dengyihao 已提交
851
  tDebug("server send quit msg to work thread");
dengyihao's avatar
dengyihao 已提交
852
  transSendAsync(pThrd->asyncPool, &msg->q);
dengyihao's avatar
dengyihao 已提交
853 854
}

U
ubuntu 已提交
855
void transCloseServer(void* arg) {
dengyihao's avatar
dengyihao 已提交
856 857
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
858
  for (int i = 0; i < srv->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
859
    sendQuitToWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
860
    destroyWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
861
  }
dengyihao's avatar
dengyihao 已提交
862 863 864 865 866 867 868

  tDebug("send quit msg to accept thread");
  uv_async_send(srv->pAcceptAsync);
  pthread_join(srv->thread, NULL);

  free(srv->pThreadObj);
  free(srv->pAcceptAsync);
dengyihao's avatar
dengyihao 已提交
869
  free(srv->loop);
dengyihao's avatar
dengyihao 已提交
870 871 872 873

  for (int i = 0; i < srv->numOfThreads; i++) {
    free(srv->pipe[i]);
  }
dengyihao's avatar
dengyihao 已提交
874
  free(srv->pipe);
dengyihao's avatar
dengyihao 已提交
875

dengyihao's avatar
dengyihao 已提交
876
  free(srv);
dengyihao's avatar
dengyihao 已提交
877
}
dengyihao's avatar
dengyihao 已提交
878

dengyihao's avatar
dengyihao 已提交
879 880 881 882 883 884 885 886 887 888 889 890 891 892 893
void transRefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  SSrvConn* conn = handle;

  int ref = T_REF_INC((SSrvConn*)handle);
  UNUSED(ref);
}

void transUnrefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SSrvConn*)handle);
dengyihao's avatar
dengyihao 已提交
894
  tDebug("handle %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
895 896 897 898 899 900

  if (ref == 0) {
    destroyConn((SSrvConn*)handle, true);
  }
  // unref srv handle
}
dengyihao's avatar
dengyihao 已提交
901 902

void transReleaseSrvHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
903 904 905 906 907 908 909 910 911 912 913 914 915 916 917
  if (handle == NULL) {
    return;
  }
  SSrvConn*     pConn = handle;
  SWorkThrdObj* pThrd = pConn->hostThrd;

  STransMsg tmsg = {.handle = handle, .code = 0};

  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  srvMsg->msg = tmsg;
  srvMsg->type = Release;
  srvMsg->pConn = pConn;

  tTrace("server conn %p start to release", pConn);
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
918
}
U
ubuntu 已提交
919
void transSendResponse(const STransMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
920 921 922
  if (pMsg->handle == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
923
  SSrvConn*     pConn = pMsg->handle;
dengyihao's avatar
dengyihao 已提交
924 925
  SWorkThrdObj* pThrd = pConn->hostThrd;

dengyihao's avatar
dengyihao 已提交
926 927 928
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  srvMsg->pConn = pConn;
  srvMsg->msg = *pMsg;
dengyihao's avatar
dengyihao 已提交
929
  srvMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
930
  tTrace("server conn %p start to send resp", pConn);
dengyihao's avatar
dengyihao 已提交
931
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
932
}
dengyihao's avatar
dengyihao 已提交
933 934 935 936 937 938 939 940 941 942 943 944 945 946
void transRegisterMsg(const STransMsg* msg) {
  if (msg->handle == NULL) {
    return;
  }
  SSrvConn*     pConn = msg->handle;
  SWorkThrdObj* pThrd = pConn->hostThrd;

  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  srvMsg->pConn = pConn;
  srvMsg->msg = *msg;
  srvMsg->type = Register;
  tTrace("server conn %p start to send resp", pConn);
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
}
dengyihao's avatar
formate  
dengyihao 已提交
947 948
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
  SSrvConn*          pConn = thandle;
dengyihao's avatar
dengyihao 已提交
949
  struct sockaddr_in addr = pConn->addr;
U
ubuntu 已提交
950

dengyihao's avatar
dengyihao 已提交
951 952
  pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
  pInfo->clientPort = ntohs(addr.sin_port);
dengyihao's avatar
dengyihao 已提交
953 954 955 956
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
  return 0;
}

dengyihao's avatar
dengyihao 已提交
957
#endif