transSrv.c 23.3 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV

18
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20
typedef struct SSrvConn {
dengyihao's avatar
dengyihao 已提交
21
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
22 23 24
  uv_tcp_t*  pTcp;
  uv_write_t pWriter;
  uv_timer_t pTimer;
dengyihao's avatar
dengyihao 已提交
25 26 27

  queue       queue;
  int         ref;
dengyihao's avatar
dengyihao 已提交
28
  int         persist;  // persist connection or not
dengyihao's avatar
dengyihao 已提交
29
  SConnBuffer readBuf;  // read buf,
dengyihao's avatar
dengyihao 已提交
30 31 32
  int         inType;
  void*       pTransInst;  // rpc init
  void*       ahandle;     //
dengyihao's avatar
dengyihao 已提交
33
  void*       hostThrd;
dengyihao's avatar
dengyihao 已提交
34
  SArray*     srvMsgs;
dengyihao's avatar
dengyihao 已提交
35 36

  bool broken;  // conn broken;
dengyihao's avatar
dengyihao 已提交
37

dengyihao's avatar
dengyihao 已提交
38
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
39
  struct sockaddr_in locaddr;
U
ubuntu 已提交
40

dengyihao's avatar
dengyihao 已提交
41 42 43 44 45 46
  char secured;
  int  spi;
  char info[64];
  char user[TSDB_UNI_LEN];  // user ID for the link
  char secret[TSDB_PASSWORD_LEN];
  char ckey[TSDB_PASSWORD_LEN];  // ciphering key
dengyihao's avatar
dengyihao 已提交
47 48 49 50
} SSrvConn;

typedef struct SSrvMsg {
  SSrvConn* pConn;
dengyihao's avatar
formate  
dengyihao 已提交
51
  STransMsg msg;
dengyihao's avatar
dengyihao 已提交
52 53
  queue     q;
} SSrvMsg;
dengyihao's avatar
dengyihao 已提交
54 55

typedef struct SWorkThrdObj {
dengyihao's avatar
dengyihao 已提交
56 57
  pthread_t   thread;
  uv_pipe_t*  pipe;
58
  uv_os_fd_t  fd;
dengyihao's avatar
dengyihao 已提交
59 60 61
  uv_loop_t*  loop;
  SAsyncPool* asyncPool;
  // uv_async_t*     workerAsync;  //
dengyihao's avatar
dengyihao 已提交
62
  queue           msg;
dengyihao's avatar
fix bug  
dengyihao 已提交
63
  queue           conn;
dengyihao's avatar
dengyihao 已提交
64
  pthread_mutex_t msgMtx;
dengyihao's avatar
dengyihao 已提交
65
  void*           pTransInst;
dengyihao's avatar
dengyihao 已提交
66
  bool            quit;
dengyihao's avatar
dengyihao 已提交
67 68 69
} SWorkThrdObj;

typedef struct SServerObj {
dengyihao's avatar
dengyihao 已提交
70 71 72 73 74
  pthread_t  thread;
  uv_tcp_t   server;
  uv_loop_t* loop;

  // work thread info
dengyihao's avatar
dengyihao 已提交
75 76 77
  int            workerIdx;
  int            numOfThreads;
  SWorkThrdObj** pThreadObj;
dengyihao's avatar
dengyihao 已提交
78 79 80 81 82

  uv_pipe_t** pipe;
  uint32_t    ip;
  uint32_t    port;
  uv_async_t* pAcceptAsync;  // just to quit from from accept thread
dengyihao's avatar
dengyihao 已提交
83 84 85 86 87
} SServerObj;

static const char* notify = "a";

// refactor later
dengyihao's avatar
dengyihao 已提交
88
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
89

dengyihao's avatar
dengyihao 已提交
90
static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
dengyihao's avatar
dengyihao 已提交
91 92 93

static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
94
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
95
static void uvOnTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
96
static void uvOnSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
97
static void uvOnPipeWriteCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
98 99 100
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
101
static void uvAcceptAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
102
static void uvShutDownCb(uv_shutdown_t* req, int status);
dengyihao's avatar
dengyihao 已提交
103

dengyihao's avatar
dengyihao 已提交
104
static void uvStartSendRespInternal(SSrvMsg* smsg);
dengyihao's avatar
dengyihao 已提交
105 106
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
dengyihao's avatar
dengyihao 已提交
107

dengyihao's avatar
dengyihao 已提交
108
static void destroySmsg(SSrvMsg* smsg);
109
// check whether already read complete packet
dengyihao's avatar
fix bug  
dengyihao 已提交
110
static SSrvConn* createConn(void* hThrd);
dengyihao's avatar
dengyihao 已提交
111
static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
dengyihao's avatar
dengyihao 已提交
112

113
static void uvDestroyConn(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
114

dengyihao's avatar
dengyihao 已提交
115
// server and worker thread
dengyihao's avatar
dengyihao 已提交
116 117 118
static void* workerThread(void* arg);
static void* acceptThread(void* arg);

dengyihao's avatar
dengyihao 已提交
119 120 121 122
// add handle loop
static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg);

dengyihao's avatar
dengyihao 已提交
123
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
124 125 126
  SSrvConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
127 128
}

dengyihao's avatar
dengyihao 已提交
129
static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) {
dengyihao's avatar
dengyihao 已提交
130 131 132
  STransMsgHead* pHead = (STransMsgHead*)msg;

  int code = 0;
dengyihao's avatar
dengyihao 已提交
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183

  if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
    // secured link, or no authentication
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
    // tTrace("%s, secured link, no auth is required", pConn->info);
    return 0;
  }

  if (!rpcIsReq(pHead->msgType)) {
    // for response, if code is auth failure, it shall bypass the auth process
    code = htonl(pHead->code);
    if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
        code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
        code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
      // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
      return 0;
    }
  }

  code = 0;
  if (pHead->spi == pConn->spi) {
    // authentication
    SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest));

    int32_t delta;
    delta = (int32_t)htonl(pDigest->timeStamp);
    delta -= (int32_t)taosGetTimestampSec();
    if (abs(delta) > 900) {
      tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
      code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
    } else {
      if (rpcAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
        // tDebug("%s, authentication failed, msg discarded", pConn->info);
        code = TSDB_CODE_RPC_AUTH_FAILURE;
      } else {
        pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
        if (!rpcIsReq(pHead->msgType)) pConn->secured = 1;  // link is secured for client
        // tTrace("%s, message is authenticated", pConn->info);
      }
    }
  } else {
    tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
    code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
  }

  return code;
}

// refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) {
dengyihao's avatar
dengyihao 已提交
184
  SSrvConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
185
  tDebug("%p timeout since no activity", conn);
dengyihao's avatar
dengyihao 已提交
186 187
}

dengyihao's avatar
dengyihao 已提交
188
static void uvHandleReq(SSrvConn* pConn) {
dengyihao's avatar
dengyihao 已提交
189 190
  SRecvInfo    info;
  SRecvInfo*   p = &info;
dengyihao's avatar
dengyihao 已提交
191
  SConnBuffer* pBuf = &pConn->readBuf;
dengyihao's avatar
dengyihao 已提交
192
  p->msg = pBuf->buf;
dengyihao's avatar
dengyihao 已提交
193 194 195
  p->msgLen = pBuf->len;
  p->ip = 0;
  p->port = 0;
dengyihao's avatar
dengyihao 已提交
196
  p->shandle = pConn->pTransInst;  //
dengyihao's avatar
dengyihao 已提交
197 198 199
  p->thandle = pConn;
  p->chandle = NULL;

dengyihao's avatar
dengyihao 已提交
200
  STransMsgHead* pHead = (STransMsgHead*)p->msg;
dengyihao's avatar
dengyihao 已提交
201
  if (pHead->secured == 1) {
dengyihao's avatar
dengyihao 已提交
202 203
    STransUserMsg* uMsg = (p->msg + p->msgLen - sizeof(STransUserMsg));
    memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
dengyihao's avatar
dengyihao 已提交
204
    memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret));
dengyihao's avatar
dengyihao 已提交
205
  }
dengyihao's avatar
dengyihao 已提交
206 207

  pConn->inType = pHead->msgType;
dengyihao's avatar
dengyihao 已提交
208

U
ubuntu 已提交
209
  STrans* pRpc = (STrans*)p->shandle;
dengyihao's avatar
dengyihao 已提交
210 211
  pHead->code = htonl(pHead->code);

dengyihao's avatar
dengyihao 已提交
212 213 214
  int32_t dlen = 0;
  if (transDecompressMsg(NULL, 0, NULL)) {
    // add compress later
U
ubuntu 已提交
215
    // pHead = rpcDecompresSTransMsg(pHead);
dengyihao's avatar
dengyihao 已提交
216
  } else {
dengyihao's avatar
dengyihao 已提交
217
    pHead->msgLen = htonl(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
218
    // impl later
dengyihao's avatar
dengyihao 已提交
219
    //
dengyihao's avatar
dengyihao 已提交
220
  }
dengyihao's avatar
dengyihao 已提交
221

U
ubuntu 已提交
222
  STransMsg rpcMsg;
dengyihao's avatar
dengyihao 已提交
223
  rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
224 225 226
  rpcMsg.pCont = pHead->content;
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.code = pHead->code;
dengyihao's avatar
dengyihao 已提交
227
  rpcMsg.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
228 229
  rpcMsg.handle = pConn;

dengyihao's avatar
dengyihao 已提交
230
  transClearBuffer(&pConn->readBuf);
dengyihao's avatar
dengyihao 已提交
231 232

  transRefSrvHandle(pConn);
dengyihao's avatar
fix bug  
dengyihao 已提交
233
  tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(rpcMsg.msgType),
dengyihao's avatar
dengyihao 已提交
234
         inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
dengyihao's avatar
fix bug  
dengyihao 已提交
235
         ntohs(pConn->locaddr.sin_port), rpcMsg.contLen);
dengyihao's avatar
dengyihao 已提交
236
  (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
dengyihao's avatar
dengyihao 已提交
237
  // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
dengyihao's avatar
dengyihao 已提交
238 239 240 241
  // auth
  // validate msg type
}

dengyihao's avatar
dengyihao 已提交
242
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
243
  // opt
dengyihao's avatar
dengyihao 已提交
244 245
  SSrvConn*    conn = cli->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
246 247
  if (nread > 0) {
    pBuf->len += nread;
dengyihao's avatar
dengyihao 已提交
248
    tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
dengyihao's avatar
dengyihao 已提交
249
    if (transReadComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
250
      tTrace("server conn %p alread read complete packet", conn);
dengyihao's avatar
dengyihao 已提交
251
      uvHandleReq(conn);
dengyihao's avatar
dengyihao 已提交
252
    } else {
dengyihao's avatar
dengyihao 已提交
253
      tTrace("server %p read partial packet, continue to read", conn);
dengyihao's avatar
dengyihao 已提交
254 255 256
    }
    return;
  }
257 258 259
  if (nread == 0) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
260 261

  tError("server conn %p read error: %s", conn, uv_err_name(nread));
U
ubuntu 已提交
262
  if (nread < 0) {
dengyihao's avatar
dengyihao 已提交
263 264 265 266 267 268 269 270
    conn->broken = true;
    transUnrefSrvHandle(conn);

    // if (conn->ref > 1) {
    //  conn->ref++;  // ref > 1 signed that write is in progress
    //}
    // tError("server conn %p read error: %s", conn, uv_err_name(nread));
    // destroyConn(conn, true);
dengyihao's avatar
dengyihao 已提交
271 272 273 274
  }
}
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->len = 2;
dengyihao's avatar
fix bug  
dengyihao 已提交
275
  buf->base = calloc(1, sizeof(char) * buf->len);
dengyihao's avatar
dengyihao 已提交
276 277 278 279
}

void uvOnTimeoutCb(uv_timer_t* handle) {
  // opt
dengyihao's avatar
dengyihao 已提交
280
  SSrvConn* pConn = handle->data;
dengyihao's avatar
dengyihao 已提交
281
  tError("server conn %p time out", pConn);
dengyihao's avatar
dengyihao 已提交
282 283
}

dengyihao's avatar
dengyihao 已提交
284
void uvOnSendCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
285 286
  SSrvConn* conn = req->data;
  transClearBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
287
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
288
    tTrace("server conn %p data already was written on stream", conn);
dengyihao's avatar
add UT  
dengyihao 已提交
289 290 291 292 293 294 295 296 297 298 299
    if (conn->srvMsgs != NULL) {
      assert(taosArrayGetSize(conn->srvMsgs) >= 1);
      SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
      taosArrayRemove(conn->srvMsgs, 0);
      destroySmsg(msg);

      // send second data, just use for push
      if (taosArrayGetSize(conn->srvMsgs) > 0) {
        msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
        uvStartSendRespInternal(msg);
      }
dengyihao's avatar
dengyihao 已提交
300
    }
dengyihao's avatar
dengyihao 已提交
301
  } else {
dengyihao's avatar
dengyihao 已提交
302
    tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
303 304
    conn->broken = false;
    transUnrefSrvHandle(conn);
dengyihao's avatar
dengyihao 已提交
305 306
  }
}
dengyihao's avatar
dengyihao 已提交
307 308
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
309
    tTrace("success to dispatch conn to work thread");
dengyihao's avatar
dengyihao 已提交
310 311 312
  } else {
    tError("fail to dispatch conn to work thread");
  }
dengyihao's avatar
dengyihao 已提交
313
  free(req);
dengyihao's avatar
dengyihao 已提交
314
}
dengyihao's avatar
dengyihao 已提交
315

dengyihao's avatar
dengyihao 已提交
316
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
317
  // impl later;
dengyihao's avatar
dengyihao 已提交
318
  tTrace("server conn %p prepare to send resp", smsg->pConn);
dengyihao's avatar
dengyihao 已提交
319

dengyihao's avatar
formate  
dengyihao 已提交
320 321
  SSrvConn*  pConn = smsg->pConn;
  STransMsg* pMsg = &smsg->msg;
dengyihao's avatar
dengyihao 已提交
322 323 324 325 326
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
dengyihao's avatar
dengyihao 已提交
327 328

  pHead->secured = pMsg->code == 0 ? 1 : 0;  //
dengyihao's avatar
dengyihao 已提交
329
  pHead->msgType = smsg->pConn->inType + 1;
dengyihao's avatar
dengyihao 已提交
330
  pHead->code = htonl(pMsg->code);
dengyihao's avatar
dengyihao 已提交
331 332 333 334 335 336
  // add more info
  char*   msg = (char*)pHead;
  int32_t len = transMsgLenFromCont(pMsg->contLen);
  if (transCompressMsg(msg, len, NULL)) {
    // impl later
  }
dengyihao's avatar
dengyihao 已提交
337 338 339
  tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
         inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
         ntohs(pConn->locaddr.sin_port));
dengyihao's avatar
dengyihao 已提交
340

dengyihao's avatar
dengyihao 已提交
341 342 343 344
  pHead->msgLen = htonl(len);
  wb->base = msg;
  wb->len = len;
}
dengyihao's avatar
dengyihao 已提交
345 346

static void uvStartSendRespInternal(SSrvMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
347 348 349 350
  uv_buf_t wb;
  uvPrepareSendData(smsg, &wb);

  SSrvConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
351
  uv_timer_stop(&pConn->pTimer);
dengyihao's avatar
dengyihao 已提交
352
  uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
dengyihao's avatar
dengyihao 已提交
353 354 355 356
}
static void uvStartSendResp(SSrvMsg* smsg) {
  // impl
  SSrvConn* pConn = smsg->pConn;
dengyihao's avatar
dengyihao 已提交
357 358 359 360 361 362 363

  if (pConn->broken == true) {
    transUnrefSrvHandle(pConn);
    return;
  }
  transUnrefSrvHandle(pConn);

dengyihao's avatar
dengyihao 已提交
364
  if (taosArrayGetSize(pConn->srvMsgs) > 0) {
dengyihao's avatar
dengyihao 已提交
365 366
    tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr),
           ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
dengyihao's avatar
dengyihao 已提交
367 368 369 370 371
    taosArrayPush(pConn->srvMsgs, &smsg);
    return;
  }
  taosArrayPush(pConn->srvMsgs, &smsg);
  uvStartSendRespInternal(smsg);
dengyihao's avatar
dengyihao 已提交
372 373
  return;
}
dengyihao's avatar
dengyihao 已提交
374
static void destroySmsg(SSrvMsg* smsg) {
dengyihao's avatar
dengyihao 已提交
375 376 377 378
  if (smsg == NULL) {
    return;
  }
  transFreeMsg(smsg->msg.pCont);
dengyihao's avatar
dengyihao 已提交
379
  free(smsg);
dengyihao's avatar
dengyihao 已提交
380
}
dengyihao's avatar
fix bug  
dengyihao 已提交
381 382 383 384 385 386 387
static void destroyAllConn(SWorkThrdObj* pThrd) {
  while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
    queue* h = QUEUE_HEAD(&pThrd->conn);
    QUEUE_REMOVE(h);
    QUEUE_INIT(h);

    SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue);
dengyihao's avatar
dengyihao 已提交
388
    transUnrefSrvHandle(c);
dengyihao's avatar
fix bug  
dengyihao 已提交
389 390
  }
}
dengyihao's avatar
dengyihao 已提交
391
void uvWorkerAsyncCb(uv_async_t* handle) {
dengyihao's avatar
dengyihao 已提交
392 393
  SAsyncItem*   item = handle->data;
  SWorkThrdObj* pThrd = item->pThrd;
dengyihao's avatar
dengyihao 已提交
394
  SSrvConn*     conn = NULL;
dengyihao's avatar
dengyihao 已提交
395
  queue         wq;
dengyihao's avatar
dengyihao 已提交
396

dengyihao's avatar
dengyihao 已提交
397
  // batch process to avoid to lock/unlock frequently
dengyihao's avatar
dengyihao 已提交
398 399 400
  pthread_mutex_lock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  pthread_mutex_unlock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
401 402 403 404

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* head = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(head);
dengyihao's avatar
dengyihao 已提交
405 406 407

    SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
    if (msg == NULL) {
dengyihao's avatar
dengyihao 已提交
408
      tError("unexcept occurred, continue");
dengyihao's avatar
dengyihao 已提交
409
      continue;
dengyihao's avatar
dengyihao 已提交
410
    }
dengyihao's avatar
dengyihao 已提交
411 412
    if (msg->pConn == NULL) {
      free(msg);
dengyihao's avatar
dengyihao 已提交
413 414 415 416 417 418
      bool noConn = QUEUE_IS_EMPTY(&pThrd->conn);
      if (noConn == true) {
        uv_loop_close(pThrd->loop);
        uv_stop(pThrd->loop);
      } else {
        destroyAllConn(pThrd);
dengyihao's avatar
dengyihao 已提交
419 420
        // uv_loop_close(pThrd->loop);
        pThrd->quit = true;
dengyihao's avatar
dengyihao 已提交
421
      }
dengyihao's avatar
dengyihao 已提交
422 423 424
    } else {
      uvStartSendResp(msg);
    }
dengyihao's avatar
dengyihao 已提交
425 426
  }
}
dengyihao's avatar
dengyihao 已提交
427 428
static void uvAcceptAsyncCb(uv_async_t* async) {
  SServerObj* srv = async->data;
dengyihao's avatar
dengyihao 已提交
429
  tDebug("close server port %d", srv->port);
dengyihao's avatar
fix bug  
dengyihao 已提交
430
  uv_close((uv_handle_t*)&srv->server, NULL);
dengyihao's avatar
dengyihao 已提交
431 432
  uv_stop(srv->loop);
}
dengyihao's avatar
dengyihao 已提交
433

dengyihao's avatar
dengyihao 已提交
434
static void uvShutDownCb(uv_shutdown_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
435 436 437
  if (status != 0) {
    tDebug("conn failed to shut down: %s", uv_err_name(status));
  }
dengyihao's avatar
dengyihao 已提交
438 439 440 441
  uv_close((uv_handle_t*)req->handle, uvDestroyConn);
  free(req);
}

dengyihao's avatar
dengyihao 已提交
442 443 444 445 446 447 448 449 450 451 452 453 454 455 456
void uvOnAcceptCb(uv_stream_t* stream, int status) {
  if (status == -1) {
    return;
  }
  SServerObj* pObj = container_of(stream, SServerObj, server);

  uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
    uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));

    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));

    pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
dengyihao's avatar
dengyihao 已提交
457

dengyihao's avatar
dengyihao 已提交
458
    tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
dengyihao's avatar
dengyihao 已提交
459
    uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
dengyihao's avatar
dengyihao 已提交
460 461
  } else {
    uv_close((uv_handle_t*)cli, NULL);
462
    free(cli);
dengyihao's avatar
dengyihao 已提交
463 464 465
  }
}
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
466
  tTrace("server connection coming");
dengyihao's avatar
dengyihao 已提交
467 468 469 470 471
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
    }
    // TODO(log other failure reason)
472
    // uv_close((uv_handle_t*)q, NULL);
dengyihao's avatar
dengyihao 已提交
473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490
    return;
  }
  // free memory allocated by
  assert(nread == strlen(notify));
  assert(buf->base[0] == notify[0]);
  free(buf->base);

  SWorkThrdObj* pThrd = q->data;

  uv_pipe_t* pipe = (uv_pipe_t*)q;
  if (!uv_pipe_pending_count(pipe)) {
    tError("No pending count");
    return;
  }

  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);

dengyihao's avatar
fix bug  
dengyihao 已提交
491
  SSrvConn* pConn = createConn(pThrd);
492

dengyihao's avatar
dengyihao 已提交
493
  pConn->pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
494
  /* init conn timer*/
dengyihao's avatar
dengyihao 已提交
495 496
  uv_timer_init(pThrd->loop, &pConn->pTimer);
  pConn->pTimer.data = pConn;
dengyihao's avatar
dengyihao 已提交
497 498 499 500 501 502 503 504

  pConn->hostThrd = pThrd;

  // init client handle
  pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;

dengyihao's avatar
dengyihao 已提交
505
  pConn->pWriter.data = pConn;
dengyihao's avatar
dengyihao 已提交
506

dengyihao's avatar
dengyihao 已提交
507 508
  transSetConnOption((uv_tcp_t*)pConn->pTcp);

dengyihao's avatar
dengyihao 已提交
509 510 511
  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
dengyihao's avatar
dengyihao 已提交
512
    tTrace("server conn %p created, fd: %d", pConn, fd);
dengyihao's avatar
dengyihao 已提交
513

dengyihao's avatar
dengyihao 已提交
514 515
    int addrlen = sizeof(pConn->addr);
    if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
dengyihao's avatar
dengyihao 已提交
516
      tError("server conn %p failed to get peer info", pConn);
dengyihao's avatar
dengyihao 已提交
517
      transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
518
      return;
dengyihao's avatar
dengyihao 已提交
519
    }
dengyihao's avatar
dengyihao 已提交
520 521 522 523

    addrlen = sizeof(pConn->locaddr);
    if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
      tError("server conn %p failed to get local info", pConn);
dengyihao's avatar
dengyihao 已提交
524
      transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
525 526 527
      return;
    }

dengyihao's avatar
dengyihao 已提交
528
    uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnRecvCb);
dengyihao's avatar
dengyihao 已提交
529

dengyihao's avatar
dengyihao 已提交
530
  } else {
dengyihao's avatar
dengyihao 已提交
531
    tDebug("failed to create new connection");
dengyihao's avatar
dengyihao 已提交
532
    transUnrefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
533 534 535 536 537
  }
}

void* acceptThread(void* arg) {
  // opt
dengyihao's avatar
dengyihao 已提交
538
  setThreadName("trans-accept");
dengyihao's avatar
dengyihao 已提交
539 540 541
  SServerObj* srv = (SServerObj*)arg;
  uv_run(srv->loop, UV_RUN_DEFAULT);
}
dengyihao's avatar
dengyihao 已提交
542 543
static bool addHandleToWorkloop(void* arg) {
  SWorkThrdObj* pThrd = arg;
dengyihao's avatar
dengyihao 已提交
544
  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
545 546 547
  if (0 != uv_loop_init(pThrd->loop)) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
548

U
ubuntu 已提交
549
  // STrans* pRpc = pThrd->shandle;
dengyihao's avatar
dengyihao 已提交
550
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
dengyihao's avatar
dengyihao 已提交
551 552 553 554
  uv_pipe_open(pThrd->pipe, pThrd->fd);

  pThrd->pipe->data = pThrd;

dengyihao's avatar
dengyihao 已提交
555 556
  QUEUE_INIT(&pThrd->msg);
  pthread_mutex_init(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
557

dengyihao's avatar
fix bug  
dengyihao 已提交
558 559 560
  // conn set
  QUEUE_INIT(&pThrd->conn);

dengyihao's avatar
dengyihao 已提交
561
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 4, pThrd, uvWorkerAsyncCb);
dengyihao's avatar
dengyihao 已提交
562
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
dengyihao's avatar
dengyihao 已提交
563 564 565 566 567 568 569 570 571 572 573 574 575
  return true;
}

static bool addHandleToAcceptloop(void* arg) {
  // impl later
  SServerObj* srv = arg;

  int err = 0;
  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
    tError("failed to init accept server: %s", uv_err_name(err));
    return false;
  }

dengyihao's avatar
dengyihao 已提交
576 577 578 579
  // register an async here to quit server gracefully
  srv->pAcceptAsync = calloc(1, sizeof(uv_async_t));
  uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
  srv->pAcceptAsync->data = srv;
dengyihao's avatar
dengyihao 已提交
580

dengyihao's avatar
dengyihao 已提交
581
  struct sockaddr_in bind_addr;
dengyihao's avatar
dengyihao 已提交
582 583 584 585 586
  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
    tError("failed to bind: %s", uv_err_name(err));
    return false;
  }
dengyihao's avatar
dengyihao 已提交
587
  if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
dengyihao's avatar
dengyihao 已提交
588 589 590 591
    tError("failed to listen: %s", uv_err_name(err));
    return false;
  }
  return true;
dengyihao's avatar
dengyihao 已提交
592 593
}
void* workerThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
594
  setThreadName("trans-worker");
dengyihao's avatar
dengyihao 已提交
595
  SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
dengyihao's avatar
dengyihao 已提交
596 597 598
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
}

dengyihao's avatar
fix bug  
dengyihao 已提交
599 600 601
static SSrvConn* createConn(void* hThrd) {
  SWorkThrdObj* pThrd = hThrd;

dengyihao's avatar
dengyihao 已提交
602
  SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
dengyihao's avatar
fix bug  
dengyihao 已提交
603 604 605
  QUEUE_INIT(&pConn->queue);

  QUEUE_PUSH(&pThrd->conn, &pConn->queue);
dengyihao's avatar
dengyihao 已提交
606
  pConn->srvMsgs = taosArrayInit(2, sizeof(void*));  //
dengyihao's avatar
dengyihao 已提交
607
  tTrace("conn %p created", pConn);
dengyihao's avatar
dengyihao 已提交
608 609

  pConn->broken = false;
dengyihao's avatar
dengyihao 已提交
610

dengyihao's avatar
dengyihao 已提交
611
  transRefSrvHandle(pConn);
dengyihao's avatar
dengyihao 已提交
612 613
  return pConn;
}
dengyihao's avatar
dengyihao 已提交
614

dengyihao's avatar
dengyihao 已提交
615
static void destroyConn(SSrvConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
616 617 618
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
619
  transDestroyBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
620 621 622 623 624

  for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) {
    SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i);
    destroySmsg(msg);
  }
dengyihao's avatar
add UT  
dengyihao 已提交
625
  conn->srvMsgs = taosArrayDestroy(conn->srvMsgs);
626
  if (clear) {
dengyihao's avatar
dengyihao 已提交
627
    tTrace("try to destroy conn %p", conn);
dengyihao's avatar
dengyihao 已提交
628 629
    uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
    uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
630
  }
dengyihao's avatar
dengyihao 已提交
631 632
}
static void uvDestroyConn(uv_handle_t* handle) {
dengyihao's avatar
dengyihao 已提交
633 634 635 636
  SSrvConn* conn = handle->data;
  if (conn == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
637 638
  SWorkThrdObj* thrd = conn->hostThrd;

dengyihao's avatar
dengyihao 已提交
639
  tDebug("server conn %p destroy", conn);
dengyihao's avatar
dengyihao 已提交
640
  uv_timer_stop(&conn->pTimer);
dengyihao's avatar
dengyihao 已提交
641
  QUEUE_REMOVE(&conn->queue);
dengyihao's avatar
dengyihao 已提交
642
  free(conn->pTcp);
dengyihao's avatar
dengyihao 已提交
643
  free(conn);
dengyihao's avatar
dengyihao 已提交
644

dengyihao's avatar
dengyihao 已提交
645 646
  if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
    uv_loop_close(thrd->loop);
dengyihao's avatar
dengyihao 已提交
647 648
    uv_stop(thrd->loop);
  }
dengyihao's avatar
dengyihao 已提交
649
}
dengyihao's avatar
dengyihao 已提交
650
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
dengyihao's avatar
dengyihao 已提交
651
  STransMsgHead* pHead = (STransMsgHead*)msg;
dengyihao's avatar
dengyihao 已提交
652 653 654 655

  if (pConn->spi && pConn->secured == 0) {
    // add auth part
    pHead->spi = pConn->spi;
dengyihao's avatar
dengyihao 已提交
656
    STransDigestMsg* pDigest = (STransDigestMsg*)(msg + msgLen);
dengyihao's avatar
dengyihao 已提交
657 658 659
    pDigest->timeStamp = htonl(taosGetTimestampSec());
    msgLen += sizeof(SRpcDigest);
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
dengyihao's avatar
dengyihao 已提交
660 661
    // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
    // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
dengyihao's avatar
dengyihao 已提交
662 663 664 665 666 667 668 669
  } else {
    pHead->spi = 0;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
  }

  return msgLen;
}

U
ubuntu 已提交
670
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
dengyihao's avatar
dengyihao 已提交
671 672 673 674 675 676 677 678 679 680 681 682
  SServerObj* srv = calloc(1, sizeof(SServerObj));
  srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
  srv->numOfThreads = numOfThreads;
  srv->workerIdx = 0;
  srv->pThreadObj = (SWorkThrdObj**)calloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
  srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*));
  srv->ip = ip;
  srv->port = port;
  uv_loop_init(srv->loop);

  for (int i = 0; i < srv->numOfThreads; i++) {
    SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
dengyihao's avatar
dengyihao 已提交
683
    thrd->quit = false;
dengyihao's avatar
dengyihao 已提交
684
    srv->pThreadObj[i] = thrd;
dengyihao's avatar
dengyihao 已提交
685

dengyihao's avatar
dengyihao 已提交
686 687 688
    srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
    int fds[2];
    if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
dengyihao's avatar
dengyihao 已提交
689
      goto End;
dengyihao's avatar
dengyihao 已提交
690 691 692 693
    }
    uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write

dengyihao's avatar
dengyihao 已提交
694
    thrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
695 696
    thrd->fd = fds[0];
    thrd->pipe = &(srv->pipe[i][1]);  // init read
dengyihao's avatar
dengyihao 已提交
697

dengyihao's avatar
dengyihao 已提交
698 699 700
    if (false == addHandleToWorkloop(thrd)) {
      goto End;
    }
dengyihao's avatar
dengyihao 已提交
701 702 703 704 705 706 707 708 709
    int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
    if (err == 0) {
      tDebug("sucess to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
    }
  }
dengyihao's avatar
dengyihao 已提交
710 711 712
  if (false == addHandleToAcceptloop(srv)) {
    goto End;
  }
dengyihao's avatar
dengyihao 已提交
713 714 715 716 717 718 719 720
  int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
  if (err == 0) {
    tDebug("success to create accept-thread");
  } else {
    // clear all resource later
  }

  return srv;
dengyihao's avatar
dengyihao 已提交
721
End:
U
ubuntu 已提交
722
  transCloseServer(srv);
dengyihao's avatar
dengyihao 已提交
723 724 725 726 727 728 729 730 731
  return NULL;
}

void destroyWorkThrd(SWorkThrdObj* pThrd) {
  if (pThrd == NULL) {
    return;
  }
  pthread_join(pThrd->thread, NULL);
  free(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
732
  transDestroyAsyncPool(pThrd->asyncPool);
dengyihao's avatar
dengyihao 已提交
733
  free(pThrd);
dengyihao's avatar
dengyihao 已提交
734
}
dengyihao's avatar
dengyihao 已提交
735 736 737 738
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  tDebug("send quit msg to work thread");

dengyihao's avatar
dengyihao 已提交
739
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
740 741
}

U
ubuntu 已提交
742
void transCloseServer(void* arg) {
dengyihao's avatar
dengyihao 已提交
743 744
  // impl later
  SServerObj* srv = arg;
dengyihao's avatar
dengyihao 已提交
745
  for (int i = 0; i < srv->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
746
    sendQuitToWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
747
    destroyWorkThrd(srv->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
748
  }
dengyihao's avatar
dengyihao 已提交
749 750 751 752 753 754 755

  tDebug("send quit msg to accept thread");
  uv_async_send(srv->pAcceptAsync);
  pthread_join(srv->thread, NULL);

  free(srv->pThreadObj);
  free(srv->pAcceptAsync);
dengyihao's avatar
dengyihao 已提交
756
  free(srv->loop);
dengyihao's avatar
dengyihao 已提交
757 758 759 760

  for (int i = 0; i < srv->numOfThreads; i++) {
    free(srv->pipe[i]);
  }
dengyihao's avatar
dengyihao 已提交
761
  free(srv->pipe);
dengyihao's avatar
dengyihao 已提交
762

dengyihao's avatar
dengyihao 已提交
763
  free(srv);
dengyihao's avatar
dengyihao 已提交
764
}
dengyihao's avatar
dengyihao 已提交
765

dengyihao's avatar
dengyihao 已提交
766 767 768 769 770 771 772 773 774 775 776 777 778 779 780
void transRefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  SSrvConn* conn = handle;

  int ref = T_REF_INC((SSrvConn*)handle);
  UNUSED(ref);
}

void transUnrefSrvHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SSrvConn*)handle);
dengyihao's avatar
dengyihao 已提交
781
  tDebug("handle %p ref count: %d", handle, ref);
dengyihao's avatar
dengyihao 已提交
782 783 784 785 786 787

  if (ref == 0) {
    destroyConn((SSrvConn*)handle, true);
  }
  // unref srv handle
}
U
ubuntu 已提交
788
void transSendResponse(const STransMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
789 790 791
  if (pMsg->handle == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
792
  SSrvConn*     pConn = pMsg->handle;
dengyihao's avatar
dengyihao 已提交
793 794
  SWorkThrdObj* pThrd = pConn->hostThrd;

dengyihao's avatar
dengyihao 已提交
795 796 797
  SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
  srvMsg->pConn = pConn;
  srvMsg->msg = *pMsg;
dengyihao's avatar
dengyihao 已提交
798
  tTrace("server conn %p start to send resp", pConn);
dengyihao's avatar
dengyihao 已提交
799
  transSendAsync(pThrd->asyncPool, &srvMsg->q);
dengyihao's avatar
dengyihao 已提交
800
}
dengyihao's avatar
formate  
dengyihao 已提交
801 802
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
  SSrvConn*          pConn = thandle;
dengyihao's avatar
dengyihao 已提交
803
  struct sockaddr_in addr = pConn->addr;
U
ubuntu 已提交
804

dengyihao's avatar
dengyihao 已提交
805 806
  pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
  pInfo->clientPort = ntohs(addr.sin_port);
dengyihao's avatar
dengyihao 已提交
807 808 809 810
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
  return 0;
}

dengyihao's avatar
dengyihao 已提交
811
#endif