transCli.c 47.6 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1
/** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
dengyihao's avatar
dengyihao 已提交
2

dengyihao's avatar
dengyihao 已提交
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV
#include "transComm.h"

typedef struct SCliConn {
dengyihao's avatar
dengyihao 已提交
20
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
21 22
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
23
  queue        wreqQueue;
dengyihao's avatar
dengyihao 已提交
24

dengyihao's avatar
dengyihao 已提交
25 26 27 28
  void* hostThrd;

  SConnBuffer readBuf;
  STransQueue cliMsgs;
dengyihao's avatar
dengyihao 已提交
29
  queue       q;
dengyihao's avatar
dengyihao 已提交
30 31

  STransCtx  ctx;
dengyihao's avatar
dengyihao 已提交
32 33
  bool       broken;  // link broken or not
  ConnStatus status;  //
dengyihao's avatar
dengyihao 已提交
34

dengyihao's avatar
dengyihao 已提交
35
  int64_t  refId;
dengyihao's avatar
dengyihao 已提交
36 37 38
  char*    ip;
  uint32_t port;

dengyihao's avatar
dengyihao 已提交
39
  SDelayTask* task;
dengyihao's avatar
dengyihao 已提交
40
  // debug and log info
dengyihao's avatar
dengyihao 已提交
41 42
  struct sockaddr addr;
  struct sockaddr localAddr;
dengyihao's avatar
dengyihao 已提交
43
} SCliConn;
dengyihao's avatar
dengyihao 已提交
44

dengyihao's avatar
dengyihao 已提交
45
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
46
  STransConnCtx* ctx;
dengyihao's avatar
formate  
dengyihao 已提交
47
  STransMsg      msg;
dengyihao's avatar
dengyihao 已提交
48
  queue          q;
dengyihao's avatar
dengyihao 已提交
49
  STransMsgType  type;
dengyihao's avatar
dengyihao 已提交
50

dengyihao's avatar
dengyihao 已提交
51
  int64_t  refId;
dengyihao's avatar
dengyihao 已提交
52 53
  uint64_t st;
  int      sent;  //(0: no send, 1: alread sent)
dengyihao's avatar
dengyihao 已提交
54 55
} SCliMsg;

dengyihao's avatar
dengyihao 已提交
56
typedef struct SCliThrd {
dengyihao's avatar
dengyihao 已提交
57 58
  TdThread    thread;  // tid
  int64_t     pid;     // pid
dengyihao's avatar
dengyihao 已提交
59
  uv_loop_t*  loop;
dengyihao's avatar
dengyihao 已提交
60
  SAsyncPool* asyncPool;
dengyihao's avatar
dengyihao 已提交
61
  uv_timer_t  timer;
dengyihao's avatar
dengyihao 已提交
62 63 64
  void*       pool;  // conn pool

  // msg queue
dengyihao's avatar
dengyihao 已提交
65
  queue         msg;
wafwerar's avatar
wafwerar 已提交
66
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
67
  SDelayQueue*  delayQueue;
dengyihao's avatar
dengyihao 已提交
68
  SDelayQueue*  timeoutQueue;
dengyihao's avatar
dengyihao 已提交
69 70
  uint64_t      nextTimeout;  // next timeout
  void*         pTransInst;   //
dengyihao's avatar
dengyihao 已提交
71

dengyihao's avatar
dengyihao 已提交
72
  SCvtAddr cvtAddr;
dengyihao's avatar
dengyihao 已提交
73

dengyihao's avatar
dengyihao 已提交
74 75
  SCliMsg* stopMsg;

dengyihao's avatar
dengyihao 已提交
76
  bool quit;
dengyihao's avatar
dengyihao 已提交
77
} SCliThrd;
dengyihao's avatar
dengyihao 已提交
78

U
ubuntu 已提交
79
typedef struct SCliObj {
dengyihao's avatar
dengyihao 已提交
80 81 82 83
  char       label[TSDB_LABEL_LEN];
  int32_t    index;
  int        numOfThreads;
  SCliThrd** pThreadObj;
U
ubuntu 已提交
84
} SCliObj;
dengyihao's avatar
dengyihao 已提交
85

dengyihao's avatar
dengyihao 已提交
86 87 88 89
typedef struct SConnList {
  queue conn;
} SConnList;

dengyihao's avatar
dengyihao 已提交
90
// conn pool
dengyihao's avatar
dengyihao 已提交
91
// add expire timeout and capacity limit
dengyihao's avatar
dengyihao 已提交
92
static void*     createConnPool(int size);
93
static void*     destroyConnPool(void* pool);
dengyihao's avatar
dengyihao 已提交
94
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
dengyihao's avatar
dengyihao 已提交
95
static void      addConnToPool(void* pool, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
96
static void      doCloseIdleConn(void* param);
dengyihao's avatar
dengyihao 已提交
97

dengyihao's avatar
dengyihao 已提交
98
// register timer in each thread to clear expire conn
dengyihao's avatar
dengyihao 已提交
99
// static void cliTimeoutCb(uv_timer_t* handle);
U
ubuntu 已提交
100
// alloc buf for recv
dengyihao's avatar
dengyihao 已提交
101
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
U
ubuntu 已提交
102 103
// callback after  read nbytes from socket
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
104
// callback after write data to socket
U
ubuntu 已提交
105
static void cliSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
106
// callback after conn  to server
U
ubuntu 已提交
107 108
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
109

dengyihao's avatar
dengyihao 已提交
110
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
dengyihao's avatar
dengyihao 已提交
111

dengyihao's avatar
dengyihao 已提交
112
static SCliConn* cliCreateConn(SCliThrd* thrd);
U
ubuntu 已提交
113 114
static void      cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void      cliDestroy(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
115
static void      cliSend(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
116

dengyihao's avatar
dengyihao 已提交
117 118 119 120 121 122 123
static bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
  if (code != 0) return false;
  if (pCtx->retryCnt == 0) return false;
  if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
  return true;
}

dengyihao's avatar
dengyihao 已提交
124
void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
dengyihao's avatar
dengyihao 已提交
125 126 127 128
/*
 * set TCP connection timeout per-socket level
 */
static int cliCreateSocket();
dengyihao's avatar
dengyihao 已提交
129
// process data read from server, add decompress etc later
U
ubuntu 已提交
130
static void cliHandleResp(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
131
// handle except about conn
U
ubuntu 已提交
132
static void cliHandleExcept(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
133

134
// handle req from app
dengyihao's avatar
dengyihao 已提交
135 136 137 138 139 140 141 142
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
                                                                   cliHandleUpdate};

static void cliSendQuit(SCliThrd* thrd);
U
ubuntu 已提交
143
static void destroyUserdata(STransMsg* userdata);
dengyihao's avatar
dengyihao 已提交
144

U
ubuntu 已提交
145
static int cliRBChoseIdx(STrans* pTransInst);
dengyihao's avatar
dengyihao 已提交
146

dengyihao's avatar
dengyihao 已提交
147
static void destroyCmsg(void* cmsg);
dengyihao's avatar
dengyihao 已提交
148
static void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
149
// thread obj
dengyihao's avatar
dengyihao 已提交
150 151
static SCliThrd* createThrdObj();
static void      destroyThrdObj(SCliThrd* pThrd);
dengyihao's avatar
dengyihao 已提交
152

dengyihao's avatar
dengyihao 已提交
153 154
static void cliWalkCb(uv_handle_t* handle, void* arg);

dengyihao's avatar
dengyihao 已提交
155 156 157 158 159 160 161 162 163 164 165 166
static void cliReleaseUnfinishedMsg(SCliConn* conn) {
  SCliMsg* pMsg = NULL;
  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    pMsg = transQueueGet(&conn->cliMsgs, i);
    if (pMsg != NULL && pMsg->ctx != NULL) {
      if (conn->ctx.freeFunc != NULL) {
        conn->ctx.freeFunc(pMsg->ctx->ahandle);
      }
    }
    destroyCmsg(pMsg);
  }
}
dengyihao's avatar
dengyihao 已提交
167 168 169 170 171 172 173
#define CLI_RELEASE_UV(loop)        \
  do {                              \
    uv_walk(loop, cliWalkCb, NULL); \
    uv_run(loop, UV_RUN_DEFAULT);   \
    uv_loop_close(loop);            \
  } while (0);

dengyihao's avatar
dengyihao 已提交
174 175 176 177 178 179
// snprintf may cause performance problem
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port)          \
  do {                                                  \
    snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
  } while (0)

dengyihao's avatar
dengyihao 已提交
180 181 182 183 184
#define CONN_HOST_THREAD_IDX1(idx, exh, refId, pThrd) \
  do {                                                \
    if (exh == NULL) {                                \
      idx = -1;                                       \
    } else {                                          \
dengyihao's avatar
dengyihao 已提交
185
      ASYNC_CHECK_HANDLE((exh), refId);               \
dengyihao's avatar
dengyihao 已提交
186
      pThrd = (SCliThrd*)(exh)->pThrd;                \
dengyihao's avatar
dengyihao 已提交
187 188
    }                                                 \
  } while (0)
dengyihao's avatar
dengyihao 已提交
189
#define CONN_PERSIST_TIME(para)    ((para) == 0 ? 3 * 1000 : (para))
dengyihao's avatar
dengyihao 已提交
190
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
dengyihao's avatar
dengyihao 已提交
191
#define CONN_GET_INST_LABEL(conn)  (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
S
Shengliang Guan 已提交
192 193 194 195 196 197 198 199 200 201 202 203 204
#define CONN_SHOULD_RELEASE(conn, head)                                                                           \
  do {                                                                                                            \
    if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) {                                                \
      uint64_t ahandle = head->ahandle;                                                                           \
      CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);                                                                  \
      transClearBuffer(&conn->readBuf);                                                                           \
      transFreeMsg(transContFromHead((char*)head));                                                               \
      tDebug("%s conn %p receive release request, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); \
      if (T_REF_VAL_GET(conn) > 1) {                                                                              \
        transUnrefCliHandle(conn);                                                                                \
      }                                                                                                           \
      destroyCmsg(pMsg);                                                                                          \
      cliReleaseUnfinishedMsg(conn);                                                                              \
dengyihao's avatar
dengyihao 已提交
205
      transQueueClear(&conn->cliMsgs);                                                                            \
S
Shengliang Guan 已提交
206 207 208
      addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);                                                     \
      return;                                                                                                     \
    }                                                                                                             \
dengyihao's avatar
dengyihao 已提交
209 210
  } while (0)

dengyihao's avatar
dengyihao 已提交
211 212 213 214 215
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle)                                         \
  do {                                                                                    \
    int i = 0, sz = transQueueSize(&conn->cliMsgs);                                       \
    for (; i < sz; i++) {                                                                 \
      pMsg = transQueueGet(&conn->cliMsgs, i);                                            \
S
Shengliang 已提交
216
      if (pMsg != NULL && pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \
dengyihao's avatar
dengyihao 已提交
217 218 219 220 221 222 223 224
        break;                                                                            \
      }                                                                                   \
    }                                                                                     \
    if (i == sz) {                                                                        \
      pMsg = NULL;                                                                        \
    } else {                                                                              \
      pMsg = transQueueRm(&conn->cliMsgs, i);                                             \
    }                                                                                     \
dengyihao's avatar
dengyihao 已提交
225
  } while (0)
U
ubuntu 已提交
226 227 228 229 230 231 232 233 234 235 236 237 238
#define CONN_GET_NEXT_SENDMSG(conn)                 \
  do {                                              \
    int i = 0;                                      \
    do {                                            \
      pCliMsg = transQueueGet(&conn->cliMsgs, i++); \
      if (pCliMsg && 0 == pCliMsg->sent) {          \
        break;                                      \
      }                                             \
    } while (pCliMsg != NULL);                      \
    if (pCliMsg == NULL) {                          \
      goto _RETURN;                                 \
    }                                               \
  } while (0)
dengyihao's avatar
dengyihao 已提交
239

dengyihao's avatar
dengyihao 已提交
240 241 242 243 244
#define CONN_HANDLE_THREAD_QUIT(thrd) \
  do {                                \
    if (thrd->quit) {                 \
      return;                         \
    }                                 \
dengyihao's avatar
dengyihao 已提交
245 246 247 248 249
  } while (0)

#define CONN_HANDLE_BROKEN(conn) \
  do {                           \
    if (conn->broken) {          \
dengyihao's avatar
formate  
dengyihao 已提交
250
      cliHandleExcept(conn);     \
dengyihao's avatar
dengyihao 已提交
251
      return;                    \
dengyihao's avatar
dengyihao 已提交
252
    }                            \
dengyihao's avatar
dengyihao 已提交
253
  } while (0)
dengyihao's avatar
dengyihao 已提交
254

dengyihao's avatar
formate  
dengyihao 已提交
255 256
#define CONN_SET_PERSIST_BY_APP(conn) \
  do {                                \
dengyihao's avatar
dengyihao 已提交
257 258
    if (conn->status == ConnNormal) { \
      conn->status = ConnAcquire;     \
dengyihao's avatar
formate  
dengyihao 已提交
259 260 261
      transRefCliHandle(conn);        \
    }                                 \
  } while (0)
262

dengyihao's avatar
dengyihao 已提交
263 264 265 266
#define CONN_NO_PERSIST_BY_APP(conn) \
  (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define CONN_RELEASE_BY_SERVER(conn) \
  (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
267

S
Shengliang Guan 已提交
268 269
#define REQUEST_NO_RESP(msg)         ((msg)->info.noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg)   ((msg)->info.persistHandle == 1)
dengyihao's avatar
dengyihao 已提交
270
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
dengyihao's avatar
dengyihao 已提交
271

dengyihao's avatar
dengyihao 已提交
272
#define EPSET_IS_VALID(epSet)       ((epSet) != NULL && (epSet)->numOfEps != 0)
dengyihao's avatar
dengyihao 已提交
273
#define EPSET_GET_SIZE(epSet)       (epSet)->numOfEps
dengyihao's avatar
dengyihao 已提交
274
#define EPSET_GET_INUSE_IP(epSet)   ((epSet)->eps[(epSet)->inUse].fqdn)
dengyihao's avatar
dengyihao 已提交
275
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
dengyihao's avatar
dengyihao 已提交
276 277 278 279 280
#define EPSET_FORWARD_INUSE(epSet)                                 \
  do {                                                             \
    if ((epSet)->numOfEps != 0) {                                  \
      (epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \
    }                                                              \
dengyihao's avatar
dengyihao 已提交
281
  } while (0)
dengyihao's avatar
dengyihao 已提交
282

dengyihao's avatar
dengyihao 已提交
283 284 285 286 287 288 289 290 291 292 293
#define EPSET_DEBUG_STR(epSet, tbuf)                                                                                   \
  do {                                                                                                                 \
    int len = snprintf(tbuf, sizeof(tbuf), "epset:{");                                                                 \
    for (int i = 0; i < (epSet)->numOfEps; i++) {                                                                      \
      if (i == (epSet)->numOfEps - 1) {                                                                                \
        len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port);   \
      } else {                                                                                                         \
        len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d, ", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
      }                                                                                                                \
    }                                                                                                                  \
    len += snprintf(tbuf + len, sizeof(tbuf) - len, "}, inUse:%d", (epSet)->inUse);                                    \
dengyihao's avatar
dengyihao 已提交
294
  } while (0);
dengyihao's avatar
dengyihao 已提交
295

U
ubuntu 已提交
296
static void* cliWorkThread(void* arg);
dengyihao's avatar
dengyihao 已提交
297

dengyihao's avatar
dengyihao 已提交
298
bool cliMaySendCachedMsg(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
299
  if (!transQueueEmpty(&conn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
300
    SCliMsg* pCliMsg = NULL;
U
ubuntu 已提交
301
    CONN_GET_NEXT_SENDMSG(conn);
dengyihao's avatar
dengyihao 已提交
302 303
    cliSend(conn);
  }
dengyihao's avatar
dengyihao 已提交
304
  return false;
U
ubuntu 已提交
305 306
_RETURN:
  return false;
dengyihao's avatar
dengyihao 已提交
307
}
dengyihao's avatar
formate  
dengyihao 已提交
308
void cliHandleResp(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
309 310
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
311

dengyihao's avatar
dengyihao 已提交
312 313 314
  STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
  pHead->code = htonl(pHead->code);
  pHead->msgLen = htonl(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
315

dengyihao's avatar
dengyihao 已提交
316 317 318 319 320
  STransMsg transMsg = {0};
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = transContFromHead((char*)pHead);
  transMsg.code = pHead->code;
  transMsg.msgType = pHead->msgType;
S
Shengliang Guan 已提交
321
  transMsg.info.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
322
  transMsg.info.traceId = pHead->traceId;
dengyihao's avatar
dengyihao 已提交
323
  transMsg.info.hasEpSet = pHead->hasEpSet;
dengyihao's avatar
dengyihao 已提交
324

dengyihao's avatar
dengyihao 已提交
325 326
  SCliMsg*       pMsg = NULL;
  STransConnCtx* pCtx = NULL;
dengyihao's avatar
dengyihao 已提交
327
  CONN_SHOULD_RELEASE(conn, pHead);
dengyihao's avatar
dengyihao 已提交
328

dengyihao's avatar
dengyihao 已提交
329 330
  if (CONN_NO_PERSIST_BY_APP(conn)) {
    pMsg = transQueuePop(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
331 332 333

    pCtx = pMsg ? pMsg->ctx : NULL;
    transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
334
    tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
U
ubuntu 已提交
335
  } else {
dengyihao's avatar
dengyihao 已提交
336 337 338
    uint64_t ahandle = (uint64_t)pHead->ahandle;
    CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
    if (pMsg == NULL) {
S
Shengliang Guan 已提交
339
      transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
dengyihao's avatar
dengyihao 已提交
340 341
      tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
             transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
S
Shengliang Guan 已提交
342
      if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
D
dapan1121 已提交
343
        transMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
S
Shengliang Guan 已提交
344
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
dengyihao's avatar
dengyihao 已提交
345 346
        tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
               transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
347 348 349
      }
    } else {
      pCtx = pMsg ? pMsg->ctx : NULL;
S
Shengliang Guan 已提交
350
      transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
351
      tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
352
    }
U
ubuntu 已提交
353
  }
dengyihao's avatar
dengyihao 已提交
354 355
  // buf's mem alread translated to transMsg.pCont
  transClearBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
356
  if (!CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
357
    transMsg.info.handle = (void*)conn->refId;
dengyihao's avatar
dengyihao 已提交
358
    tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
359
  }
dengyihao's avatar
dengyihao 已提交
360

dengyihao's avatar
dengyihao 已提交
361
  STraceId* trace = &transMsg.info.traceId;
dengyihao's avatar
dengyihao 已提交
362

S
Shengliang Guan 已提交
363
  tGTrace("%s conn %p %s received from %s:%d, local info:%s:%d, msg size:%d, code:0x%x", CONN_GET_INST_LABEL(conn),
dengyihao's avatar
dengyihao 已提交
364 365 366 367
          conn, TMSG_INFO(pHead->msgType), taosInetNtoa(((struct sockaddr_in*)&conn->addr)->sin_addr),
          ntohs(((struct sockaddr_in*)&conn->addr)->sin_port),
          taosInetNtoa(((struct sockaddr_in*)&conn->localAddr)->sin_addr),
          ntohs(((struct sockaddr_in*)&conn->localAddr)->sin_port), transMsg.contLen, transMsg.code);
dengyihao's avatar
dengyihao 已提交
368

dengyihao's avatar
dengyihao 已提交
369
  if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
370
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
371 372
    return;
  }
S
Shengliang Guan 已提交
373
  if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
dengyihao's avatar
dengyihao 已提交
374
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
375 376
    return;
  }
dengyihao's avatar
dengyihao 已提交
377

dengyihao's avatar
dengyihao 已提交
378
  if (cliAppCb(conn, &transMsg, pMsg) != 0) {
dengyihao's avatar
dengyihao 已提交
379
    return;
dengyihao's avatar
dengyihao 已提交
380
  }
dengyihao's avatar
dengyihao 已提交
381 382
  destroyCmsg(pMsg);

dengyihao's avatar
dengyihao 已提交
383
  if (cliMaySendCachedMsg(conn) == true) {
dengyihao's avatar
dengyihao 已提交
384 385
    return;
  }
dengyihao's avatar
dengyihao 已提交
386

U
ubuntu 已提交
387
  if (CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
388
    addConnToPool(pThrd->pool, conn);
dengyihao's avatar
dengyihao 已提交
389
  }
dengyihao's avatar
test  
dengyihao 已提交
390

dengyihao's avatar
dengyihao 已提交
391
  uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
dengyihao's avatar
dengyihao 已提交
392
}
U
ubuntu 已提交
393 394

void cliHandleExcept(SCliConn* pConn) {
dengyihao's avatar
dengyihao 已提交
395
  if (transQueueEmpty(&pConn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
396
    if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
dengyihao's avatar
dengyihao 已提交
397
      tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
formate  
dengyihao 已提交
398 399 400
      transUnrefCliHandle(pConn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
401
  }
dengyihao's avatar
dengyihao 已提交
402 403 404
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
  bool      once = false;
D
dapan1121 已提交
405
  do {
dengyihao's avatar
dengyihao 已提交
406
    SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);
D
dapan1121 已提交
407 408
    if (pMsg == NULL && once) {
      break;
dengyihao's avatar
dengyihao 已提交
409
    }
dengyihao's avatar
dengyihao 已提交
410
    STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
U
ubuntu 已提交
411

dengyihao's avatar
dengyihao 已提交
412
    STransMsg transMsg = {0};
dengyihao's avatar
dengyihao 已提交
413
    transMsg.code = pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL;
dengyihao's avatar
dengyihao 已提交
414
    transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
S
Shengliang Guan 已提交
415
    transMsg.info.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
416

dengyihao's avatar
dengyihao 已提交
417
    if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
S
Shengliang Guan 已提交
418
      transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
dengyihao's avatar
dengyihao 已提交
419
      tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
dengyihao's avatar
dengyihao 已提交
420
             TMSG_INFO(transMsg.msgType));
S
Shengliang Guan 已提交
421 422
      if (transMsg.info.ahandle == NULL) {
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
dengyihao's avatar
dengyihao 已提交
423
        tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
S
Shengliang Guan 已提交
424
               transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
425
      }
dengyihao's avatar
dengyihao 已提交
426
    } else {
S
Shengliang Guan 已提交
427
      transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
428 429 430
    }

    if (pCtx == NULL || pCtx->pSem == NULL) {
S
Shengliang Guan 已提交
431
      if (transMsg.info.ahandle == NULL) {
D
dapan1121 已提交
432
        once = true;
U
ubuntu 已提交
433 434
        continue;
      }
dengyihao's avatar
dengyihao 已提交
435
    }
dengyihao's avatar
dengyihao 已提交
436
    if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
dengyihao's avatar
dengyihao 已提交
437
      return;
dengyihao's avatar
dengyihao 已提交
438 439
    }
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
440
    tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));
D
dapan1121 已提交
441
  } while (!transQueueEmpty(&pConn->cliMsgs));
dengyihao's avatar
dengyihao 已提交
442
  transUnrefCliHandle(pConn);
dengyihao's avatar
dengyihao 已提交
443
}
dengyihao's avatar
dengyihao 已提交
444

dengyihao's avatar
dengyihao 已提交
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468
// void cliTimeoutCb(uv_timer_t* handle) {
//   SCliThrd* pThrd = handle->data;
//   STrans*   pTransInst = pThrd->pTransInst;
//   int64_t   currentTime = pThrd->nextTimeout;
//   tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label);
//
//   SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
//   while (p != NULL) {
//     while (!QUEUE_IS_EMPTY(&p->conn)) {
//       queue*    h = QUEUE_HEAD(&p->conn);
//       SCliConn* c = QUEUE_DATA(h, SCliConn, q);
//       if (c->expireTime < currentTime) {
//         QUEUE_REMOVE(h);
//         transUnrefCliHandle(c);
//       } else {
//         break;
//       }
//     }
//     p = taosHashIterate((SHashObj*)pThrd->pool, p);
//   }
//
//   pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
//   uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0);
// }
U
ubuntu 已提交
469 470

void* createConnPool(int size) {
471 472
  // thread local, no lock
  return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
473
}
U
ubuntu 已提交
474
void* destroyConnPool(void* pool) {
dengyihao's avatar
dengyihao 已提交
475
  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
dengyihao's avatar
dengyihao 已提交
476 477
  while (connList != NULL) {
    while (!QUEUE_IS_EMPTY(&connList->conn)) {
dengyihao's avatar
dengyihao 已提交
478
      queue*    h = QUEUE_HEAD(&connList->conn);
dengyihao's avatar
dengyihao 已提交
479
      SCliConn* c = QUEUE_DATA(h, SCliConn, q);
U
ubuntu 已提交
480
      cliDestroyConn(c, true);
dengyihao's avatar
dengyihao 已提交
481
    }
dengyihao's avatar
dengyihao 已提交
482
    connList = taosHashIterate((SHashObj*)pool, connList);
dengyihao's avatar
dengyihao 已提交
483
  }
dengyihao's avatar
dengyihao 已提交
484
  taosHashCleanup(pool);
485
  return NULL;
dengyihao's avatar
dengyihao 已提交
486 487
}

dengyihao's avatar
dengyihao 已提交
488
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
dengyihao's avatar
dengyihao 已提交
489
  char key[128] = {0};
dengyihao's avatar
dengyihao 已提交
490
  CONN_CONSTRUCT_HASH_KEY(key, ip, port);
dengyihao's avatar
dengyihao 已提交
491

dengyihao's avatar
dengyihao 已提交
492 493
  SHashObj*  pPool = pool;
  SConnList* plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
494 495
  if (plist == NULL) {
    SConnList list;
dengyihao's avatar
dengyihao 已提交
496 497
    taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
    plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
498
    QUEUE_INIT(&plist->conn);
dengyihao's avatar
dengyihao 已提交
499 500 501 502 503
  }

  if (QUEUE_IS_EMPTY(&plist->conn)) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
504
  queue*    h = QUEUE_HEAD(&plist->conn);
dengyihao's avatar
dengyihao 已提交
505
  SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
dengyihao's avatar
dengyihao 已提交
506
  conn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
507 508 509
  QUEUE_REMOVE(&conn->q);
  QUEUE_INIT(&conn->q);
  assert(h == &conn->q);
dengyihao's avatar
dengyihao 已提交
510 511 512 513

  transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
  conn->task = NULL;

dengyihao's avatar
dengyihao 已提交
514
  return conn;
dengyihao's avatar
dengyihao 已提交
515
}
dengyihao's avatar
dengyihao 已提交
516
static int32_t allocConnRef(SCliConn* conn, bool update) {
dengyihao's avatar
dengyihao 已提交
517
  if (update) {
dengyihao's avatar
dengyihao 已提交
518
    transRemoveExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
519
    conn->refId = -1;
dengyihao's avatar
dengyihao 已提交
520 521 522 523
  }
  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
524
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
dengyihao's avatar
dengyihao 已提交
525
  conn->refId = exh->refId;
dengyihao's avatar
dengyihao 已提交
526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543
  return 0;
}

static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
  if (update) {
    transRemoveExHandle(transGetRefMgt(), conn->refId);
    conn->refId = -1;
  }
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
  if (exh == NULL) {
    return -1;
  }
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
  conn->refId = exh->refId;

  transReleaseExHandle(transGetRefMgt(), handle);
  return 0;
dengyihao's avatar
dengyihao 已提交
544
}
dengyihao's avatar
dengyihao 已提交
545

dengyihao's avatar
dengyihao 已提交
546
static void addConnToPool(void* pool, SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
547 548 549
  if (conn->status == ConnInPool) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
550
  SCliThrd* thrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
551 552
  CONN_HANDLE_THREAD_QUIT(thrd);

dengyihao's avatar
dengyihao 已提交
553 554
  allocConnRef(conn, true);

dengyihao's avatar
dengyihao 已提交
555
  STrans* pTransInst = thrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
556
  cliReleaseUnfinishedMsg(conn);
dengyihao's avatar
dengyihao 已提交
557
  transQueueClear(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
558
  transCtxCleanup(&conn->ctx);
dengyihao's avatar
dengyihao 已提交
559
  conn->status = ConnInPool;
dengyihao's avatar
dengyihao 已提交
560

dengyihao's avatar
dengyihao 已提交
561
  char key[128] = {0};
dengyihao's avatar
dengyihao 已提交
562
  CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
dengyihao's avatar
dengyihao 已提交
563
  tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
dengyihao's avatar
dengyihao 已提交
564

dengyihao's avatar
dengyihao 已提交
565
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
566 567
  // list already create before
  assert(plist != NULL);
dengyihao's avatar
dengyihao 已提交
568 569
  QUEUE_INIT(&conn->q);
  QUEUE_PUSH(&plist->conn, &conn->q);
dengyihao's avatar
dengyihao 已提交
570

dengyihao's avatar
dengyihao 已提交
571
  assert(!QUEUE_IS_EMPTY(&plist->conn));
dengyihao's avatar
dengyihao 已提交
572 573 574 575 576

  STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
  arg->param1 = conn;
  arg->param2 = thrd;
  conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
dengyihao's avatar
dengyihao 已提交
577
}
dengyihao's avatar
dengyihao 已提交
578
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
579
  SCliConn*    conn = handle->data;
dengyihao's avatar
dengyihao 已提交
580
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
581
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
582
}
U
ubuntu 已提交
583
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
584
  // impl later
dengyihao's avatar
dengyihao 已提交
585 586 587
  if (handle->data == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
588 589
  SCliConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
590
  if (nread > 0) {
dengyihao's avatar
dengyihao 已提交
591
    pBuf->len += nread;
dengyihao's avatar
dengyihao 已提交
592
    if (transReadComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
593
      tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
U
ubuntu 已提交
594
      cliHandleResp(conn);
dengyihao's avatar
dengyihao 已提交
595
    } else {
dengyihao's avatar
dengyihao 已提交
596
      tTrace("%s conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
597
    }
dengyihao's avatar
dengyihao 已提交
598 599
    return;
  }
dengyihao's avatar
dengyihao 已提交
600

601 602
  assert(nread <= 0);
  if (nread == 0) {
dengyihao's avatar
dengyihao 已提交
603 604 605
    // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
    // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
    // read(2).
dengyihao's avatar
dengyihao 已提交
606
    tTrace("%s conn %p read empty", CONN_GET_INST_LABEL(conn), conn);
607 608
    return;
  }
dengyihao's avatar
fix bug  
dengyihao 已提交
609
  if (nread < 0) {
dengyihao's avatar
dengyihao 已提交
610
    tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
611
    conn->broken = true;
U
ubuntu 已提交
612
    cliHandleExcept(conn);
dengyihao's avatar
dengyihao 已提交
613
  }
dengyihao's avatar
dengyihao 已提交
614
}
dengyihao's avatar
dengyihao 已提交
615

dengyihao's avatar
dengyihao 已提交
616
static SCliConn* cliCreateConn(SCliThrd* pThrd) {
wafwerar's avatar
wafwerar 已提交
617
  SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn));
dengyihao's avatar
dengyihao 已提交
618
  // read/write stream handle
wafwerar's avatar
wafwerar 已提交
619
  conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
dengyihao's avatar
dengyihao 已提交
620 621 622 623
  uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
  conn->stream->data = conn;

  conn->connReq.data = conn;
dengyihao's avatar
dengyihao 已提交
624

dengyihao's avatar
dengyihao 已提交
625 626
  transReqQueueInit(&conn->wreqQueue);

dengyihao's avatar
dengyihao 已提交
627
  transQueueInit(&conn->cliMsgs, NULL);
dengyihao's avatar
dengyihao 已提交
628
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
629
  conn->hostThrd = pThrd;
dengyihao's avatar
dengyihao 已提交
630 631
  conn->status = ConnNormal;
  conn->broken = 0;
dengyihao's avatar
dengyihao 已提交
632
  transRefCliHandle(conn);
dengyihao's avatar
dengyihao 已提交
633

dengyihao's avatar
dengyihao 已提交
634
  allocConnRef(conn, false);
dengyihao's avatar
dengyihao 已提交
635

dengyihao's avatar
dengyihao 已提交
636 637
  return conn;
}
U
ubuntu 已提交
638
static void cliDestroyConn(SCliConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
639
  tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
640 641
  QUEUE_REMOVE(&conn->q);
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
642
  transRemoveExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
643 644
  conn->refId = -1;

dengyihao's avatar
dengyihao 已提交
645 646
  if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);

dengyihao's avatar
dengyihao 已提交
647
  if (clear) {
dengyihao's avatar
dengyihao 已提交
648
    if (!uv_is_closing((uv_handle_t*)conn->stream)) {
dengyihao's avatar
dengyihao 已提交
649
      uv_read_stop(conn->stream);
dengyihao's avatar
dengyihao 已提交
650 651
      uv_close((uv_handle_t*)conn->stream, cliDestroy);
    }
652
  }
dengyihao's avatar
dengyihao 已提交
653
}
U
ubuntu 已提交
654
static void cliDestroy(uv_handle_t* handle) {
dengyihao's avatar
dengyihao 已提交
655 656 657 658
  if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
    return;
  }

dengyihao's avatar
dengyihao 已提交
659
  SCliConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
660
  transRemoveExHandle(transGetRefMgt(), conn->refId);
wafwerar's avatar
wafwerar 已提交
661
  taosMemoryFree(conn->ip);
dengyihao's avatar
dengyihao 已提交
662
  conn->stream->data = NULL;
wafwerar's avatar
wafwerar 已提交
663
  taosMemoryFree(conn->stream);
dengyihao's avatar
dengyihao 已提交
664
  transCtxCleanup(&conn->ctx);
dengyihao's avatar
dengyihao 已提交
665
  cliReleaseUnfinishedMsg(conn);
dengyihao's avatar
dengyihao 已提交
666
  transQueueDestroy(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
667
  tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
668 669
  transReqQueueClear(&conn->wreqQueue);

dengyihao's avatar
dengyihao 已提交
670
  transDestroyBuffer(&conn->readBuf);
wafwerar's avatar
wafwerar 已提交
671
  taosMemoryFree(conn);
dengyihao's avatar
dengyihao 已提交
672
}
dengyihao's avatar
dengyihao 已提交
673
static bool cliHandleNoResp(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
674 675
  bool res = false;
  if (!transQueueEmpty(&conn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
676
    SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, 0);
dengyihao's avatar
dengyihao 已提交
677
    if (REQUEST_NO_RESP(&pMsg->msg)) {
dengyihao's avatar
dengyihao 已提交
678
      transQueuePop(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
679 680 681 682 683
      destroyCmsg(pMsg);
      res = true;
    }
    if (res == true) {
      if (cliMaySendCachedMsg(conn) == false) {
dengyihao's avatar
dengyihao 已提交
684
        SCliThrd* thrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
685
        addConnToPool(thrd->pool, conn);
dengyihao's avatar
dengyihao 已提交
686 687 688 689 690
      }
    }
  }
  return res;
}
U
ubuntu 已提交
691
static void cliSendCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
692 693
  SCliConn* pConn = transReqQueueRemove(req);
  if (pConn == NULL) return;
dengyihao's avatar
dengyihao 已提交
694

dengyihao's avatar
dengyihao 已提交
695
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
696
    tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
697
  } else {
S
Shengliang Guan 已提交
698
    tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
U
ubuntu 已提交
699
    cliHandleExcept(pConn);
dengyihao's avatar
dengyihao 已提交
700 701
    return;
  }
dengyihao's avatar
dengyihao 已提交
702
  if (cliHandleNoResp(pConn) == true) {
dengyihao's avatar
dengyihao 已提交
703
    tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
704 705
    return;
  }
dengyihao's avatar
dengyihao 已提交
706
  uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
dengyihao's avatar
dengyihao 已提交
707
}
dengyihao's avatar
dengyihao 已提交
708

U
ubuntu 已提交
709
void cliSend(SCliConn* pConn) {
dengyihao's avatar
dengyihao 已提交
710 711
  CONN_HANDLE_BROKEN(pConn);

dengyihao's avatar
dengyihao 已提交
712
  assert(!transQueueEmpty(&pConn->cliMsgs));
dengyihao's avatar
dengyihao 已提交
713 714

  SCliMsg* pCliMsg = NULL;
U
ubuntu 已提交
715
  CONN_GET_NEXT_SENDMSG(pConn);
dengyihao's avatar
dengyihao 已提交
716 717
  pCliMsg->sent = 1;

dengyihao's avatar
dengyihao 已提交
718
  STransConnCtx* pCtx = pCliMsg->ctx;
dengyihao's avatar
dengyihao 已提交
719

dengyihao's avatar
dengyihao 已提交
720 721
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
722

U
ubuntu 已提交
723
  STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
dengyihao's avatar
dengyihao 已提交
724 725 726 727
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
dengyihao's avatar
dengyihao 已提交
728
  int msgLen = transMsgLenFromCont(pMsg->contLen);
dengyihao's avatar
dengyihao 已提交
729

dengyihao's avatar
dengyihao 已提交
730 731
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
  pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
dengyihao's avatar
dengyihao 已提交
732 733
  pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
  pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
dengyihao's avatar
dengyihao 已提交
734 735
  pHead->msgType = pMsg->msgType;
  pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
dengyihao's avatar
dengyihao 已提交
736
  pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
dengyihao's avatar
dengyihao 已提交
737
  memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
dengyihao's avatar
dengyihao 已提交
738
  pHead->traceId = pMsg->info.traceId;
dengyihao's avatar
dengyihao 已提交
739 740

  uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
dengyihao's avatar
dengyihao 已提交
741 742

  STraceId* trace = &pMsg->info.traceId;
dengyihao's avatar
dengyihao 已提交
743
  tGTrace("%s conn %p %s is sent to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
dengyihao's avatar
dengyihao 已提交
744 745 746 747
          TMSG_INFO(pHead->msgType), taosInetNtoa(((struct sockaddr_in*)&pConn->addr)->sin_addr),
          ntohs(((struct sockaddr_in*)&pConn->addr)->sin_port),
          taosInetNtoa(((struct sockaddr_in*)&pConn->localAddr)->sin_addr),
          ntohs(((struct sockaddr_in*)&pConn->localAddr)->sin_port));
dengyihao's avatar
dengyihao 已提交
748

dengyihao's avatar
dengyihao 已提交
749 750 751
  if (pHead->persist == 1) {
    CONN_SET_PERSIST_BY_APP(pConn);
  }
dengyihao's avatar
dengyihao 已提交
752

dengyihao's avatar
dengyihao 已提交
753
  uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
dengyihao's avatar
dengyihao 已提交
754
  uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
U
ubuntu 已提交
755 756
  return;
_RETURN:
dengyihao's avatar
dengyihao 已提交
757
  return;
dengyihao's avatar
dengyihao 已提交
758
}
U
ubuntu 已提交
759 760

void cliConnCb(uv_connect_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
761
  // impl later
dengyihao's avatar
dengyihao 已提交
762 763
  SCliConn* pConn = req->data;
  if (status != 0) {
S
Shengliang Guan 已提交
764
    tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
U
ubuntu 已提交
765
    cliHandleExcept(pConn);
766
    return;
dengyihao's avatar
dengyihao 已提交
767
  }
dengyihao's avatar
dengyihao 已提交
768
  int addrlen = sizeof(pConn->addr);
dengyihao's avatar
dengyihao 已提交
769
  uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &pConn->addr, &addrlen);
dengyihao's avatar
dengyihao 已提交
770

dengyihao's avatar
dengyihao 已提交
771
  addrlen = sizeof(pConn->localAddr);
dengyihao's avatar
dengyihao 已提交
772
  uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &pConn->localAddr, &addrlen);
dengyihao's avatar
dengyihao 已提交
773

dengyihao's avatar
dengyihao 已提交
774
  tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
775
  assert(pConn->stream == req->handle);
dengyihao's avatar
dengyihao 已提交
776

U
ubuntu 已提交
777
  cliSend(pConn);
dengyihao's avatar
dengyihao 已提交
778 779
}

dengyihao's avatar
dengyihao 已提交
780
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
781 782 783 784 785
  if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) {
    pThrd->stopMsg = pMsg;
    return;
  }
  pThrd->stopMsg = NULL;
dengyihao's avatar
dengyihao 已提交
786
  pThrd->quit = true;
U
ubuntu 已提交
787
  tDebug("cli work thread %p start to quit", pThrd);
dengyihao's avatar
dengyihao 已提交
788
  destroyCmsg(pMsg);
dengyihao's avatar
fix bug  
dengyihao 已提交
789
  destroyConnPool(pThrd->pool);
dengyihao's avatar
dengyihao 已提交
790
  uv_timer_stop(&pThrd->timer);
dengyihao's avatar
dengyihao 已提交
791
  uv_walk(pThrd->loop, cliWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
792
}
dengyihao's avatar
dengyihao 已提交
793
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
794
  int64_t    refId = (int64_t)(pMsg->msg.info.handle);
dengyihao's avatar
dengyihao 已提交
795
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
796
  if (exh == NULL) {
dengyihao's avatar
dengyihao 已提交
797
    tDebug("%" PRId64 " already release", refId);
dengyihao's avatar
dengyihao 已提交
798 799
    destroyCmsg(pMsg);
    return;
dengyihao's avatar
dengyihao 已提交
800 801 802
  }

  SCliConn* conn = exh->handle;
dengyihao's avatar
dengyihao 已提交
803 804
  transReleaseExHandle(transGetRefMgt(), refId);

dengyihao's avatar
dengyihao 已提交
805
  tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
806

dengyihao's avatar
dengyihao 已提交
807 808
  if (T_REF_VAL_GET(conn) == 2) {
    transUnrefCliHandle(conn);
dengyihao's avatar
dengyihao 已提交
809 810
    if (!transQueuePush(&conn->cliMsgs, pMsg)) {
      return;
dengyihao's avatar
dengyihao 已提交
811 812
    }
    cliSend(conn);
dengyihao's avatar
dengyihao 已提交
813 814
  }
}
dengyihao's avatar
dengyihao 已提交
815
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
816
  STransConnCtx* pCtx = pMsg->ctx;
dengyihao's avatar
dengyihao 已提交
817
  pThrd->cvtAddr = pCtx->cvtAddr;
dengyihao's avatar
dengyihao 已提交
818 819
  destroyCmsg(pMsg);
}
U
ubuntu 已提交
820

dengyihao's avatar
dengyihao 已提交
821
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
dengyihao's avatar
dengyihao 已提交
822 823 824 825
  STransConnCtx* pCtx = pMsg->ctx;
  SCliConn*      conn = NULL;

  int64_t refId = (int64_t)(pMsg->msg.info.handle);
dengyihao's avatar
dengyihao 已提交
826
  if (refId != 0) {
dengyihao's avatar
dengyihao 已提交
827
    SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
828
    if (exh == NULL) {
dengyihao's avatar
dengyihao 已提交
829 830 831
      *ignore = true;
      destroyCmsg(pMsg);
      return NULL;
dengyihao's avatar
dengyihao 已提交
832 833
    } else {
      conn = exh->handle;
dengyihao's avatar
dengyihao 已提交
834 835
      if (conn == NULL) {
        conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
dengyihao's avatar
dengyihao 已提交
836
        if (conn != NULL) specifyConnRef(conn, true, refId);
dengyihao's avatar
dengyihao 已提交
837
      }
dengyihao's avatar
dengyihao 已提交
838
      transReleaseExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
839 840 841
    }
    return conn;
  };
dengyihao's avatar
dengyihao 已提交
842 843 844

  conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
  if (conn != NULL) {
dengyihao's avatar
dengyihao 已提交
845
    tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
dengyihao's avatar
dengyihao 已提交
846
  } else {
dengyihao's avatar
dengyihao 已提交
847
    tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
dengyihao's avatar
dengyihao 已提交
848
  }
dengyihao's avatar
dengyihao 已提交
849 850
  return conn;
}
dengyihao's avatar
dengyihao 已提交
851 852 853 854 855 856 857 858 859 860 861
void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
  if (pCvtAddr->cvt == false) {
    return;
  }
  for (int i = 0; i < pEpSet->numOfEps && pEpSet->numOfEps == 1; i++) {
    if (strncmp(pEpSet->eps[i].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) {
      memset(pEpSet->eps[i].fqdn, 0, TSDB_FQDN_LEN);
      memcpy(pEpSet->eps[i].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN);
    }
  }
}
dengyihao's avatar
dengyihao 已提交
862
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
863
  STransConnCtx* pCtx = pMsg->ctx;
dengyihao's avatar
formate  
dengyihao 已提交
864
  STrans*        pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
865

dengyihao's avatar
dengyihao 已提交
866
  cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
dengyihao's avatar
dengyihao 已提交
867 868 869 870 871
  if (!EPSET_IS_VALID(&pCtx->epSet)) {
    destroyCmsg(pMsg);
    tError("invalid epset");
    return;
  }
dengyihao's avatar
dengyihao 已提交
872

dengyihao's avatar
dengyihao 已提交
873 874 875
  bool      ignore = false;
  SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
  if (ignore == true) {
dengyihao's avatar
dengyihao 已提交
876
    tError("ignore msg");
dengyihao's avatar
dengyihao 已提交
877 878
    return;
  }
dengyihao's avatar
dengyihao 已提交
879

dengyihao's avatar
dengyihao 已提交
880
  if (conn != NULL) {
dengyihao's avatar
dengyihao 已提交
881
    transCtxMerge(&conn->ctx, &pCtx->appCtx);
dengyihao's avatar
dengyihao 已提交
882
    transQueuePush(&conn->cliMsgs, pMsg);
U
ubuntu 已提交
883
    cliSend(conn);
dengyihao's avatar
dengyihao 已提交
884
  } else {
U
ubuntu 已提交
885
    conn = cliCreateConn(pThrd);
dengyihao's avatar
dengyihao 已提交
886 887 888 889

    int64_t refId = (int64_t)pMsg->msg.info.handle;
    if (refId != 0) specifyConnRef(conn, true, refId);

dengyihao's avatar
dengyihao 已提交
890
    transCtxMerge(&conn->ctx, &pCtx->appCtx);
dengyihao's avatar
dengyihao 已提交
891
    transQueuePush(&conn->cliMsgs, pMsg);
dengyihao's avatar
dengyihao 已提交
892

dengyihao's avatar
dengyihao 已提交
893 894
    conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet));
    conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
dengyihao's avatar
dengyihao 已提交
895

dengyihao's avatar
dengyihao 已提交
896 897
    int ret = transSetConnOption((uv_tcp_t*)conn->stream);
    if (ret) {
dengyihao's avatar
dengyihao 已提交
898
      tError("%s conn %p failed to set conn option, errmsg %s", transLabel(pTransInst), conn, uv_err_name(ret));
dengyihao's avatar
dengyihao 已提交
899
    }
dengyihao's avatar
dengyihao 已提交
900
    int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT);
dengyihao's avatar
dengyihao 已提交
901
    if (fd == -1) {
dengyihao's avatar
dengyihao 已提交
902
      tTrace("%s conn %p failed to create socket", transLabel(pTransInst), conn);
dengyihao's avatar
dengyihao 已提交
903 904 905 906
      cliHandleExcept(conn);
      return;
    }
    uv_tcp_open((uv_tcp_t*)conn->stream, fd);
907

dengyihao's avatar
dengyihao 已提交
908
    struct sockaddr_in addr;
909 910 911
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip);
    addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
dengyihao's avatar
dengyihao 已提交
912
    tTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
dengyihao's avatar
dengyihao 已提交
913 914
    ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
    if (ret != 0) {
S
Shengliang Guan 已提交
915
      tTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port,
dengyihao's avatar
dengyihao 已提交
916 917 918 919
             uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
920 921
  }
}
U
ubuntu 已提交
922
static void cliAsyncCb(uv_async_t* handle) {
dengyihao's avatar
dengyihao 已提交
923 924 925
  SAsyncItem* item = handle->data;
  SCliThrd*   pThrd = item->pThrd;
  SCliMsg*    pMsg = NULL;
dengyihao's avatar
dengyihao 已提交
926

dengyihao's avatar
dengyihao 已提交
927
  // batch process to avoid to lock/unlock frequently
dengyihao's avatar
formate  
dengyihao 已提交
928
  queue wq;
wafwerar's avatar
wafwerar 已提交
929
  taosThreadMutexLock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
930
  QUEUE_MOVE(&item->qmsg, &wq);
wafwerar's avatar
wafwerar 已提交
931
  taosThreadMutexUnlock(&item->mtx);
dengyihao's avatar
dengyihao 已提交
932

dengyihao's avatar
dengyihao 已提交
933 934 935 936
  int count = 0;
  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* h = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
937 938

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
939 940
    if (pMsg == NULL) {
      continue;
dengyihao's avatar
dengyihao 已提交
941
    }
dengyihao's avatar
dengyihao 已提交
942
    (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
943
    count++;
dengyihao's avatar
dengyihao 已提交
944 945
  }
  if (count >= 2) {
S
Shengliang Guan 已提交
946
    tTrace("cli process batch size:%d", count);
dengyihao's avatar
dengyihao 已提交
947
  }
dengyihao's avatar
dengyihao 已提交
948
  if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
949 950
}

U
ubuntu 已提交
951
static void* cliWorkThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
952
  SCliThrd* pThrd = (SCliThrd*)arg;
dengyihao's avatar
dengyihao 已提交
953
  pThrd->pid = taosGetSelfPthreadId();
U
ubuntu 已提交
954
  setThreadName("trans-cli-work");
dengyihao's avatar
dengyihao 已提交
955
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
956
  return NULL;
dengyihao's avatar
dengyihao 已提交
957 958
}

U
ubuntu 已提交
959
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
960
  SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj));
dengyihao's avatar
dengyihao 已提交
961

dengyihao's avatar
dengyihao 已提交
962
  STrans* pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
963 964
  memcpy(cli->label, label, strlen(label));
  cli->numOfThreads = numOfThreads;
dengyihao's avatar
dengyihao 已提交
965
  cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*));
dengyihao's avatar
dengyihao 已提交
966 967

  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
968
    SCliThrd* pThrd = createThrdObj();
dengyihao's avatar
dengyihao 已提交
969
    pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
dengyihao's avatar
dengyihao 已提交
970
    pThrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
971

wafwerar's avatar
wafwerar 已提交
972
    int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
dengyihao's avatar
dengyihao 已提交
973
    if (err == 0) {
S
Shengliang Guan 已提交
974
      tDebug("success to create tranport-cli thread:%d", i);
dengyihao's avatar
dengyihao 已提交
975
    }
dengyihao's avatar
dengyihao 已提交
976
    cli->pThreadObj[i] = pThrd;
dengyihao's avatar
dengyihao 已提交
977
  }
dengyihao's avatar
dengyihao 已提交
978

dengyihao's avatar
dengyihao 已提交
979 980
  return cli;
}
dengyihao's avatar
dengyihao 已提交
981

U
ubuntu 已提交
982
static void destroyUserdata(STransMsg* userdata) {
dengyihao's avatar
dengyihao 已提交
983 984 985 986 987 988
  if (userdata->pCont == NULL) {
    return;
  }
  transFreeMsg(userdata->pCont);
  userdata->pCont = NULL;
}
dengyihao's avatar
dengyihao 已提交
989 990
static void destroyCmsg(void* arg) {
  SCliMsg* pMsg = arg;
dengyihao's avatar
dengyihao 已提交
991 992 993 994 995
  if (pMsg == NULL) {
    return;
  }
  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
wafwerar's avatar
wafwerar 已提交
996
  taosMemoryFree(pMsg);
dengyihao's avatar
dengyihao 已提交
997
}
dengyihao's avatar
dengyihao 已提交
998

dengyihao's avatar
dengyihao 已提交
999 1000
static SCliThrd* createThrdObj() {
  SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd));
U
ubuntu 已提交
1001

dengyihao's avatar
dengyihao 已提交
1002
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
1003
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
1004

wafwerar's avatar
wafwerar 已提交
1005
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
1006 1007
  uv_loop_init(pThrd->loop);

dengyihao's avatar
dengyihao 已提交
1008
  pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, cliAsyncCb);
dengyihao's avatar
dengyihao 已提交
1009 1010
  uv_timer_init(pThrd->loop, &pThrd->timer);
  pThrd->timer.data = pThrd;
dengyihao's avatar
dengyihao 已提交
1011

dengyihao's avatar
dengyihao 已提交
1012
  pThrd->pool = createConnPool(4);
dengyihao's avatar
dengyihao 已提交
1013
  transDQCreate(pThrd->loop, &pThrd->delayQueue);
dengyihao's avatar
dengyihao 已提交
1014

dengyihao's avatar
dengyihao 已提交
1015 1016
  transDQCreate(pThrd->loop, &pThrd->timeoutQueue);

dengyihao's avatar
dengyihao 已提交
1017
  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
1018 1019
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
1020
static void destroyThrdObj(SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1021 1022 1023
  if (pThrd == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1024

wafwerar's avatar
wafwerar 已提交
1025
  taosThreadJoin(pThrd->thread, NULL);
dengyihao's avatar
dengyihao 已提交
1026
  CLI_RELEASE_UV(pThrd->loop);
wafwerar's avatar
wafwerar 已提交
1027
  taosThreadMutexDestroy(&pThrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
1028
  TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
dengyihao's avatar
dengyihao 已提交
1029
  transAsyncPoolDestroy(pThrd->asyncPool);
dengyihao's avatar
dengyihao 已提交
1030

dengyihao's avatar
dengyihao 已提交
1031
  transDQDestroy(pThrd->delayQueue, destroyCmsg);
dengyihao's avatar
dengyihao 已提交
1032
  transDQDestroy(pThrd->timeoutQueue, NULL);
wafwerar's avatar
wafwerar 已提交
1033 1034
  taosMemoryFree(pThrd->loop);
  taosMemoryFree(pThrd);
dengyihao's avatar
dengyihao 已提交
1035
}
dengyihao's avatar
dengyihao 已提交
1036

dengyihao's avatar
dengyihao 已提交
1037
static void transDestroyConnCtx(STransConnCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
1038
  //
wafwerar's avatar
wafwerar 已提交
1039
  taosMemoryFree(ctx);
dengyihao's avatar
dengyihao 已提交
1040
}
dengyihao's avatar
dengyihao 已提交
1041

dengyihao's avatar
dengyihao 已提交
1042
void cliSendQuit(SCliThrd* thrd) {
dengyihao's avatar
dengyihao 已提交
1043
  // cli can stop gracefully
wafwerar's avatar
wafwerar 已提交
1044
  SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
1045
  msg->type = Quit;
dengyihao's avatar
dengyihao 已提交
1046
  transAsyncSend(thrd->asyncPool, &msg->q);
dengyihao's avatar
dengyihao 已提交
1047
  atomic_store_8(&thrd->asyncPool->stop, 1);
dengyihao's avatar
dengyihao 已提交
1048
}
dengyihao's avatar
dengyihao 已提交
1049 1050
void cliWalkCb(uv_handle_t* handle, void* arg) {
  if (!uv_is_closing(handle)) {
dengyihao's avatar
dengyihao 已提交
1051
    uv_read_stop((uv_stream_t*)handle);
dengyihao's avatar
dengyihao 已提交
1052
    uv_close(handle, cliDestroy);
dengyihao's avatar
dengyihao 已提交
1053 1054
  }
}
dengyihao's avatar
dengyihao 已提交
1055

U
ubuntu 已提交
1056
int cliRBChoseIdx(STrans* pTransInst) {
dengyihao's avatar
dengyihao 已提交
1057 1058 1059 1060
  int8_t index = pTransInst->index;
  if (pTransInst->numOfThreads == 0) {
    return -1;
  }
U
ubuntu 已提交
1061 1062 1063 1064 1065
  if (pTransInst->index++ >= pTransInst->numOfThreads) {
    pTransInst->index = 0;
  }
  return index % pTransInst->numOfThreads;
}
dengyihao's avatar
dengyihao 已提交
1066 1067
static void doDelayTask(void* param) {
  STaskArg* arg = param;
dengyihao's avatar
dengyihao 已提交
1068 1069
  SCliMsg*  pMsg = arg->param1;
  SCliThrd* pThrd = arg->param2;
dengyihao's avatar
dengyihao 已提交
1070 1071
  taosMemoryFree(arg);

dengyihao's avatar
dengyihao 已提交
1072
  cliHandleReq(pMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
1073
}
dengyihao's avatar
dengyihao 已提交
1074

dengyihao's avatar
dengyihao 已提交
1075 1076 1077 1078
static void doCloseIdleConn(void* param) {
  STaskArg* arg = param;
  SCliConn* conn = arg->param1;
  SCliThrd* pThrd = arg->param2;
dengyihao's avatar
dengyihao 已提交
1079 1080
  tTrace("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
  conn->task = NULL;
dengyihao's avatar
dengyihao 已提交
1081 1082
  cliDestroyConn(conn, true);
  taosMemoryFree(arg);
dengyihao's avatar
dengyihao 已提交
1083 1084
}

dengyihao's avatar
dengyihao 已提交
1085 1086 1087
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
  STransConnCtx* pCtx = pMsg->ctx;

dengyihao's avatar
dengyihao 已提交
1088 1089
  STraceId* trace = &pMsg->msg.info.traceId;
  char      tbuf[256] = {0};
dengyihao's avatar
dengyihao 已提交
1090
  EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
dengyihao's avatar
dengyihao 已提交
1091
  tGDebug("%s retry on next node, use %s, retryCnt:%d, limit:%d", transLabel(pThrd->pTransInst), tbuf,
dengyihao's avatar
dengyihao 已提交
1092
          pCtx->retryCnt + 1, pCtx->retryLimit);
dengyihao's avatar
dengyihao 已提交
1093 1094 1095 1096 1097

  STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
  arg->param1 = pMsg;
  arg->param2 = pThrd;
  transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
dengyihao's avatar
dengyihao 已提交
1098
}
dengyihao's avatar
dengyihao 已提交
1099

dengyihao's avatar
dengyihao 已提交
1100
void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
dengyihao's avatar
dengyihao 已提交
1101 1102 1103 1104
  if (*val != exp) {
    *val = newVal;
  }
}
dengyihao's avatar
dengyihao 已提交
1105

dengyihao's avatar
dengyihao 已提交
1106
bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
dengyihao's avatar
dengyihao 已提交
1107 1108 1109 1110 1111 1112
  if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
    return false;
  }
  // rebuild resp msg
  SEpSet epset;
  if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epset) < 0) {
dengyihao's avatar
dengyihao 已提交
1113 1114 1115 1116
    return false;
  }
  int32_t tlen = tSerializeSEpSet(NULL, 0, dst);

dengyihao's avatar
dengyihao 已提交
1117 1118 1119 1120 1121 1122 1123
  char*   buf = NULL;
  int32_t len = pResp->contLen - tlen;
  if (len != 0) {
    buf = rpcMallocCont(len);
    memcpy(buf, (char*)pResp->pCont + tlen, len);
  }
  rpcFreeCont(pResp->pCont);
dengyihao's avatar
dengyihao 已提交
1124 1125

  pResp->pCont = buf;
dengyihao's avatar
dengyihao 已提交
1126 1127 1128
  pResp->contLen = len;

  *dst = epset;
dengyihao's avatar
dengyihao 已提交
1129 1130
  return true;
}
dengyihao's avatar
dengyihao 已提交
1131
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
1132 1133
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1134

dengyihao's avatar
dengyihao 已提交
1135
  if (pMsg == NULL || pMsg->ctx == NULL) {
dengyihao's avatar
dengyihao 已提交
1136
    tTrace("%s conn %p handle resp", pTransInst->label, pConn);
dengyihao's avatar
dengyihao 已提交
1137 1138 1139
    pTransInst->cfp(pTransInst->parent, pResp, NULL);
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
1140
  /*
dengyihao's avatar
dengyihao 已提交
1141 1142 1143
   *  no retry
   *  1. query conn
   *  2. rpc thread already receive quit msg
dengyihao's avatar
dengyihao 已提交
1144
   */
dengyihao's avatar
dengyihao 已提交
1145 1146
  STransConnCtx* pCtx = pMsg->ctx;
  int32_t        code = pResp->code;
dengyihao's avatar
dengyihao 已提交
1147 1148

  bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false;
dengyihao's avatar
dengyihao 已提交
1149
  if (retry) {
dengyihao's avatar
dengyihao 已提交
1150 1151
    pMsg->sent = 0;
    pCtx->retryCnt += 1;
dengyihao's avatar
dengyihao 已提交
1152
    if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) {
dengyihao's avatar
dengyihao 已提交
1153
      cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3);
dengyihao's avatar
dengyihao 已提交
1154 1155 1156
      if (pCtx->retryCnt < pCtx->retryLimit) {
        transUnrefCliHandle(pConn);
        EPSET_FORWARD_INUSE(&pCtx->epSet);
dengyihao's avatar
dengyihao 已提交
1157
        transFreeMsg(pResp->pCont);
dengyihao's avatar
dengyihao 已提交
1158 1159 1160 1161
        cliSchedMsgToNextNode(pMsg, pThrd);
        return -1;
      }
    } else {
dengyihao's avatar
dengyihao 已提交
1162
      cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT);
dengyihao's avatar
dengyihao 已提交
1163 1164
      if (pCtx->retryCnt < pCtx->retryLimit) {
        if (pResp->contLen == 0) {
dengyihao's avatar
dengyihao 已提交
1165
          EPSET_FORWARD_INUSE(&pCtx->epSet);
dengyihao's avatar
dengyihao 已提交
1166
        } else {
dengyihao's avatar
dengyihao 已提交
1167 1168 1169
          if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet) < 0) {
            tError("%s conn %p failed to deserialize epset", CONN_GET_INST_LABEL(pConn));
          }
dengyihao's avatar
dengyihao 已提交
1170
        }
dengyihao's avatar
dengyihao 已提交
1171
        addConnToPool(pThrd->pool, pConn);
dengyihao's avatar
dengyihao 已提交
1172 1173 1174
        transFreeMsg(pResp->pCont);
        cliSchedMsgToNextNode(pMsg, pThrd);
        return -1;
dengyihao's avatar
dengyihao 已提交
1175
      }
dengyihao's avatar
dengyihao 已提交
1176 1177
    }
  }
dengyihao's avatar
dengyihao 已提交
1178

dengyihao's avatar
dengyihao 已提交
1179
  STraceId* trace = &pResp->info.traceId;
dengyihao's avatar
dengyihao 已提交
1180

dengyihao's avatar
dengyihao 已提交
1181
  bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
dengyihao's avatar
dengyihao 已提交
1182
  if (hasEpSet) {
dengyihao's avatar
dengyihao 已提交
1183 1184
    char tbuf[256] = {0};
    EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
dengyihao's avatar
dengyihao 已提交
1185
    tGDebug("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
1186
  }
dengyihao's avatar
dengyihao 已提交
1187

dengyihao's avatar
dengyihao 已提交
1188
  if (pCtx->pSem != NULL) {
dengyihao's avatar
dengyihao 已提交
1189
    tGDebug("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
1190
    if (pCtx->pRsp == NULL) {
dengyihao's avatar
dengyihao 已提交
1191
      tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
1192 1193 1194
    } else {
      memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
    }
dengyihao's avatar
dengyihao 已提交
1195
    tsem_post(pCtx->pSem);
1196
    pCtx->pRsp = NULL;
dengyihao's avatar
dengyihao 已提交
1197
  } else {
dengyihao's avatar
dengyihao 已提交
1198
    tGDebug("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
1199
    if (retry == false && hasEpSet == true) {
dengyihao's avatar
dengyihao 已提交
1200
      pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
dengyihao's avatar
dengyihao 已提交
1201 1202 1203 1204 1205 1206
    } else {
      if (!cliIsEpsetUpdated(code, pCtx)) {
        pTransInst->cfp(pTransInst->parent, pResp, NULL);
      } else {
        pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
      }
dengyihao's avatar
dengyihao 已提交
1207
    }
dengyihao's avatar
dengyihao 已提交
1208
  }
dengyihao's avatar
dengyihao 已提交
1209
  return 0;
dengyihao's avatar
dengyihao 已提交
1210
}
U
ubuntu 已提交
1211 1212

void transCloseClient(void* arg) {
U
ubuntu 已提交
1213
  SCliObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
1214
  for (int i = 0; i < cli->numOfThreads; i++) {
U
ubuntu 已提交
1215
    cliSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
1216
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
1217
  }
wafwerar's avatar
wafwerar 已提交
1218 1219
  taosMemoryFree(cli->pThreadObj);
  taosMemoryFree(cli);
dengyihao's avatar
dengyihao 已提交
1220
}
dengyihao's avatar
dengyihao 已提交
1221 1222 1223 1224 1225
void transRefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_INC((SCliConn*)handle);
dengyihao's avatar
dengyihao 已提交
1226
  tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
dengyihao's avatar
dengyihao 已提交
1227 1228 1229 1230 1231 1232 1233
  UNUSED(ref);
}
void transUnrefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SCliConn*)handle);
dengyihao's avatar
dengyihao 已提交
1234
  tTrace("%s conn %p ref:%d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
dengyihao's avatar
dengyihao 已提交
1235
  if (ref == 0) {
U
ubuntu 已提交
1236
    cliDestroyConn((SCliConn*)handle, true);
dengyihao's avatar
dengyihao 已提交
1237 1238
  }
}
dengyihao's avatar
dengyihao 已提交
1239
SCliThrd* transGetWorkThrdFromHandle(int64_t handle, bool* validHandle) {
dengyihao's avatar
dengyihao 已提交
1240
  SCliThrd*  pThrd = NULL;
dengyihao's avatar
dengyihao 已提交
1241
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
dengyihao's avatar
dengyihao 已提交
1242 1243 1244
  if (exh == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
1245 1246

  *validHandle = true;
dengyihao's avatar
dengyihao 已提交
1247
  pThrd = exh->pThrd;
dengyihao's avatar
dengyihao 已提交
1248
  transReleaseExHandle(transGetRefMgt(), handle);
dengyihao's avatar
dengyihao 已提交
1249 1250
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
1251
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle, bool* validHandle) {
dengyihao's avatar
dengyihao 已提交
1252
  if (handle == 0) {
dengyihao's avatar
dengyihao 已提交
1253
    int idx = cliRBChoseIdx(trans);
dengyihao's avatar
dengyihao 已提交
1254
    if (idx < 0) return NULL;
dengyihao's avatar
dengyihao 已提交
1255 1256
    return ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
  }
D
dapan1121 已提交
1257 1258 1259 1260 1261 1262 1263
  SCliThrd* pThrd = transGetWorkThrdFromHandle(handle, validHandle);
  if (*validHandle == true && pThrd == NULL) {
    int idx = cliRBChoseIdx(trans);
    if (idx < 0) return NULL;
    pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
  }
  return pThrd;
dengyihao's avatar
dengyihao 已提交
1264
}
dengyihao's avatar
dengyihao 已提交
1265
int transReleaseCliHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
1266 1267 1268 1269
  int  idx = -1;
  bool valid = false;

  SCliThrd* pThrd = transGetWorkThrdFromHandle((int64_t)handle, &valid);
dengyihao's avatar
dengyihao 已提交
1270
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
1271
    return -1;
dengyihao's avatar
dengyihao 已提交
1272
  }
dengyihao's avatar
dengyihao 已提交
1273

S
Shengliang Guan 已提交
1274
  STransMsg tmsg = {.info.handle = handle};
dengyihao's avatar
dengyihao 已提交
1275 1276 1277
  TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());

  SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
1278
  cmsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
1279
  cmsg->type = Release;
dengyihao's avatar
dengyihao 已提交
1280

dengyihao's avatar
dengyihao 已提交
1281 1282 1283
  STraceId* trace = &tmsg.info.traceId;
  tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);

dengyihao's avatar
dengyihao 已提交
1284 1285 1286
  if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
1287
  return 0;
dengyihao's avatar
dengyihao 已提交
1288
}
dengyihao's avatar
dengyihao 已提交
1289

dengyihao's avatar
dengyihao 已提交
1290
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
1291
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
1292 1293
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
dengyihao's avatar
dengyihao 已提交
1294
    return -1;
dengyihao's avatar
dengyihao 已提交
1295
  }
dengyihao's avatar
dengyihao 已提交
1296

dengyihao's avatar
dengyihao 已提交
1297 1298 1299
  bool      valid = false;
  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid);
  if (pThrd == NULL && valid == false) {
dengyihao's avatar
dengyihao 已提交
1300
    transFreeMsg(pReq->pCont);
dengyihao's avatar
dengyihao 已提交
1301
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
1302
    return -1;
dengyihao's avatar
dengyihao 已提交
1303 1304
  }

dengyihao's avatar
dengyihao 已提交
1305
  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
dengyihao's avatar
dengyihao 已提交
1306

wafwerar's avatar
wafwerar 已提交
1307
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
1308
  pCtx->epSet = *pEpSet;
S
Shengliang Guan 已提交
1309
  pCtx->ahandle = pReq->info.ahandle;
dengyihao's avatar
dengyihao 已提交
1310
  pCtx->msgType = pReq->msgType;
dengyihao's avatar
dengyihao 已提交
1311 1312 1313 1314

  if (ctx != NULL) {
    pCtx->appCtx = *ctx;
  }
dengyihao's avatar
dengyihao 已提交
1315
  assert(pTransInst->connType == TAOS_CONN_CLIENT);
dengyihao's avatar
dengyihao 已提交
1316

wafwerar's avatar
wafwerar 已提交
1317
  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
1318
  cliMsg->ctx = pCtx;
dengyihao's avatar
dengyihao 已提交
1319
  cliMsg->msg = *pReq;
dengyihao's avatar
dengyihao 已提交
1320
  cliMsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
1321
  cliMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
1322
  cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
1323

dengyihao's avatar
dengyihao 已提交
1324
  STraceId* trace = &pReq->info.traceId;
dengyihao's avatar
dengyihao 已提交
1325
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
dengyihao's avatar
dengyihao 已提交
1326
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
dengyihao's avatar
dengyihao 已提交
1327 1328
  if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) {
    destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
1329
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
1330 1331
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
1332
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
1333
  return 0;
dengyihao's avatar
dengyihao 已提交
1334
}
dengyihao's avatar
dengyihao 已提交
1335

dengyihao's avatar
dengyihao 已提交
1336
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
dengyihao's avatar
dengyihao 已提交
1337
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
1338 1339
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
dengyihao's avatar
dengyihao 已提交
1340
    return -1;
dengyihao's avatar
dengyihao 已提交
1341
  }
dengyihao's avatar
dengyihao 已提交
1342

dengyihao's avatar
dengyihao 已提交
1343 1344 1345
  bool      valid = false;
  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid);
  if (pThrd == NULL && valid == false) {
dengyihao's avatar
dengyihao 已提交
1346
    transFreeMsg(pReq->pCont);
dengyihao's avatar
dengyihao 已提交
1347
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
1348
    return -1;
dengyihao's avatar
dengyihao 已提交
1349
  }
dengyihao's avatar
dengyihao 已提交
1350

dengyihao's avatar
dengyihao 已提交
1351 1352
  tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
  tsem_init(sem, 0, 0);
dengyihao's avatar
dengyihao 已提交
1353

dengyihao's avatar
dengyihao 已提交
1354 1355
  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());

wafwerar's avatar
wafwerar 已提交
1356
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
1357
  pCtx->epSet = *pEpSet;
dengyihao's avatar
dengyihao 已提交
1358
  pCtx->origEpSet = *pEpSet;
S
Shengliang Guan 已提交
1359
  pCtx->ahandle = pReq->info.ahandle;
dengyihao's avatar
dengyihao 已提交
1360
  pCtx->msgType = pReq->msgType;
dengyihao's avatar
dengyihao 已提交
1361
  pCtx->pSem = sem;
dengyihao's avatar
dengyihao 已提交
1362 1363
  pCtx->pRsp = pRsp;

wafwerar's avatar
wafwerar 已提交
1364
  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
1365 1366 1367
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
1368
  cliMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
1369
  cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
1370

dengyihao's avatar
dengyihao 已提交
1371
  STraceId* trace = &pReq->info.traceId;
dengyihao's avatar
dengyihao 已提交
1372
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
dengyihao's avatar
dengyihao 已提交
1373
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
dengyihao's avatar
dengyihao 已提交
1374

dengyihao's avatar
dengyihao 已提交
1375 1376
  int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
  if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
1377
    destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
1378
    goto _RETURN;
dengyihao's avatar
dengyihao 已提交
1379
  }
dengyihao's avatar
dengyihao 已提交
1380
  tsem_wait(sem);
dengyihao's avatar
dengyihao 已提交
1381 1382

_RETURN:
dengyihao's avatar
dengyihao 已提交
1383 1384
  tsem_destroy(sem);
  taosMemoryFree(sem);
dengyihao's avatar
dengyihao 已提交
1385
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
1386
  return ret;
dengyihao's avatar
dengyihao 已提交
1387
}
dengyihao's avatar
dengyihao 已提交
1388 1389 1390
/*
 *
 **/
dengyihao's avatar
dengyihao 已提交
1391
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
dengyihao's avatar
dengyihao 已提交
1392
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
1393 1394 1395
  if (pTransInst == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
1396 1397 1398 1399 1400 1401 1402

  SCvtAddr cvtAddr = {0};
  if (ip != NULL && fqdn != NULL) {
    memcpy(cvtAddr.ip, ip, strlen(ip));
    memcpy(cvtAddr.fqdn, fqdn, strlen(fqdn));
    cvtAddr.cvt = true;
  }
dengyihao's avatar
dengyihao 已提交
1403 1404
  for (int i = 0; i < pTransInst->numOfThreads; i++) {
    STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
1405
    pCtx->cvtAddr = cvtAddr;
dengyihao's avatar
dengyihao 已提交
1406 1407 1408 1409

    SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
    cliMsg->ctx = pCtx;
    cliMsg->type = Update;
dengyihao's avatar
dengyihao 已提交
1410
    cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
1411

dengyihao's avatar
dengyihao 已提交
1412
    SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
S
Shengliang Guan 已提交
1413
    tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid);
dengyihao's avatar
dengyihao 已提交
1414

dengyihao's avatar
dengyihao 已提交
1415 1416
    if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) {
      destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
1417
      transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
1418 1419
      return -1;
    }
dengyihao's avatar
dengyihao 已提交
1420
  }
dengyihao's avatar
dengyihao 已提交
1421
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
1422
  return 0;
dengyihao's avatar
dengyihao 已提交
1423
}
dengyihao's avatar
dengyihao 已提交
1424 1425 1426 1427 1428 1429 1430

int64_t transAllocHandle() {
  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
  tDebug("pre alloc refId %" PRId64 "", exh->refId);
  return exh->refId;
}
dengyihao's avatar
dengyihao 已提交
1431
#endif