/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV
#include "transComm.h"

dengyihao's avatar
dengyihao 已提交
18
typedef struct SConnList {
dengyihao's avatar
dengyihao 已提交
19
  queue   conns;
dengyihao's avatar
dengyihao 已提交
20
  int32_t size;
dengyihao's avatar
dengyihao 已提交
21 22
} SConnList;

dengyihao's avatar
dengyihao 已提交
23 24 25 26 27 28 29 30 31
/*
 * Batch of outgoing requests.
 * NOTE(review): the field meanings below are inferred from names only; the
 * code that fills this struct is not part of this section — confirm.
 */
typedef struct {
  queue    wq;         // presumably the queue of messages belonging to the batch
  int32_t  wLen;       // presumably the number of entries in wq
  int32_t  batchSize;  // presumably the accumulated payload size — TODO confirm
  int32_t  batch;
  char*    dst;        // presumably the destination key/label
  char*    ip;
  uint16_t port;
} SCliBatch;
dengyihao's avatar
dengyihao 已提交
32
typedef struct SCliConn {
dengyihao's avatar
dengyihao 已提交
33
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
34 35
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
36
  queue        wreqQueue;
dengyihao's avatar
dengyihao 已提交
37 38

  uv_timer_t* timer;  // read timer, forbidden
dengyihao's avatar
dengyihao 已提交
39

dengyihao's avatar
dengyihao 已提交
40 41 42 43
  void* hostThrd;

  SConnBuffer readBuf;
  STransQueue cliMsgs;
dengyihao's avatar
dengyihao 已提交
44 45 46

  queue      q;
  SConnList* list;
dengyihao's avatar
dengyihao 已提交
47 48

  STransCtx  ctx;
dengyihao's avatar
dengyihao 已提交
49 50
  bool       broken;  // link broken or not
  ConnStatus status;  //
dengyihao's avatar
dengyihao 已提交
51

dengyihao's avatar
dengyihao 已提交
52 53
  SCliBatch* pBatch;

dengyihao's avatar
dengyihao 已提交
54 55
  int64_t refId;
  char*   ip;
dengyihao's avatar
dengyihao 已提交
56

dengyihao's avatar
dengyihao 已提交
57
  SDelayTask* task;
dengyihao's avatar
dengyihao 已提交
58

dengyihao's avatar
dengyihao 已提交
59
  // debug and log info
dengyihao's avatar
dengyihao 已提交
60 61 62
  char src[32];
  char dst[32];

dengyihao's avatar
dengyihao 已提交
63
} SCliConn;
dengyihao's avatar
dengyihao 已提交
64

dengyihao's avatar
dengyihao 已提交
65
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
66
  STransConnCtx* ctx;
dengyihao's avatar
formate  
dengyihao 已提交
67
  STransMsg      msg;
dengyihao's avatar
dengyihao 已提交
68
  queue          q;
dengyihao's avatar
dengyihao 已提交
69
  STransMsgType  type;
dengyihao's avatar
dengyihao 已提交
70

dengyihao's avatar
dengyihao 已提交
71
  int64_t  refId;
dengyihao's avatar
dengyihao 已提交
72 73
  uint64_t st;
  int      sent;  //(0: no send, 1: alread sent)
dengyihao's avatar
dengyihao 已提交
74 75
} SCliMsg;

dengyihao's avatar
dengyihao 已提交
76
typedef struct SCliThrd {
dengyihao's avatar
dengyihao 已提交
77 78 79 80 81 82
  TdThread      thread;  // tid
  int64_t       pid;     // pid
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  uv_prepare_t* prepare;
  void*         pool;  // conn pool
dengyihao's avatar
dengyihao 已提交
83
  // timer handles
dengyihao's avatar
dengyihao 已提交
84
  SArray* timerList;
dengyihao's avatar
dengyihao 已提交
85
  // msg queue
dengyihao's avatar
dengyihao 已提交
86
  queue         msg;
wafwerar's avatar
wafwerar 已提交
87
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
88
  SDelayQueue*  delayQueue;
dengyihao's avatar
dengyihao 已提交
89
  SDelayQueue*  timeoutQueue;
dengyihao's avatar
dengyihao 已提交
90 91
  uint64_t      nextTimeout;  // next timeout
  void*         pTransInst;   //
dengyihao's avatar
dengyihao 已提交
92

dengyihao's avatar
dengyihao 已提交
93
  int connCount;
dengyihao's avatar
dengyihao 已提交
94
  void (*destroyAhandleFp)(void* ahandle);
dengyihao's avatar
dengyihao 已提交
95 96
  SHashObj* fqdn2ipCache;
  SCvtAddr  cvtAddr;
dengyihao's avatar
dengyihao 已提交
97

dengyihao's avatar
dengyihao 已提交
98
  SHashObj* failFastCache;
dengyihao's avatar
dengyihao 已提交
99
  SHashObj* connLimitCache;
dengyihao's avatar
dengyihao 已提交
100
  SHashObj* batchCache;
dengyihao's avatar
dengyihao 已提交
101

dengyihao's avatar
dengyihao 已提交
102 103
  SCliMsg* stopMsg;

dengyihao's avatar
dengyihao 已提交
104
  bool quit;
dengyihao's avatar
dengyihao 已提交
105
} SCliThrd;
dengyihao's avatar
dengyihao 已提交
106

U
ubuntu 已提交
107
typedef struct SCliObj {
dengyihao's avatar
dengyihao 已提交
108 109 110 111
  char       label[TSDB_LABEL_LEN];
  int32_t    index;
  int        numOfThreads;
  SCliThrd** pThreadObj;
U
ubuntu 已提交
112
} SCliObj;
dengyihao's avatar
dengyihao 已提交
113

dengyihao's avatar
dengyihao 已提交
114 115 116 117 118 119 120
/*
 * Bookkeeping record for the fail-fast cache.
 * NOTE(review): the field semantics are not visible in this section; the
 * comments below are assumptions based on the names — confirm before relying
 * on them.
 */
typedef struct {
  int32_t reinit;
  int64_t timestamp;  // presumably the time of the first/last counted failure
  int32_t count;      // presumably the number of failures seen
  int32_t threshold;  // presumably the failure count that triggers fail-fast
  int64_t interval;   // presumably the time window for counting
} SFailFastItem;
dengyihao's avatar
dengyihao 已提交
121
// conn pool
dengyihao's avatar
dengyihao 已提交
122
// add expire timeout and capacity limit
dengyihao's avatar
dengyihao 已提交
123
static void*     createConnPool(int size);
124
static void*     destroyConnPool(void* pool);
dengyihao's avatar
dengyihao 已提交
125
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
dengyihao's avatar
dengyihao 已提交
126
static void      addConnToPool(void* pool, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
127
static void      doCloseIdleConn(void* param);
dengyihao's avatar
dengyihao 已提交
128

dengyihao's avatar
dengyihao 已提交
129 130
// register conn timer
static void cliConnTimeout(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
131 132
// register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
133
// register timer in each thread to clear expire conn
dengyihao's avatar
dengyihao 已提交
134
// static void cliTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
135
// alloc buffer for recv
dengyihao's avatar
dengyihao 已提交
136
static FORCE_INLINE void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
137
// callback after recv nbytes from socket
U
ubuntu 已提交
138
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
139
// callback after send data to socket
U
ubuntu 已提交
140
static void cliSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
141
// callback after conn to server
U
ubuntu 已提交
142 143
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
144
static void cliIdleCb(uv_idle_t* handle);
dengyihao's avatar
dengyihao 已提交
145
static void cliPrepareCb(uv_prepare_t* handle);
dengyihao's avatar
dengyihao 已提交
146

dengyihao's avatar
dengyihao 已提交
147 148
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd);
// static void cliConnBatchCb(uv_connect_t* req, int status);
dengyihao's avatar
dengyihao 已提交
149
static void cliSendBatchCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
150 151 152
// static void cliConnBatchCb(uv_connect_t* req, int status);
//  callback after conn to server
// static void cliConnBatchCb(uv_connect_t* req, int status);
dengyihao's avatar
dengyihao 已提交
153

dengyihao's avatar
dengyihao 已提交
154 155
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);

dengyihao's avatar
dengyihao 已提交
156
static int32_t allocConnRef(SCliConn* conn, bool update);
dengyihao's avatar
dengyihao 已提交
157

dengyihao's avatar
dengyihao 已提交
158
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
dengyihao's avatar
dengyihao 已提交
159

dengyihao's avatar
dengyihao 已提交
160
static SCliConn* cliCreateConn(SCliThrd* thrd);
U
ubuntu 已提交
161 162
static void      cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void      cliDestroy(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
163
static void      cliSend(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
164
static void      cliSendBatch(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
165
static void      cliDestroyConnMsgs(SCliConn* conn, bool destroy);
dengyihao's avatar
dengyihao 已提交
166

dengyihao's avatar
dengyihao 已提交
167
// cli util func
dengyihao's avatar
dengyihao 已提交
168 169
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
dengyihao's avatar
dengyihao 已提交
170

dengyihao's avatar
dengyihao 已提交
171
static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
dengyihao's avatar
dengyihao 已提交
172

dengyihao's avatar
dengyihao 已提交
173 174 175
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn);
static FORCE_INLINE void     cliUpdateFqdnCache(SHashObj* cache, char* fqdn);

dengyihao's avatar
dengyihao 已提交
176
// process data read from server, add decompress etc later
U
ubuntu 已提交
177
static void cliHandleResp(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
178
// handle except about conn
U
ubuntu 已提交
179
static void cliHandleExcept(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
180
static void cliReleaseUnfinishedMsg(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
181
static void cliHandleFastFail(SCliConn* pConn, int status);
dengyihao's avatar
dengyihao 已提交
182

183
// handle req from app
dengyihao's avatar
dengyihao 已提交
184 185 186 187 188 189
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
                                                                   cliHandleUpdate};
dengyihao's avatar
dengyihao 已提交
190 191
/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
/// NULL,cliHandleUpdate};
dengyihao's avatar
dengyihao 已提交
192

dengyihao's avatar
dengyihao 已提交
193 194
static FORCE_INLINE void destroyUserdata(STransMsg* userdata);
static FORCE_INLINE void destroyCmsg(void* cmsg);
dengyihao's avatar
dengyihao 已提交
195
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
dengyihao's avatar
dengyihao 已提交
196
static FORCE_INLINE int  cliRBChoseIdx(STrans* pTransInst);
dengyihao's avatar
dengyihao 已提交
197
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
198

dengyihao's avatar
dengyihao 已提交
199
// thread obj
dengyihao's avatar
dengyihao 已提交
200
static SCliThrd* createThrdObj(void* trans);
dengyihao's avatar
dengyihao 已提交
201
static void      destroyThrdObj(SCliThrd* pThrd);
dengyihao's avatar
dengyihao 已提交
202

dengyihao's avatar
dengyihao 已提交
203 204 205 206 207 208 209 210 211
static void cliWalkCb(uv_handle_t* handle, void* arg);

// Tear down a libuv loop: visit every handle with cliWalkCb, run the loop so
// pending callbacks can fire, then close the loop.
// NOTE(review): the expansion already ends with ';', so the macro is not safe
// as the sole body of an if/else without braces.
#define CLI_RELEASE_UV(loop)        \
  do {                              \
    uv_walk(loop, cliWalkCb, NULL); \
    uv_run(loop, UV_RUN_DEFAULT);   \
    uv_loop_close(loop);            \
  } while (0);

dengyihao's avatar
dengyihao 已提交
212 213 214 215 216 217
// Build the connection-pool hash key "ip:port" into `key`.
// `key` must be a char array (not a pointer) because the bound is sizeof(key).
// snprintf may cause performance problem
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port)          \
  do {                                                  \
    snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
  } while (0)

dengyihao's avatar
dengyihao 已提交
218 219
// Clamp a persist/idle time to a minimum of 90000 (unit not shown here, presumably ms — TODO confirm).
#define CONN_PERSIST_TIME(para)   ((para) <= 90000 ? 90000 : (para))
// Label of the transport instance that owns `conn` (conn -> hostThrd -> pTransInst -> label).
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
dengyihao's avatar
dengyihao 已提交
220

dengyihao's avatar
dengyihao 已提交
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
// Find the queued message whose ctx->ahandle equals `ahandle` and remove it
// from conn->cliMsgs.
// Contract: the caller must have a variable `SCliMsg* pMsg` in scope; it
// receives the removed message, or NULL when no entry matches.
// `ahandle` must be a uint64_t (it is printed with PRIu64).
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle)                         \
  do {                                                                    \
    int i = 0, sz = transQueueSize(&(conn)->cliMsgs);                     \
    for (; i < sz; i++) {                                                 \
      pMsg = transQueueGet(&(conn)->cliMsgs, i);                          \
      if (pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \
        break;                                                            \
      }                                                                   \
    }                                                                     \
    if (i == sz) {                                                        \
      pMsg = NULL;                                                        \
      tDebug("msg not found, %" PRIu64 "", ahandle);                      \
    } else {                                                              \
      pMsg = transQueueRm(&(conn)->cliMsgs, i);                           \
      tDebug("msg found, %" PRIu64 "", ahandle);                          \
    }                                                                     \
  } while (0)
dengyihao's avatar
dengyihao 已提交
238

U
ubuntu 已提交
239 240 241 242 243 244 245 246 247 248 249 250 251
// Scan conn->cliMsgs from index 0 for the first message with sent == 0.
// Contract: the caller must have `SCliMsg* pCliMsg` in scope and must define a
// `_RETURN:` label; when no unsent message exists the macro jumps to it, so
// after the macro falls through pCliMsg is always non-NULL.
#define CONN_GET_NEXT_SENDMSG(conn)                 \
  do {                                              \
    int i = 0;                                      \
    do {                                            \
      pCliMsg = transQueueGet(&conn->cliMsgs, i++); \
      if (pCliMsg && 0 == pCliMsg->sent) {          \
        break;                                      \
      }                                             \
    } while (pCliMsg != NULL);                      \
    if (pCliMsg == NULL) {                          \
      goto _RETURN;                                 \
    }                                               \
  } while (0)
dengyihao's avatar
dengyihao 已提交
252

dengyihao's avatar
formate  
dengyihao 已提交
253 254
// Mark a connection as held by the application: a conn in ConnNormal state
// becomes ConnAcquire and gains one extra reference. Other states are left
// untouched.
#define CONN_SET_PERSIST_BY_APP(conn)   \
  do {                                  \
    if ((conn)->status == ConnNormal) { \
      (conn)->status = ConnAcquire;     \
      transRefCliHandle(conn);          \
    }                                   \
  } while (0)
260

dengyihao's avatar
dengyihao 已提交
261 262 263 264
// True when the conn is not held by the application: status is ConnNormal or
// ConnInPool and its reference count is exactly 1.
#define CONN_NO_PERSIST_BY_APP(conn) \
  (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
// True when status is ConnRelease or ConnInPool and the reference count is exactly 1.
#define CONN_RELEASE_BY_SERVER(conn) \
  (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
265

S
Shengliang Guan 已提交
266 267
// Predicates on a request. `msg` is a pointer to an object with an `info`
// member (STransMsg in this file); `cmsg` is a pointer to an SCliMsg.
#define REQUEST_NO_RESP(msg)         ((msg)->info.noResp == 1)         // request expects no response
#define REQUEST_PERSIS_HANDLE(msg)   ((msg)->info.persistHandle == 1)  // request asks to keep its handle
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)         // message is a handle-release command
dengyihao's avatar
dengyihao 已提交
269

dengyihao's avatar
dengyihao 已提交
270
// Helpers for an endpoint set (a pointer to an object with numOfEps, inUse and
// eps[] members).
// NOTE(review): EPSET_IS_VALID accepts numOfEps == 0, in which case
// EPSET_GET_INUSE_IP/PORT would read an entry that is not populated — confirm
// this is intended.
#define EPSET_IS_VALID(epSet)       ((epSet) != NULL && (epSet)->numOfEps >= 0 && (epSet)->inUse >= 0)
#define EPSET_GET_SIZE(epSet)       ((epSet)->numOfEps)
#define EPSET_GET_INUSE_IP(epSet)   ((epSet)->eps[(epSet)->inUse].fqdn)
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
// Advance inUse to the next endpoint, wrapping around; no-op for an empty set
// (which also avoids a modulo by zero).
#define EPSET_FORWARD_INUSE(epSet)                             \
  do {                                                         \
    if ((epSet)->numOfEps != 0) {                              \
      ++((epSet)->inUse);                                      \
      (epSet)->inUse = ((epSet)->inUse) % ((epSet)->numOfEps); \
    }                                                          \
  } while (0)
dengyihao's avatar
dengyihao 已提交
281

dengyihao's avatar
dengyihao 已提交
282 283 284 285 286 287 288 289 290 291 292
// Format an endpoint set into `tbuf` as "epset:{0. host:port, 1. host:port}, inUse:N".
// `tbuf` must be a char array (the bound is sizeof(tbuf)).
// snprintf returns the length it *wanted* to write, so once the output is
// truncated `len` would exceed the buffer; formatting stops at that point
// instead of writing past the end. The result is always NUL-terminated.
// The expansion keeps its historical trailing ';' so existing call sites
// compile unchanged.
#define EPSET_DEBUG_STR(epSet, tbuf)                                                                        \
  do {                                                                                                      \
    int dbgCap = (int)sizeof(tbuf);                                                                         \
    int len = snprintf(tbuf, sizeof(tbuf), "epset:{");                                                      \
    for (int i = 0; i < (epSet)->numOfEps && len >= 0 && len < dbgCap; i++) {                               \
      if (i == (epSet)->numOfEps - 1) {                                                                     \
        len += snprintf(tbuf + len, dbgCap - len, "%d. %s:%d", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port);   \
      } else {                                                                                              \
        len += snprintf(tbuf + len, dbgCap - len, "%d. %s:%d, ", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
      }                                                                                                     \
    }                                                                                                       \
    if (len >= 0 && len < dbgCap) {                                                                         \
      len += snprintf(tbuf + len, dbgCap - len, "}, inUse:%d", (epSet)->inUse);                             \
    }                                                                                                       \
  } while (0);
dengyihao's avatar
dengyihao 已提交
294

U
ubuntu 已提交
295
// Thread entry point for a client worker; defined later in the file.
static void* cliWorkThread(void* arg);
dengyihao's avatar
dengyihao 已提交
296

dengyihao's avatar
dengyihao 已提交
297 298 299 300 301 302 303 304
/*
 * Drop every message still queued on `conn`.
 *
 * For each queued message the application's ahandle (if any) is released
 * first: through the connection context's freeFunc when one is set, otherwise
 * through the thread-level destroyAhandleFp. The message itself is then
 * destroyed. Finally the queue is cleared and conn->ctx is zeroed.
 */
static void cliReleaseUnfinishedMsg(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;

  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    SCliMsg* msg = transQueueGet(&conn->cliMsgs, i);
    // 0x9527 is a sentinel ahandle that must not be freed.
    // NOTE(review): its origin is not visible in this section — confirm.
    if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) {
      if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
        conn->ctx.freeFunc(msg->ctx->ahandle);
      } else if (msg->ctx->ahandle != NULL && pThrd->destroyAhandleFp != NULL) {
        tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle);
        pThrd->destroyAhandleFp(msg->ctx->ahandle);
      }
    }
    destroyCmsg(msg);
  }
  transQueueClear(&conn->cliMsgs);
  memset(&conn->ctx, 0, sizeof(conn->ctx));
}
dengyihao's avatar
dengyihao 已提交
315
/*
 * Send the next queued-but-unsent message on `conn`, if there is one.
 *
 * Returns true when a send was started, false when the queue is empty or
 * every queued message has already been sent.
 */
bool cliMaySendCachedMsg(SCliConn* conn) {
  if (!transQueueEmpty(&conn->cliMsgs)) {
    SCliMsg* pCliMsg = NULL;
    // Jumps to _RETURN when no unsent message exists, so pCliMsg is non-NULL below.
    CONN_GET_NEXT_SENDMSG(conn);
    cliSend(conn);
    return true;
  }
_RETURN:
  return false;
}
dengyihao's avatar
formate  
dengyihao 已提交
330
/*
 * Process one complete response that has been read into conn->readBuf.
 *
 * Steps:
 *   1. stop and recycle the read timer,
 *   2. extract (and decompress) the packet, convert header fields to host order,
 *   3. match the response to the request that produced it,
 *   4. deliver it to the application through cliAppCb,
 *   5. send the next cached request, return the conn to the pool, or resume reading.
 */
void cliHandleResp(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  // The timer handle is returned to the per-thread free list for reuse.
  if (conn->timer) {
    if (uv_is_active((uv_handle_t*)conn->timer)) {
      tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
      uv_timer_stop(conn->timer);
    }
    taosArrayPush(pThrd->timerList, &conn->timer);
    conn->timer->data = NULL;
    conn->timer = NULL;
  }

  STransMsgHead* pHead = NULL;

  int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
  if (msgLen <= 0) {
    tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
    return;
  }

  // NOTE(review): a decompress failure is only logged; processing continues
  // with whatever pHead points to — confirm this is intended.
  if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
    tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
  }
  // Network -> host byte order (htonl and ntohl perform the same swap).
  pHead->code = htonl(pHead->code);
  pHead->msgLen = htonl(pHead->msgLen);
  if (cliRecvReleaseReq(conn, pHead)) {
    return;
  }

  STransMsg transMsg = {0};
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = transContFromHead((char*)pHead);
  transMsg.code = pHead->code;
  transMsg.msgType = pHead->msgType;
  transMsg.info.ahandle = NULL;
  transMsg.info.traceId = pHead->traceId;
  transMsg.info.hasEpSet = pHead->hasEpSet;

  SCliMsg*       pMsg = NULL;
  STransConnCtx* pCtx = NULL;
  if (CONN_NO_PERSIST_BY_APP(conn)) {
    // Non-persistent conn: the response belongs to the request at the queue head.
    pMsg = transQueuePop(&conn->cliMsgs);

    pCtx = pMsg ? pMsg->ctx : NULL;
    transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
    tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
  } else {
    // Persistent conn: look the request up by the ahandle echoed in the header.
    uint64_t ahandle = (uint64_t)pHead->ahandle;
    CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
    if (pMsg == NULL) {
      transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
      tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
             transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
      if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
        transMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
        tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
               transMsg.info.ahandle);
      }
    } else {
      pCtx = pMsg->ctx;
      transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
      tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
    }
  }
  // the buffer's memory has already been handed over to transMsg.pCont
  if (!CONN_NO_PERSIST_BY_APP(conn)) {
    transMsg.info.handle = (void*)conn->refId;
    tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
  }

  // `trace` is kept as a named local; presumably the tGDebug macro refers to it.
  STraceId* trace = &transMsg.info.traceId;
  tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
          TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));

  // Nobody is waiting for this response: free the payload and ignore it.
  if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
    transFreeMsg(transMsg.pCont);
    return;
  }
  if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
    transFreeMsg(transMsg.pCont);
    return;
  }

  // Release commands are internal and are not reported to the application.
  if (pMsg == NULL || pMsg->type != Release) {
    if (cliAppCb(conn, &transMsg, pMsg) != 0) {
      return;
    }
  }
  destroyCmsg(pMsg);

  if (cliMaySendCachedMsg(conn) == true) {
    return;
  }

  if (CONN_NO_PERSIST_BY_APP(conn)) {
    addConnToPool(pThrd->pool, conn);
    return;
  }

  uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
U
ubuntu 已提交
435

dengyihao's avatar
dengyihao 已提交
436
/*
 * Fail every request queued on `pConn` and drop one reference on the conn.
 *
 * code: error reported to the application. -1 means "derive it from the conn":
 *       TSDB_CODE_RPC_BROKEN_LINK when pConn->broken is set, otherwise
 *       TSDB_CODE_RPC_NETWORK_UNAVAIL.
 *
 * The loop body runs at least once even when the queue is empty, so that a
 * conn held by the application still gets an error response built from the
 * connection context. `once` guards against repeating that empty pass.
 */
void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
  if (transQueueEmpty(&pConn->cliMsgs)) {
    if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
      tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
      transUnrefCliHandle(pConn);
      return;
    }
  }
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
  bool      once = false;
  do {
    SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);

    if (pMsg == NULL && once) {
      break;
    }

    // NOTE(review): a no-response request ends the whole loop (break, not
    // continue); any messages still queued are left to the conn teardown
    // below — confirm this is intended.
    if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg)) {
      destroyCmsg(pMsg);
      break;
    }

    STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;

    STransMsg transMsg = {0};
    transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code;
    // Reported type is the request's type + 1 (presumably the matching response type — TODO confirm).
    transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
    transMsg.info.ahandle = NULL;

    if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
      // No queued request but the app holds the conn: rebuild an ahandle from the conn context.
      transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
      tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
             TMSG_INFO(transMsg.msgType));
      if (transMsg.info.ahandle == NULL) {
        int32_t msgType = 0;
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, &msgType);
        transMsg.msgType = msgType;
        tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
               transMsg.info.ahandle);
      }
    } else {
      transMsg.info.ahandle = (pMsg != NULL && pMsg->type != Release && pCtx) ? pCtx->ahandle : NULL;
    }

    // Nothing to report for this entry: no synchronous waiter and no ahandle.
    if (pCtx == NULL || pCtx->pSem == NULL) {
      if (transMsg.info.ahandle == NULL) {
        if (pMsg == NULL || REQUEST_NO_RESP(&pMsg->msg) || pMsg->type == Release) {
          destroyCmsg(pMsg);
          once = true;
          continue;
        }
      }
    }

    // Release commands are internal and are not reported to the application.
    if (pMsg == NULL || pMsg->type != Release) {
      if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
        return;
      }
    }
    destroyCmsg(pMsg);
    tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));
  } while (!transQueueEmpty(&pConn->cliMsgs));
  transUnrefCliHandle(pConn);
}
dengyihao's avatar
dengyihao 已提交
501
/*
 * Handle a failed connection, letting cliHandleExceptImpl pick the error code
 * from the connection state (code == -1).
 */
void cliHandleExcept(SCliConn* conn) {
  tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
  cliHandleExceptImpl(conn, -1);
}
dengyihao's avatar
dengyihao 已提交
505

dengyihao's avatar
dengyihao 已提交
506 507
/*
 * Timer callback fired when establishing a connection takes too long.
 * The timer is stopped, detached from the conn and returned to the thread's
 * timer list; the attempt is then failed with UV_ECANCELED.
 */
void cliConnTimeout(uv_timer_t* handle) {
  SCliConn* conn = handle->data;
  SCliThrd* pThrd = conn->hostThrd;

  tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));

  uv_timer_stop(handle);
  handle->data = NULL;
  // conn->timer is pushed (rather than `handle`); presumably both refer to the same timer — TODO confirm.
  taosArrayPush(pThrd->timerList, &conn->timer);
  conn->timer = NULL;

  cliHandleFastFail(conn, UV_ECANCELED);
}
dengyihao's avatar
dengyihao 已提交
519 520 521 522
/*
 * Timer callback fired when no response arrived in time: stop reading from the
 * stream and fail the pending requests with TSDB_CODE_RPC_TIMEOUT.
 */
void cliReadTimeoutCb(uv_timer_t* handle) {
  // set up timeout cb
  SCliConn* conn = handle->data;
  tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
  uv_read_stop(conn->stream);
  cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT);
}
U
ubuntu 已提交
526 527

void* createConnPool(int size) {
528 529
  // thread local, no lock
  return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
530
}
U
ubuntu 已提交
531
void* destroyConnPool(void* pool) {
dengyihao's avatar
dengyihao 已提交
532
  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
dengyihao's avatar
dengyihao 已提交
533
  while (connList != NULL) {
dengyihao's avatar
dengyihao 已提交
534 535
    while (!QUEUE_IS_EMPTY(&connList->conns)) {
      queue*    h = QUEUE_HEAD(&connList->conns);
dengyihao's avatar
dengyihao 已提交
536
      SCliConn* c = QUEUE_DATA(h, SCliConn, q);
U
ubuntu 已提交
537
      cliDestroyConn(c, true);
dengyihao's avatar
dengyihao 已提交
538
    }
dengyihao's avatar
dengyihao 已提交
539
    connList = taosHashIterate((SHashObj*)pool, connList);
dengyihao's avatar
dengyihao 已提交
540
  }
dengyihao's avatar
dengyihao 已提交
541
  taosHashCleanup(pool);
542
  return NULL;
dengyihao's avatar
dengyihao 已提交
543 544
}

dengyihao's avatar
dengyihao 已提交
545
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
dengyihao's avatar
TS-2015  
dengyihao 已提交
546
  char key[TSDB_FQDN_LEN + 64] = {0};
dengyihao's avatar
dengyihao 已提交
547
  CONN_CONSTRUCT_HASH_KEY(key, ip, port);
dengyihao's avatar
dengyihao 已提交
548 549

  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
550
  if (plist == NULL) {
dengyihao's avatar
dengyihao 已提交
551
    SConnList list = {0};
dengyihao's avatar
dengyihao 已提交
552 553
    taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
    plist = taosHashGet((SHashObj*)pool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
554
    if (plist == NULL) return NULL;
dengyihao's avatar
dengyihao 已提交
555
    QUEUE_INIT(&plist->conns);
dengyihao's avatar
dengyihao 已提交
556 557
  }

dengyihao's avatar
dengyihao 已提交
558
  if (QUEUE_IS_EMPTY(&plist->conns)) {
dengyihao's avatar
dengyihao 已提交
559 560
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
561 562

  plist->size -= 1;
dengyihao's avatar
dengyihao 已提交
563
  queue*    h = QUEUE_HEAD(&plist->conns);
dengyihao's avatar
dengyihao 已提交
564
  SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
dengyihao's avatar
dengyihao 已提交
565
  conn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
566 567
  QUEUE_REMOVE(&conn->q);
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
568

dengyihao's avatar
dengyihao 已提交
569 570 571 572
  if (conn->task != NULL) {
    transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
    conn->task = NULL;
  }
dengyihao's avatar
dengyihao 已提交
573
  return conn;
dengyihao's avatar
dengyihao 已提交
574
}
dengyihao's avatar
dengyihao 已提交
575 576 577 578 579 580
// Park a connection in the pool so it can be reused.
// Does nothing if the connection is already marked ConnInPool. Otherwise it
// gets a fresh ref id, its read timer is returned to the thread's timer list,
// one extra reference is dropped, its queued messages are cleared, and it is
// linked into the bucket for conn->ip. Once the bucket holds 250 or more
// connections an idle-close task is scheduled for the connection.
static void addConnToPool(void* pool, SCliConn* conn) {
  if (conn->status == ConnInPool) {
    return;
  }
  allocConnRef(conn, true);

  SCliThrd* thrd = conn->hostThrd;
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
    taosArrayPush(thrd->timerList, &conn->timer);
    conn->timer->data = NULL;
    conn->timer = NULL;
  }
  if (T_REF_VAL_GET(conn) > 1) {
    transUnrefCliHandle(conn);
  }

  cliDestroyConnMsgs(conn, false);

  conn->status = ConnInPool;

  tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
  if (conn->list == NULL) {
    // NOTE(review): assumes the bucket for conn->ip already exists; a NULL result here is not handled
    conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
  }
  QUEUE_PUSH(&conn->list->conns, &conn->q);
  conn->list->size += 1;

  if (conn->list->size >= 250) {
    STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
    if (arg == NULL) {
      // Out of memory: keep the connection pooled but skip the idle-close scheduling
      tError("%s conn %p failed to schedule idle close, out of memory", CONN_GET_INST_LABEL(conn), conn);
      return;
    }
    arg->param1 = conn;
    arg->param2 = thrd;

    STrans* pTransInst = thrd->pTransInst;
    conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
  }
}
dengyihao's avatar
dengyihao 已提交
614
// Give the connection a new external handle (ref id).
// When `update` is true the current ref id is released and removed first.
// Returns 0 on success and -1 when the handle cannot be allocated or
// registered; in both failure cases conn->refId is left at -1.
static int32_t allocConnRef(SCliConn* conn, bool update) {
  if (update) {
    transReleaseExHandle(transGetRefMgt(), conn->refId);
    transRemoveExHandle(transGetRefMgt(), conn->refId);
    conn->refId = -1;
  }

  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  if (exh == NULL) {
    // the original dereferenced the NULL result here
    conn->refId = -1;
    return -1;
  }
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
  conn->refId = exh->refId;

  if (conn->refId == -1) {
    // registration failed, so the handle is still owned by this function
    taosMemoryFree(exh);
    return -1;
  }
  return 0;
}

// Bind the connection to an existing external handle identified by `handle`.
// When `update` is true the connection's current ref id is released and
// removed first. Returns -1 if the handle cannot be acquired, 0 otherwise.
static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
  if (update) {
    transReleaseExHandle(transGetRefMgt(), conn->refId);
    transRemoveExHandle(transGetRefMgt(), conn->refId);
    conn->refId = -1;
  }

  SExHandle* pExh = transAcquireExHandle(transGetRefMgt(), handle);
  if (pExh == NULL) {
    return -1;
  }

  pExh->handle = conn;
  pExh->pThrd = conn->hostThrd;
  conn->refId = pExh->refId;

  // balance the acquire above
  transReleaseExHandle(transGetRefMgt(), handle);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
649

dengyihao's avatar
dengyihao 已提交
650
// libuv allocation callback: the read buffer comes from the connection's own
// SConnBuffer; suggested_size is not used.
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SCliConn* pConn = handle->data;

  tDebug("%s conn %p alloc read buf", CONN_GET_INST_LABEL(pConn), pConn);
  transAllocBuffer(&pConn->readBuf, buf);
}
U
ubuntu 已提交
656
// libuv read callback.
//   nread > 0 : the bytes are accounted for and every complete message in the
//               buffer is dispatched; an invalid buffer fails the connection.
//   nread == 0: nothing to do.
//   nread < 0 : the connection is marked broken and failed.
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
  SCliConn* conn = handle->data;
  if (conn == NULL) {
    return;
  }

  if (nread == 0) {
    // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
    // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
    // read(2).
    tTrace("%s conn %p read empty", CONN_GET_INST_LABEL(conn), conn);
    return;
  }

  if (nread < 0) {
    tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), T_REF_VAL_GET(conn));
    conn->broken = true;
    cliHandleExcept(conn);
    return;
  }

  SConnBuffer* pBuf = &conn->readBuf;
  pBuf->len += nread;
  while (transReadComplete(pBuf)) {
    tDebug("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
    if (pBuf->invalid) {
      cliHandleExcept(conn);
      break;
    }
    cliHandleResp(conn);
  }
}
dengyihao's avatar
dengyihao 已提交
690

dengyihao's avatar
dengyihao 已提交
691
// Allocate and initialise a client connection on the given worker thread.
// It gets a TCP stream handle on the thread's loop, a timer (reused from the
// thread's timer list when one is available), empty message and write-request
// queues, a read buffer, one reference and an external ref id. The thread's
// connection counter is incremented.
static SCliConn* cliCreateConn(SCliThrd* pThrd) {
  SCliConn* pConn = taosMemoryCalloc(1, sizeof(SCliConn));

  // read/write stream handle
  pConn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
  uv_tcp_init(pThrd->loop, (uv_tcp_t*)(pConn->stream));
  pConn->stream->data = pConn;

  // prefer a recycled timer; create a new one only when the list is empty
  uv_timer_t* tmr = NULL;
  if (taosArrayGetSize(pThrd->timerList) > 0) {
    tmr = *(uv_timer_t**)taosArrayPop(pThrd->timerList);
  }
  if (tmr == NULL) {
    tmr = taosMemoryCalloc(1, sizeof(uv_timer_t));
    tDebug("no available timer, create a timer %p", tmr);
    uv_timer_init(pThrd->loop, tmr);
  }
  tmr->data = pConn;
  pConn->timer = tmr;

  pConn->connReq.data = pConn;
  transReqQueueInit(&pConn->wreqQueue);
  transQueueInit(&pConn->cliMsgs, NULL);
  transInitBuffer(&pConn->readBuf);
  QUEUE_INIT(&pConn->q);

  pConn->hostThrd = pThrd;
  pConn->status = ConnNormal;
  pConn->broken = 0;
  transRefCliHandle(pConn);

  atomic_add_fetch_32(&pThrd->connCount, 1);
  allocConnRef(pConn, false);

  return pConn;
}
U
ubuntu 已提交
724
// Detach a connection from the pool and from its bookkeeping: the queue link,
// the external ref id, any pending idle-close task and the timer are released.
// When `clear` is true and the stream is not already closing, reading is
// stopped and the stream is closed with cliDestroy as the close callback.
static void cliDestroyConn(SCliConn* conn, bool clear) {
  SCliThrd* pThrd = conn->hostThrd;
  tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);

  QUEUE_REMOVE(&conn->q);
  QUEUE_INIT(&conn->q);

  transReleaseExHandle(transGetRefMgt(), conn->refId);
  transRemoveExHandle(transGetRefMgt(), conn->refId);
  conn->refId = -1;

  if (conn->task != NULL) {
    transDQCancel(pThrd->timeoutQueue, conn->task);
    conn->task = NULL;
  }

  uv_timer_t* tmr = conn->timer;
  if (tmr != NULL) {
    uv_timer_stop(tmr);
    tmr->data = NULL;
    taosArrayPush(pThrd->timerList, &conn->timer);
    conn->timer = NULL;
  }

  if (!clear) {
    return;
  }
  uv_handle_t* hnd = (uv_handle_t*)conn->stream;
  if (!uv_is_closing(hnd)) {
    uv_read_stop(conn->stream);
    uv_close(hnd, cliDestroy);
  }
}
U
ubuntu 已提交
751
// uv_close callback for a connection's TCP handle: releases everything the
// connection owns and frees the connection itself. Handles that are not
// UV_TCP or carry no connection are ignored.
static void cliDestroy(uv_handle_t* handle) {
  if (uv_handle_get_type(handle) != UV_TCP) {
    return;
  }
  if (handle->data == NULL) {
    return;
  }

  SCliConn* pConn = handle->data;
  SCliThrd* pThrd = pConn->hostThrd;

  if (pConn->timer != NULL) {
    uv_timer_stop(pConn->timer);
    taosArrayPush(pThrd->timerList, &pConn->timer);
    pConn->timer->data = NULL;
    pConn->timer = NULL;
  }

  // decrement the per-peer connection counter kept in connLimitCache
  size_t   ipLen = strlen(pConn->ip);
  int32_t* pOld = taosHashGet(pThrd->connLimitCache, pConn->ip, ipLen);
  int32_t  cnt = 0;
  if (pOld != NULL) {
    cnt = (*pOld) - 1;
  }
  taosHashPut(pThrd->connLimitCache, pConn->ip, ipLen, &cnt, sizeof(cnt));

  atomic_sub_fetch_32(&pThrd->connCount, 1);

  transReleaseExHandle(transGetRefMgt(), pConn->refId);
  transRemoveExHandle(transGetRefMgt(), pConn->refId);
  taosMemoryFree(pConn->ip);
  taosMemoryFree(pConn->stream);

  cliDestroyConnMsgs(pConn, true);

  tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(pConn), pConn);
  transReqQueueClear(&pConn->wreqQueue);
  transDestroyBuffer(&pConn->readBuf);

  taosMemoryFree(pConn);
}
dengyihao's avatar
dengyihao 已提交
782
// Handle a request at the head of the queue that expects no response.
// If the head message is a no-response request it is popped and destroyed.
// Returns true only when another cached message was then sent on this
// connection; otherwise the connection goes back to the pool and false is
// returned. Also returns false when the queue is empty or the head message
// expects a response.
static bool cliHandleNoResp(SCliConn* conn) {
  if (transQueueEmpty(&conn->cliMsgs)) {
    return false;
  }

  SCliMsg* pHeadMsg = transQueueGet(&conn->cliMsgs, 0);
  if (!REQUEST_NO_RESP(&pHeadMsg->msg)) {
    return false;
  }
  transQueuePop(&conn->cliMsgs);
  destroyCmsg(pHeadMsg);

  if (cliMaySendCachedMsg(conn) == false) {
    SCliThrd* pThrd = conn->hostThrd;
    addConnToPool(pThrd->pool, conn);
    return false;
  }
  return true;
}
U
ubuntu 已提交
803
// uv_write completion callback for single-message sends.
// Warns when the head message took more than 20ms since pMsg->st. On a write
// error the connection is failed unless its stream is already closing. On
// success either the no-response path takes over, or reading is started on
// the stream.
static void cliSendCb(uv_write_t* req, int status) {
  SCliConn* pConn = transReqQueueRemove(req);
  if (pConn == NULL) return;

  SCliMsg* pMsg = !transQueueEmpty(&pConn->cliMsgs) ? transQueueGet(&pConn->cliMsgs, 0) : NULL;
  if (pMsg != NULL) {
    int64_t cost = taosGetTimestampUs() - pMsg->st;
    if (cost > 1000 * 20) {
      tWarn("%s conn %p send cost:%dus, send exception", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
    }
  }

  if (status != 0) {
    // pConn->stream is already a pointer to the handle. The original passed
    // &pConn->stream, i.e. the address of the pointer field, so uv_is_closing
    // inspected unrelated memory.
    if (!uv_is_closing((uv_handle_t*)pConn->stream)) {
      tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
      cliHandleExcept(pConn);
    }
    return;
  }

  tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
  if (cliHandleNoResp(pConn) == true) {
    tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
    return;
  }
  uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
dengyihao's avatar
dengyihao 已提交
830 831 832 833 834 835 836 837 838 839
// Send every message of pConn->pBatch with a single uv_write.
// One uv_buf_t is prepared per queued message: the transport header is filled
// in (unless the message is already compressed) and large payloads are
// compressed. On allocation failure or when uv_write fails synchronously, the
// connection is failed through cliHandleFastFail, which also destroys the
// batch; in that case the completion callback is not expected to run.
void cliSendBatch(SCliConn* pConn) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  SCliBatch* pBatch = pConn->pBatch;
  int32_t    wLen = pBatch->wLen;

  uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
  if (wb == NULL) {
    tError("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(UV_ENOMEM));
    cliHandleFastFail(pConn, UV_ENOMEM);
    return;
  }
  int i = 0;

  queue* h = NULL;
  QUEUE_FOREACH(h, &pBatch->wq) {
    SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q);

    STransConnCtx* pCtx = pCliMsg->ctx;

    STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
    if (pMsg->pCont == 0) {
      pMsg->pCont = (void*)rpcMallocCont(0);
      pMsg->contLen = 0;
    }

    int            msgLen = transMsgLenFromCont(pMsg->contLen);
    STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);

    if (pHead->comp == 0) {
      pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
      pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
      pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
      pHead->msgType = pMsg->msgType;
      pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
      pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
      memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
      pHead->traceId = pMsg->info.traceId;
      pHead->magicNum = htonl(TRANS_MAGIC_NUM);
    }
    pHead->timestamp = taosHton64(taosGetTimestampUs());

    if (pHead->comp == 0) {
      if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
        msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
        pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
      }
    } else {
      // already compressed: the header carries the on-wire length
      msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
    }

    wb[i++] = uv_buf_init((char*)pHead, msgLen);
  }

  uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
  if (req == NULL) {
    taosMemoryFree(wb);
    tError("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(UV_ENOMEM));
    cliHandleFastFail(pConn, UV_ENOMEM);
    return;
  }
  req->data = pConn;

  int ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb);
  // the uv_buf_t array itself is only needed for the duration of the call
  taosMemoryFree(wb);
  if (ret != 0) {
    tError("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
    taosMemoryFree(req);
    cliHandleFastFail(pConn, ret);
  }
}
U
ubuntu 已提交
885
// Send the next unsent message queued on the connection.
// An empty queue is treated as an error and fails the connection. Otherwise
// the next unsent message is marked sent, its transport header is filled in
// (unless it is already compressed), an optional read timer is armed, large
// payloads are compressed, and the message is written to the stream. A
// synchronous uv_write failure fails the connection.
void cliSend(SCliConn* pConn) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  if (transQueueEmpty(&pConn->cliMsgs)) {
    tError("%s conn %p not msg to send", pTransInst->label, pConn);
    cliHandleExcept(pConn);
    return;
  }

  // CONN_GET_NEXT_SENDMSG assigns pCliMsg and jumps to _RETURN when nothing is left to send
  SCliMsg* pCliMsg = NULL;
  CONN_GET_NEXT_SENDMSG(pConn);
  pCliMsg->sent = 1;

  STransConnCtx* pCtx = pCliMsg->ctx;
  STransMsg*     pMsg = (STransMsg*)(&pCliMsg->msg);
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }

  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
  int            msgLen = transMsgLenFromCont(pMsg->contLen);

  if (pHead->comp == 0) {
    if (pCtx != NULL) {
      pHead->ahandle = (uint64_t)pCtx->ahandle;
    } else {
      pHead->ahandle = 0;
    }
    pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
    pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
    pHead->msgType = pMsg->msgType;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
    memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
    pHead->traceId = pMsg->info.traceId;
    pHead->magicNum = htonl(TRANS_MAGIC_NUM);
  }
  pHead->timestamp = taosHton64(taosGetTimestampUs());

  if (pHead->persist == 1) {
    CONN_SET_PERSIST_BY_APP(pConn);
  }

  // `trace` is referenced by name inside the tG* logging macros
  STraceId* trace = &pMsg->info.traceId;

  if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
    uv_timer_t* tmr = NULL;
    if (taosArrayGetSize(pThrd->timerList) > 0) {
      tmr = *(uv_timer_t**)taosArrayPop(pThrd->timerList);
    }
    if (tmr == NULL) {
      tmr = taosMemoryCalloc(1, sizeof(uv_timer_t));
      tDebug("no available timer, create a timer %p", tmr);
      uv_timer_init(pThrd->loop, tmr);
    }
    tmr->data = pConn;
    pConn->timer = tmr;

    tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
    uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
  }

  if (pHead->comp != 0) {
    // already compressed: the header carries the on-wire length
    msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
  } else if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
    msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
  }

  tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
          TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);

  uv_buf_t    outBuf = uv_buf_init((char*)pHead, msgLen);
  uv_write_t* wreq = transReqQueuePush(&pConn->wreqQueue);

  int ret = uv_write(wreq, (uv_stream_t*)pConn->stream, &outBuf, 1, cliSendCb);
  if (ret != 0) {
    tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType),
            uv_err_name(ret));
    cliHandleExcept(pConn);
  }

_RETURN:
  return;
}
dengyihao's avatar
dengyihao 已提交
968 969

// Move the contents of pBatch into a newly allocated batch and return it.
// The queued messages are transferred in order, the counters and port are
// copied, and dst/ip are duplicated. The source batch is left with an empty
// queue and zeroed counters; its dst, ip and port are untouched.
static SCliBatch* cliDumpBatch(SCliBatch* pBatch) {
  SCliBatch* pCopy = taosMemoryCalloc(1, sizeof(SCliBatch));

  pCopy->batchSize = pBatch->batchSize;
  pCopy->batch = pBatch->batch;
  pCopy->wLen = pBatch->wLen;
  pCopy->port = pBatch->port;
  pCopy->dst = strdup(pBatch->dst);
  pCopy->ip = strdup(pBatch->ip);

  // transfer the messages one by one, preserving their order
  QUEUE_INIT(&pCopy->wq);
  while (!QUEUE_IS_EMPTY(&pBatch->wq)) {
    queue* node = QUEUE_HEAD(&pBatch->wq);
    QUEUE_REMOVE(node);
    QUEUE_PUSH(&pCopy->wq, node);
  }

  QUEUE_INIT(&pBatch->wq);
  pBatch->wLen = 0;
  pBatch->batch = 0;
  pBatch->batchSize = 0;

  return pCopy;
}
// Free a batch: every message still queued on it is destroyed, then the
// batch's ip and dst strings and the batch itself are released.
static void cliDestroyBatch(SCliBatch* pBatch) {
  for (;;) {
    if (QUEUE_IS_EMPTY(&pBatch->wq)) {
      break;
    }
    queue*   node = QUEUE_HEAD(&pBatch->wq);
    SCliMsg* pCliMsg = QUEUE_DATA(node, SCliMsg, q);

    QUEUE_REMOVE(&pCliMsg->q);
    destroyCmsg(pCliMsg);
  }

  taosMemoryFree(pBatch->ip);
  taosMemoryFree(pBatch->dst);
  taosMemoryFree(pBatch);
}
dengyihao's avatar
dengyihao 已提交
1005 1006
// Dispatch a batch of queued requests.
// An empty batch is ignored. The batch contents are first moved into a private
// copy. If the pool has an idle connection for the peer, the copy is sent on
// it right away. Otherwise a new connection is created, the peer address is
// resolved, a socket is opened and configured, and an asynchronous connect is
// started under a connect timer. Any failure on that path fails the connection
// via cliHandleFastFail.
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
  if (pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
    return;
  }
  STrans* pTransInst = pThrd->pTransInst;

  SCliBatch* pNewBatch = cliDumpBatch(pBatch);

  SCliConn* conn = getConnFromPool(pThrd->pool, pBatch->ip, pBatch->port);
  if (conn != NULL) {
    // reuse an idle connection
    conn->pBatch = pNewBatch;
    cliSendBatch(conn);
    return;
  }

  conn = cliCreateConn(pThrd);
  conn->pBatch = pNewBatch;
  conn->ip = strdup(pNewBatch->dst);

  uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pNewBatch->ip);
  if (ipaddr == 0xffffffff) {
    uv_timer_stop(conn->timer);
    conn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &conn->timer);
    conn->timer = NULL;

    cliHandleFastFail(conn, -1);
    return;
  }

  struct sockaddr_in addr;
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = ipaddr;
  addr.sin_port = (uint16_t)htons(pNewBatch->port);

  tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pBatch->dst);

  int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
  if (fd == -1) {
    tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
           tstrerror(TAOS_SYSTEM_ERROR(errno)));
    cliHandleFastFail(conn, -1);
    return;
  }

  int rc = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
  if (rc != 0) {
    tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(rc));
    cliHandleFastFail(conn, -1);
    return;
  }

  rc = transSetConnOption((uv_tcp_t*)conn->stream);
  if (rc != 0) {
    tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(rc));
    cliHandleFastFail(conn, -1);
    return;
  }

  rc = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
  if (rc != 0) {
    uv_timer_stop(conn->timer);
    conn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &conn->timer);
    conn->timer = NULL;

    cliHandleFastFail(conn, -1);
    return;
  }

  // the connect is in flight; guard it with the connect timeout
  uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
}
// uv_write completion callback for batch sends.
// Releases the write request allocated by cliSendBatch and destroys the batch.
// The connection is then failed on a write error, or returned to the pool on
// success.
static void cliSendBatchCb(uv_write_t* req, int status) {
  SCliConn* conn = req->data;
  SCliThrd* thrd = conn->hostThrd;

  // the request was heap-allocated by cliSendBatch and is never referenced
  // again after this callback; the original leaked it on every batch
  taosMemoryFree(req);

  cliDestroyBatch(conn->pBatch);
  conn->pBatch = NULL;

  if (status != 0) {
    cliHandleExcept(conn);
  } else {
    addConnToPool(thrd->pool, conn);
  }
}
dengyihao's avatar
dengyihao 已提交
1085
// Fail a connection that could not be established or used.
// For a non-batch connection the head message is logged, and when it is a
// no-response request whose type is accepted by failFastFp, the failure is
// recorded in the per-peer fail-fast cache. Failures within failFastInterval
// of the recorded timestamp increase the count; otherwise the window restarts.
// For a batch connection the batch is destroyed. In all cases the connection
// then goes through cliHandleExcept.
static void cliHandleFastFail(SCliConn* pConn, int status) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
  tError("conn %p free twice, reason:%s", pConn, uv_err_name(status));
  if (pConn->pBatch == NULL) {
    SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);
    if (pMsg == NULL) {
      // no queued message: log without trace info; the original dereferenced pMsg before its NULL check
      tError("%s conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), pConn, pConn->ip,
             uv_strerror(status));
    } else {
      // `trace` is referenced by name inside the tG* logging macros
      STraceId* trace = &pMsg->msg.info.traceId;
      tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
              TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status));

      if (REQUEST_NO_RESP(&pMsg->msg) &&
          (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
        SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip));
        int64_t        cTimestamp = taosGetTimestampMs();
        if (item != NULL) {
          // keep the difference 64-bit so a long gap cannot wrap into the valid window
          int64_t elapse = cTimestamp - item->timestamp;
          if (elapse >= 0 && elapse <= pTransInst->failFastInterval) {
            item->count++;
          } else {
            item->count = 1;
            item->timestamp = cTimestamp;
          }
        } else {
          SFailFastItem newItem = {.count = 1, .timestamp = cTimestamp};
          taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &newItem, sizeof(SFailFastItem));
        }
      }
    }
  } else {
    cliDestroyBatch(pConn->pBatch);
    pConn->pBatch = NULL;
  }
  cliHandleExcept(pConn);
}
dengyihao's avatar
dengyihao 已提交
1119

dengyihao's avatar
dengyihao 已提交
1120 1121 1122
// uv_tcp_connect completion callback.
// A missing timer means the connect timeout already handled this connection.
// Otherwise the timer is stopped and recycled. On a connect error the
// connection is failed (unless the timeout path already did so). On success
// the per-peer connection counter is updated, the peer and local addresses are
// recorded as strings, and the pending batch or single message is sent.
void cliConnCb(uv_connect_t* req, int status) {
  SCliConn* pConn = req->data;
  SCliThrd* pThrd = pConn->hostThrd;

  bool timeout = (pConn->timer == NULL);
  if (!timeout) {
    uv_timer_stop(pConn->timer);
    pConn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &pConn->timer);
    pConn->timer = NULL;
  }

  if (status != 0) {
    // when the timeout fired first, the failure has already been dealt with
    if (!timeout) {
      cliHandleFastFail(pConn, status);
    }
    return;
  }

  size_t   ipLen = strlen(pConn->ip);
  int32_t* pOld = taosHashGet(pThrd->connLimitCache, pConn->ip, ipLen);
  int32_t  cnt = 0;
  if (pOld != NULL) {
    cnt = (*pOld) + 1;
  }
  taosHashPut(pThrd->connLimitCache, pConn->ip, ipLen, &cnt, sizeof(cnt));

  struct sockaddr peername, sockname;
  int             addrlen = sizeof(peername);
  uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
  transSockInfo2Str(&peername, pConn->dst);

  addrlen = sizeof(sockname);
  uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
  transSockInfo2Str(&sockname, pConn->src);

  tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
  if (pConn->pBatch == NULL) {
    cliSend(pConn);
  } else {
    cliSendBatch(pConn);
  }
}

dengyihao's avatar
dengyihao 已提交
1164
// Handle a quit request for the worker thread.
// While the async pool still has pending work the message is only remembered
// in pThrd->stopMsg. Otherwise the thread is flagged as quitting, the message
// is destroyed, the connection pool is torn down and cliWalkCb is run over
// every handle on the loop.
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
  bool idle = transAsyncPoolIsEmpty(pThrd->asyncPool);
  if (!idle) {
    pThrd->stopMsg = pMsg;
    return;
  }

  pThrd->stopMsg = NULL;
  pThrd->quit = true;
  tDebug("cli work thread %p start to quit", pThrd);

  destroyCmsg(pMsg);
  destroyConnPool(pThrd->pool);
  uv_walk(pThrd->loop, cliWalkCb, NULL);
}
dengyihao's avatar
dengyihao 已提交
1176
/*
 * Handler for a Release message: the application gives back a connection
 * that it addressed through the ref id stored in pMsg->msg.info.handle.
 *
 * pMsg  - release message; it is either queued on the connection or
 *         destroyed here.
 * pThrd - worker thread running the handler (not used directly).
 *
 * The message is dropped when the ref id can no longer be resolved, when the
 * handle has no connection attached, or when the connection's reference
 * count is not 2.
 */
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
  int64_t    refId = (int64_t)(pMsg->msg.info.handle);
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
  if (exh == NULL) {
    tDebug("%" PRId64 " already released", refId);
    destroyCmsg(pMsg);
    return;
  }

  SCliConn* conn = exh->handle;
  transReleaseExHandle(transGetRefMgt(), refId);

  // cliGetConn treats exh->handle == NULL as a valid state; without this
  // guard the label/ref-count macros below would dereference a NULL conn.
  if (conn == NULL) {
    tDebug("%" PRId64 " has no conn attached, nothing to release", refId);
    destroyCmsg(pMsg);
    return;
  }

  tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);

  // NOTE(review): a count of 2 presumably means "one ref held by the app, one
  // by the transport" - confirm against T_REF_* usage elsewhere in the file.
  if (T_REF_VAL_GET(conn) == 2) {
    transUnrefCliHandle(conn);
    if (!transQueuePush(&conn->cliMsgs, pMsg)) {
      // push reported false: the message stays queued and nothing is sent now
      return;
    }
    cliSend(conn);
  } else {
    tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
    destroyCmsg(pMsg);
  }
}
dengyihao's avatar
dengyihao 已提交
1200
/*
 * Handler for an Update message: copies the address-conversion settings
 * carried in the message context into the worker thread, then destroys the
 * message.
 */
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
  // struct copy; must happen before the message (and its ctx) is destroyed
  pThrd->cvtAddr = pMsg->ctx->cvtAddr;
  destroyCmsg(pMsg);
}
U
ubuntu 已提交
1205

dengyihao's avatar
dengyihao 已提交
1206
/*
 * Pick the connection a request should be sent on.
 *
 * pMsg   - request; its ctx supplies the endpoint in use and
 *          msg.info.handle an optional ref id.
 * pThrd  - worker thread owning the connection pool.
 * ignore - set to true when the message names a ref id that can no longer be
 *          resolved; it is left untouched otherwise.
 *
 * Returns the connection to use, or NULL when none is available.
 */
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
  STransConnCtx* pCtx = pMsg->ctx;
  SCliConn*      conn = NULL;
  int64_t        refId = (int64_t)(pMsg->msg.info.handle);

  if (refId == 0) {
    // no explicit handle: plain lookup in the per-thread pool
    conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
    if (conn == NULL) {
      tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
    } else {
      tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
    }
    return conn;
  }

  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
  if (exh == NULL) {
    tError("failed to get conn, refId: %" PRId64 "", refId);
    *ignore = true;
    return NULL;
  }

  conn = exh->handle;
  if (conn == NULL) {
    // handle exists but has no connection yet: take one from the pool and
    // tie it to this ref id
    conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
    if (conn != NULL) specifyConnRef(conn, true, refId);
  }
  transReleaseExHandle(transGetRefMgt(), refId);
  return conn;
}
dengyihao's avatar
dengyihao 已提交
1236
/*
 * Replace the fqdn of a single-endpoint epset with pCvtAddr->ip when
 * conversion is enabled and the endpoint's fqdn equals pCvtAddr->fqdn
 * (compared over at most TSDB_FQDN_LEN bytes). Does nothing otherwise.
 */
FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
  if (!pCvtAddr->cvt) return;
  if (pEpSet->numOfEps != 1) return;

  char* target = pEpSet->eps[0].fqdn;
  if (strncmp(target, pCvtAddr->fqdn, TSDB_FQDN_LEN) != 0) return;

  // wipe first, then copy the full fixed-size buffer
  memset(target, 0, TSDB_FQDN_LEN);
  memcpy(target, pCvtAddr->ip, TSDB_FQDN_LEN);
}
dengyihao's avatar
dengyihao 已提交
1245

dengyihao's avatar
dengyihao 已提交
1246
/*
 * Report whether the epset in the context differs from the original one.
 * Only a successful result (code == 0) can count as an update.
 */
FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
  if (code != 0) {
    return false;
  }
  bool unchanged = transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet);
  return !unchanged;
}
dengyihao's avatar
dengyihao 已提交
1252
/*
 * Fill *pResp with a broken-link response for the request pMsg.
 *
 * The response is zeroed, then given code TSDB_CODE_RPC_BROKEN_LINK, the
 * request's message type plus one, the request's trace id, and the ahandle
 * from the request context (NULL when the request has no context).
 *
 * Returns 0 on success, -1 when pMsg is NULL (pResp is then left untouched).
 */
FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
  if (pMsg == NULL) {
    return -1;
  }

  memset(pResp, 0, sizeof(STransMsg));

  pResp->code = TSDB_CODE_RPC_BROKEN_LINK;
  pResp->msgType = pMsg->msg.msgType + 1;
  if (pMsg->ctx) {
    pResp->info.ahandle = pMsg->ctx->ahandle;
  } else {
    pResp->info.ahandle = NULL;
  }
  pResp->info.traceId = pMsg->msg.info.traceId;

  return 0;
}
dengyihao's avatar
dengyihao 已提交
1264 1265 1266 1267 1268
/*
 * Resolve fqdn to an IPv4 address, consulting the hash cache first.
 *
 * On a cache miss the name is resolved with taosGetIpv4FromFqdn and the
 * result is stored in the cache. When resolution yields 0xffffffff, terrno
 * is set from errno, an error is logged, nothing is cached and 0xffffffff
 * is returned.
 */
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
  uint32_t* cached = taosHashGet(cache, fqdn, strlen(fqdn));
  if (cached != NULL) {
    return *cached;
  }

  uint32_t resolved = taosGetIpv4FromFqdn(fqdn);
  if (resolved == 0xffffffff) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr());
    return resolved;
  }

  taosHashPut(cache, fqdn, strlen(fqdn), &resolved, sizeof(resolved));
  return resolved;
}
/* Placeholder: the fqdn cache is not refreshed yet, so this does nothing. */
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
  // impl later
  (void)cache;
  (void)fqdn;
}
dengyihao's avatar
dengyihao 已提交
1285

dengyihao's avatar
dengyihao 已提交
1286 1287 1288 1289 1290 1291 1292 1293 1294 1295
/*
 * Check the per-endpoint connection counter before opening a new connection.
 *
 * The counter is looked up in pThrd->connLimitCache under the key built from
 * the endpoint in use. Returns -1 when a counter exists and has reached
 * pTransInst->connLimitNum, 0 otherwise (including when no counter exists).
 */
static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, SCliMsg* pMsg) {
  STrans*        pTransInst = pThrd->pTransInst;
  STransConnCtx* pCtx = pMsg->ctx;

  char*   ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
  int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);

  char key[TSDB_FQDN_LEN + 64] = {0};
  CONN_CONSTRUCT_HASH_KEY(key, ip, port);

  int32_t* inUse = taosHashGet(pThrd->connLimitCache, key, strlen(key));
  if (inUse != NULL && *inUse >= pTransInst->connLimitNum) {
    return -1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
1304
/*
 * Handler for a Normal request: send pMsg on an existing connection or open
 * a new one towards the endpoint currently in use.
 *
 * pMsg  - request to send. It is queued on a connection, or destroyed here
 *         when it is rejected (invalid epset, fail-fast hit, stale ref id,
 *         session limit).
 * pThrd - worker thread owning the pools and caches used below.
 *
 * Steps, in order:
 *   1. optional fqdn -> ip conversion of the epset, then epset validation;
 *   2. fail-fast check for no-response requests whose type is accepted by
 *      pTransInst->failFastFp;
 *   3. connection lookup (cliGetConn); a stale ref id is answered with a
 *      broken-link response unless the message is a Release;
 *   4. session-limit check for no-response requests that need a new conn;
 *   5. send on the existing conn, or create a conn, resolve the address,
 *      open and configure the socket, and start the connect plus its timer.
 */
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
  STrans*        pTransInst = pThrd->pTransInst;
  STransConnCtx* pCtx = pMsg->ctx;

  cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
  // NOTE(review): `trace` is presumably read implicitly by the tG* logging
  // macros - keep the name.
  STraceId* trace = &pMsg->msg.info.traceId;

  if (!EPSET_IS_VALID(&pCtx->epSet)) {
    tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
    destroyCmsg(pMsg);
    return;
  }

  if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
    char*    ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
    uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
    char     key[TSDB_FQDN_LEN + 64] = {0};
    CONN_CONSTRUCT_HASH_KEY(key, ip, port);

    SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
    if (item != NULL) {
      int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp);
      if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) {
        // port is unsigned 32-bit; cast so the argument matches the %d specifier
        tGTrace("%s, msg %s cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label,
                TMSG_INFO(pMsg->msg.msgType), ip, (int32_t)port, item->count, elapse);
        destroyCmsg(pMsg);
        return;
      }
    }
  }

  bool      ignore = false;
  SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
  if (ignore == true) {
    // persist conn already release by server
    STransMsg resp;
    cliBuildExceptResp(pMsg, &resp);
    if (pMsg->type != Release) {
      pTransInst->cfp(pTransInst->parent, &resp, NULL);
    }
    destroyCmsg(pMsg);
    return;
  }

  if (conn == NULL && REQUEST_NO_RESP(&pMsg->msg) && 0 != cliPreCheckSessionLimit(pThrd, pMsg)) {
    tGTrace("%s, msg %s cancel to send, reason: %s", pTransInst->label, TMSG_INFO(pMsg->msg.msgType),
            tstrerror(TSDB_CODE_RPC_MAX_SESSIONS));
    destroyCmsg(pMsg);
    return;
  }

  if (conn != NULL) {
    transCtxMerge(&conn->ctx, &pCtx->appCtx);
    transQueuePush(&conn->cliMsgs, pMsg);
    cliSend(conn);
  } else {
    conn = cliCreateConn(pThrd);

    int64_t refId = (int64_t)pMsg->msg.info.handle;
    if (refId != 0) specifyConnRef(conn, true, refId);

    transCtxMerge(&conn->ctx, &pCtx->appCtx);
    transQueuePush(&conn->cliMsgs, pMsg);

    char     key[TSDB_FQDN_LEN + 64] = {0};
    char*    fqdn = EPSET_GET_INUSE_IP(&pCtx->epSet);
    uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
    CONN_CONSTRUCT_HASH_KEY(key, fqdn, port);

    conn->ip = strdup(key);

    uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
    if (ipaddr == 0xffffffff) {
      // resolution failed: hand the timer back to the thread's free list
      // before the connection goes through the exception path
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

      cliHandleExcept(conn);
      return;
    }

    // zero the whole struct so padding (sin_zero, and sin_len where present)
    // is not left indeterminate when it is handed to uv_tcp_connect
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = ipaddr;
    addr.sin_port = (uint16_t)htons(port);

    tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip);
    int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
    if (fd == -1) {
      tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
              tstrerror(TAOS_SYSTEM_ERROR(errno)));
      cliHandleExcept(conn);
      errno = 0;
      return;
    }
    int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
    if (ret != 0) {
      // NOTE(review): nothing in this function closes fd on this path; confirm
      // whether the exception path takes care of it.
      tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }
    ret = transSetConnOption((uv_tcp_t*)conn->stream);
    if (ret != 0) {
      tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }

    ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
    if (ret != 0) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

      cliHandleFastFail(conn, ret);
      return;
    }
    uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
  }
  tGTrace("%s conn %p ready", pTransInst->label, conn);
}
dengyihao's avatar
dengyihao 已提交
1427

dengyihao's avatar
dengyihao 已提交
1428
/*
 * Drain wq, dispatching every message to the handler registered for its
 * type in cliAsyncHandle. Emits a trace line when two or more messages were
 * handled in this pass.
 */
static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
  int handled = 0;

  for (; !QUEUE_IS_EMPTY(wq); handled++) {
    queue* node = QUEUE_HEAD(wq);
    QUEUE_REMOVE(node);

    SCliMsg* pMsg = QUEUE_DATA(node, SCliMsg, q);
    (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
  }

  if (handled >= 2) {
    tTrace("cli process batch size:%d", handled);
  }
}

dengyihao's avatar
dengyihao 已提交
1445
/*
 * Drain wq with batching enabled.
 *
 * Normal messages that expect no response are not dispatched one by one:
 * each is appended to the SCliBatch stored in pThrd->batchCache under its
 * endpoint key (the batch is created on first use). Every other message goes
 * to its handler from cliAsyncHandle right away. After the queue is empty,
 * each entry of batchCache is passed to cliHandleBatchReq.
 *
 * The trace counter only counts messages that were dispatched directly.
 */
static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
  int count = 0;

  while (!QUEUE_IS_EMPTY(wq)) {
    queue* h = QUEUE_HEAD(wq);
    QUEUE_REMOVE(h);

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);

    bool batchable = (pMsg->type == Normal) && REQUEST_NO_RESP(&pMsg->msg);
    if (!batchable) {
      (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
      count++;
      continue;
    }

    STransConnCtx* pCtx = pMsg->ctx;

    char*    ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
    uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
    char     key[TSDB_FQDN_LEN + 64] = {0};
    CONN_CONSTRUCT_HASH_KEY(key, ip, port);

    // the cache is keyed by the whole zero-padded buffer, not strlen(key)
    SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key));
    SCliBatch*  pBatch = (ppBatch != NULL) ? *ppBatch : NULL;

    if (pBatch == NULL) {
      // first message for this endpoint in the cache: create its batch
      pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
      QUEUE_INIT(&pBatch->wq);

      pBatch->dst = strdup(key);
      pBatch->ip = strdup(ip);
      pBatch->port = (uint16_t)port;

      taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatch, sizeof(void*));
    }

    QUEUE_PUSH(&pBatch->wq, h);
    pBatch->wLen += 1;
    pBatch->batchSize += pMsg->msg.contLen;
  }

  for (void** pIter = taosHashIterate(pThrd->batchCache, NULL); pIter != NULL;
       pIter = (void**)taosHashIterate(pThrd->batchCache, pIter)) {
    SCliBatch* batch = (SCliBatch*)(*pIter);
    cliHandleBatchReq(batch, pThrd);
  }

  if (count >= 2) {
    tTrace("cli process batch size:%d", count);
  }
}

/*
 * libuv async callback for one slot of the worker's async pool.
 *
 * Moves all messages queued on the slot into a local queue while holding the
 * slot mutex, then processes them outside the lock with the batching or
 * non-batching dispatcher, depending on pTransInst->supportBatch. Finally
 * retries a pending quit request, if one is parked on the thread.
 */
static void cliAsyncCb(uv_async_t* handle) {
  SAsyncItem* item = handle->data;
  SCliThrd*   pThrd = item->pThrd;
  STrans*     pTransInst = pThrd->pTransInst;

  // batch process to avoid to lock/unlock frequently
  queue wq;
  taosThreadMutexLock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  taosThreadMutexUnlock(&item->mtx);

  switch (pTransInst->supportBatch) {
    case 0:
      cliNoBatchDealReq(&wq, pThrd);
      break;
    case 1:
      cliBatchDealReq(&wq, pThrd);
      break;
    default:
      // any other value: the moved messages are not dispatched
      break;
  }

  if (pThrd->stopMsg != NULL) {
    cliHandleQuit(pThrd->stopMsg, pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543
/*
 * libuv prepare callback: walks every slot of the worker's async pool, moves
 * the slot's queued messages out under the slot mutex and dispatches each
 * one through cliAsyncHandle. Afterwards retries a parked quit request.
 */
static void cliPrepareCb(uv_prepare_t* handle) {
  SCliThrd* thrd = handle->data;
  tTrace("prepare work start");

  SAsyncPool* pool = thrd->asyncPool;
  for (int slot = 0; slot < pool->nAsync; slot++) {
    SAsyncItem* item = pool->asyncs[slot].data;

    queue pending;
    taosThreadMutexLock(&item->mtx);
    QUEUE_MOVE(&item->qmsg, &pending);
    taosThreadMutexUnlock(&item->mtx);

    while (!QUEUE_IS_EMPTY(&pending)) {
      queue* node = QUEUE_HEAD(&pending);
      QUEUE_REMOVE(node);

      SCliMsg* pMsg = QUEUE_DATA(node, SCliMsg, q);
      (*cliAsyncHandle[pMsg->type])(pMsg, thrd);
    }
  }

  tTrace("prepare work end");
  if (thrd->stopMsg != NULL) {
    cliHandleQuit(thrd->stopMsg, thrd);
  }
}
dengyihao's avatar
dengyihao 已提交
1545

dengyihao's avatar
dengyihao 已提交
1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574
/*
 * Drop everything still attached to a connection: clean up its context,
 * release its unfinished messages, then either destroy the message queue
 * (destroy == true) or just clear it.
 */
void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
  transCtxCleanup(&conn->ctx);
  cliReleaseUnfinishedMsg(conn);

  if (!destroy) {
    transQueueClear(&conn->cliMsgs);
    return;
  }
  transQueueDestroy(&conn->cliMsgs);
}

/*
 * Answer every request still queued on conn with a broken-link response.
 *
 * Release messages, requests that expect no response and TDMT_SCH_DROP_TASK
 * requests are skipped. For the rest, a response is built with
 * cliBuildExceptResp and handed to the instance callback, and the request's
 * ahandle is cleared afterwards.
 */
void cliIteraConnMsgs(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    SCliMsg* cmsg = transQueueGet(&conn->cliMsgs, i);
    // cliBuildExceptResp tolerates a NULL message, but the filter below
    // dereferences it first, so the check has to happen here
    if (cmsg == NULL) {
      continue;
    }
    if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) {
      continue;
    }

    STransMsg resp = {0};
    if (-1 == cliBuildExceptResp(cmsg, &resp)) {
      continue;
    }
    pTransInst->cfp(pTransInst->parent, &resp, NULL);

    // cliBuildExceptResp accepts a request without ctx; mirror that here
    // instead of dereferencing ctx unconditionally
    if (cmsg->ctx != NULL) {
      cmsg->ctx->ahandle = NULL;
    }
  }
}
dengyihao's avatar
dengyihao 已提交
1575 1576 1577
/*
 * Check whether the packet just read is a bare release request and, if so,
 * process it.
 *
 * A packet qualifies when its release flag is 1 and its length equals the
 * header size. Returns true when the packet was consumed as a release
 * request, false when it is an ordinary packet the caller still has to
 * handle.
 */
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
  bool isRelease = (pHead->release == 1) && ((pHead->msgLen) == sizeof(*pHead));
  if (!isRelease) {
    return false;
  }

  // copy before the buffer holding pHead is freed below
  uint64_t ahandle = pHead->ahandle;
  tDebug("ahandle = %" PRIu64 "", ahandle);

  // NOTE(review): pMsg is never assigned explicitly here, so it is presumably
  // set by the macro on the next line - do not rename it.
  SCliMsg* pMsg = NULL;
  CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);

  transClearBuffer(&conn->readBuf);
  transFreeMsg(transContFromHead((char*)pHead));

  if (ahandle == 0) {
    // a Release message still queued on the connection ends processing here
    for (int idx = 0; idx < transQueueSize(&conn->cliMsgs); idx++) {
      SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, idx);
      if (cliMsg->type == Release) {
        ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req");
        return true;
      }
    }
  }

  cliIteraConnMsgs(conn);

  tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
  destroyCmsg(pMsg);

  addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
  return true;
}

U
ubuntu 已提交
1604
static void* cliWorkThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
1605
  SCliThrd* pThrd = (SCliThrd*)arg;
dengyihao's avatar
dengyihao 已提交
1606
  pThrd->pid = taosGetSelfPthreadId();
G
gccgdb1234 已提交
1607
  setThreadName("trans-cli-work");
dengyihao's avatar
dengyihao 已提交
1608
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
dengyihao's avatar
dengyihao 已提交
1609 1610

  tDebug("thread quit-thread:%08" PRId64, pThrd->pid);
1611
  return NULL;
dengyihao's avatar
dengyihao 已提交
1612 1613
}

U
ubuntu 已提交
1614
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
1615
  SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj));
dengyihao's avatar
dengyihao 已提交
1616

dengyihao's avatar
dengyihao 已提交
1617
  STrans* pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
1618
  memcpy(cli->label, label, TSDB_LABEL_LEN);
dengyihao's avatar
dengyihao 已提交
1619
  cli->numOfThreads = numOfThreads;
dengyihao's avatar
dengyihao 已提交
1620
  cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*));
dengyihao's avatar
dengyihao 已提交
1621 1622

  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
1623
    SCliThrd* pThrd = createThrdObj(shandle);
dengyihao's avatar
dengyihao 已提交
1624 1625 1626 1627 1628
    if (pThrd == NULL) {
      return NULL;
    }

    int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
dengyihao's avatar
dengyihao 已提交
1629
    if (err == 0) {
S
Shengliang Guan 已提交
1630
      tDebug("success to create tranport-cli thread:%d", i);
dengyihao's avatar
dengyihao 已提交
1631
    }
dengyihao's avatar
dengyihao 已提交
1632
    cli->pThreadObj[i] = pThrd;
dengyihao's avatar
dengyihao 已提交
1633
  }
dengyihao's avatar
dengyihao 已提交
1634

dengyihao's avatar
dengyihao 已提交
1635 1636
  return cli;
}
dengyihao's avatar
dengyihao 已提交
1637

dengyihao's avatar
dengyihao 已提交
1638
/* Free the payload of a transport message, if any, and clear the pointer. */
static FORCE_INLINE void destroyUserdata(STransMsg* userdata) {
  if (userdata->pCont != NULL) {
    transFreeMsg(userdata->pCont);
    userdata->pCont = NULL;
  }
}
dengyihao's avatar
dengyihao 已提交
1645

dengyihao's avatar
dengyihao 已提交
1646
static FORCE_INLINE void destroyCmsg(void* arg) {
dengyihao's avatar
dengyihao 已提交
1647
  SCliMsg* pMsg = arg;
dengyihao's avatar
dengyihao 已提交
1648 1649 1650
  if (pMsg == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1651

dengyihao's avatar
dengyihao 已提交
1652 1653
  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
wafwerar's avatar
wafwerar 已提交
1654
  taosMemoryFree(pMsg);
dengyihao's avatar
dengyihao 已提交
1655
}
dengyihao's avatar
dengyihao 已提交
1656

dengyihao's avatar
dengyihao 已提交
1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675
/*
 * Destroy the client message carried in a STaskArg (param1 = message,
 * param2 = owning thread). When the thread has a destroyAhandleFp callback,
 * it is invoked on the message's ahandle first. The STaskArg itself is not
 * freed here. A NULL argument is ignored.
 */
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
  if (param == NULL) return;

  STaskArg* task = param;
  SCliMsg*  pMsg = task->param1;
  SCliThrd* pThrd = task->param2;

  tDebug("destroy Ahandle A");
  bool hasDestroyFp = (pThrd != NULL) && (pThrd->destroyAhandleFp != NULL);
  if (hasDestroyFp) {
    tDebug("destroy Ahandle B");
    pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
  }
  tDebug("destroy Ahandle C");

  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
  taosMemoryFree(pMsg);
}

dengyihao's avatar
dengyihao 已提交
1676 1677 1678
static SCliThrd* createThrdObj(void* trans) {
  STrans* pTransInst = trans;

dengyihao's avatar
dengyihao 已提交
1679
  SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd));
U
ubuntu 已提交
1680

dengyihao's avatar
dengyihao 已提交
1681
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
1682
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
1683

wafwerar's avatar
wafwerar 已提交
1684
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
1685 1686 1687 1688 1689 1690 1691 1692
  int err = uv_loop_init(pThrd->loop);
  if (err != 0) {
    tError("failed to init uv_loop, reason:%s", uv_err_name(err));
    taosMemoryFree(pThrd->loop);
    taosThreadMutexDestroy(&pThrd->msgMtx);
    taosMemoryFree(pThrd);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
1693
  pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb);
dengyihao's avatar
dengyihao 已提交
1694
  if (pThrd->asyncPool == NULL) {
dengyihao's avatar
ref log  
dengyihao 已提交
1695
    tError("failed to init async pool");
dengyihao's avatar
dengyihao 已提交
1696 1697 1698 1699 1700 1701
    uv_loop_close(pThrd->loop);
    taosMemoryFree(pThrd->loop);
    taosThreadMutexDestroy(&pThrd->msgMtx);
    taosMemoryFree(pThrd);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
1702 1703 1704 1705

  pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
  uv_prepare_init(pThrd->loop, pThrd->prepare);
  pThrd->prepare->data = pThrd;
dengyihao's avatar
dengyihao 已提交
1706
  // uv_prepare_start(pThrd->prepare, cliPrepareCb);
dengyihao's avatar
dengyihao 已提交
1707

dengyihao's avatar
dengyihao 已提交
1708
  int32_t timerSize = 64;
dengyihao's avatar
dengyihao 已提交
1709 1710 1711 1712 1713 1714 1715
  pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
  for (int i = 0; i < timerSize; i++) {
    uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
    uv_timer_init(pThrd->loop, timer);
    taosArrayPush(pThrd->timerList, &timer);
  }

dengyihao's avatar
dengyihao 已提交
1716
  pThrd->pool = createConnPool(4);
dengyihao's avatar
dengyihao 已提交
1717
  transDQCreate(pThrd->loop, &pThrd->delayQueue);
dengyihao's avatar
dengyihao 已提交
1718

dengyihao's avatar
dengyihao 已提交
1719 1720
  transDQCreate(pThrd->loop, &pThrd->timeoutQueue);

dengyihao's avatar
dengyihao 已提交
1721 1722 1723
  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
  pThrd->pTransInst = trans;

dengyihao's avatar
dengyihao 已提交
1724
  pThrd->destroyAhandleFp = pTransInst->destroyFp;
dengyihao's avatar
dengyihao 已提交
1725
  pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
1726
  pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
1727 1728
  pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
                                       pTransInst->connLimitLock == 0 ? HASH_NO_LOCK : HASH_ENTRY_LOCK);
dengyihao's avatar
dengyihao 已提交
1729

dengyihao's avatar
dengyihao 已提交
1730
  pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
1731

dengyihao's avatar
dengyihao 已提交
1732
  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
1733 1734
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
1735
/*
 * Join a worker thread and release everything createThrdObj set up for it:
 * loop handles, mutex, async pool (including queued messages), delay and
 * timeout queues, pooled timers, prepare handle, loop, hash caches, pending
 * batches and finally the object itself. A NULL argument is ignored.
 *
 * The statements run in a fixed order; the join comes first.
 */
static void destroyThrdObj(SCliThrd* pThrd) {
  if (pThrd == NULL) return;

  taosThreadJoin(pThrd->thread, NULL);
  CLI_RELEASE_UV(pThrd->loop);
  taosThreadMutexDestroy(&pThrd->msgMtx);
  TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
  transAsyncPoolDestroy(pThrd->asyncPool);

  // pending delayed tasks still own a message, timeout tasks do not
  transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
  transDQDestroy(pThrd->timeoutQueue, NULL);

  tDebug("thread destroy %" PRId64, pThrd->pid);

  // free every timer left in the thread's stock
  for (int idx = 0; idx < taosArrayGetSize(pThrd->timerList); idx++) {
    uv_timer_t* spare = taosArrayGetP(pThrd->timerList, idx);
    taosMemoryFree(spare);
  }
  taosArrayDestroy(pThrd->timerList);

  taosMemoryFree(pThrd->prepare);
  taosMemoryFree(pThrd->loop);

  taosHashCleanup(pThrd->fqdn2ipCache);
  taosHashCleanup(pThrd->failFastCache);
  taosHashCleanup(pThrd->connLimitCache);

  // batches are stored by pointer, so destroy each one before the table
  for (void** pIter = taosHashIterate(pThrd->batchCache, NULL); pIter != NULL;
       pIter = (void**)taosHashIterate(pThrd->batchCache, pIter)) {
    SCliBatch* batch = (SCliBatch*)(*pIter);
    cliDestroyBatch(batch);
  }
  taosHashCleanup(pThrd->batchCache);

  taosMemoryFree(pThrd);
}
dengyihao's avatar
dengyihao 已提交
1770

dengyihao's avatar
dengyihao 已提交
1771
/* Release a connection context; only the context object itself is freed. */
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx) {
  taosMemoryFree(ctx);
}
dengyihao's avatar
dengyihao 已提交
1775

dengyihao's avatar
dengyihao 已提交
1776
/*
 * Ask a worker thread to stop: queue a Quit message on its async pool, then
 * mark the pool as stopped. The flag is set after the send, not before.
 */
void cliSendQuit(SCliThrd* thrd) {
  // cli can stop gracefully
  SCliMsg* quitMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
  quitMsg->type = Quit;

  SAsyncPool* pool = thrd->asyncPool;
  transAsyncSend(pool, &quitMsg->q);
  atomic_store_8(&thrd->asyncPool->stop, 1);
}
dengyihao's avatar
dengyihao 已提交
1783 1784
/*
 * uv_walk callback used during shutdown: closes every handle that is not
 * already closing, with cliDestroy as the close callback. Reading is stopped
 * first on every handle that is not a timer; timers get no extra treatment.
 */
void cliWalkCb(uv_handle_t* handle, void* arg) {
  if (uv_is_closing(handle)) {
    return;
  }

  if (uv_handle_get_type(handle) != UV_TIMER) {
    uv_read_stop((uv_stream_t*)handle);
  }
  uv_close(handle, cliDestroy);
}
dengyihao's avatar
dengyihao 已提交
1800

dengyihao's avatar
dengyihao 已提交
1801
/*
 * Round-robin choice of a worker thread index for a transport instance.
 *
 * Returns -1 when the instance has no threads; otherwise the counter value
 * read on entry, modulo the number of threads. The shared counter is
 * advanced without locking and wrapped to 0 once it has reached
 * numOfThreads * 2000.
 */
FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) {
  int32_t snapshot = pTransInst->index;
  if (pTransInst->numOfThreads == 0) {
    return -1;
  }

  /*
   * no lock, and to avoid CPU load imbalance, set limit pTransInst->numOfThreads * 2000;
   */
  if (pTransInst->index++ >= pTransInst->numOfThreads * 2000) {
    pTransInst->index = 0;
  }

  return snapshot % pTransInst->numOfThreads;
}
dengyihao's avatar
dengyihao 已提交
1814
static FORCE_INLINE void doDelayTask(void* param) {
dengyihao's avatar
dengyihao 已提交
1815
  STaskArg* arg = param;
dengyihao's avatar
dengyihao 已提交
1816
  cliHandleReq((SCliMsg*)arg->param1, (SCliThrd*)arg->param2);
dengyihao's avatar
dengyihao 已提交
1817 1818
  taosMemoryFree(arg);
}
dengyihao's avatar
dengyihao 已提交
1819

dengyihao's avatar
dengyihao 已提交
1820 1821 1822
static void doCloseIdleConn(void* param) {
  STaskArg* arg = param;
  SCliConn* conn = arg->param1;
dengyihao's avatar
dengyihao 已提交
1823
  tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
1824
  conn->task = NULL;
dengyihao's avatar
dengyihao 已提交
1825 1826
  cliDestroyConn(conn, true);
  taosMemoryFree(arg);
dengyihao's avatar
dengyihao 已提交
1827 1828
}

dengyihao's avatar
dengyihao 已提交
1829
/*
 * Schedule a message to be re-dispatched after pCtx->retryNextInterval.
 *
 * The message and thread are wrapped in an STaskArg and handed to the thread's
 * delay queue with doDelayTask as the callback. If the STaskArg cannot be
 * allocated, the message is dispatched immediately instead of being dropped.
 *
 * @param pMsg   message to resend; its ctx holds the retry state.
 * @param pThrd  worker thread that owns the delay queue.
 */
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
  STrans*        pTransInst = pThrd->pTransInst;
  STransConnCtx* pCtx = pMsg->ctx;

  // NOTE(review): `trace` is presumably read by the tG* logging macros - confirm
  STraceId* trace = &pMsg->msg.info.traceId;
  char      tbuf[256] = {0};
  EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
  tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pTransInst), tbuf,
          pCtx->retryStep, pCtx->retryNextInterval);

  STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
  if (arg == NULL) {
    // out of memory: skip the delay and do what doDelayTask would do later
    tGError("%s failed to alloc delay task, retry without delay", transLabel(pTransInst));
    cliHandleReq(pMsg, pThrd);
    return;
  }
  arg->param1 = pMsg;
  arg->param2 = pThrd;

  transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval);
}
dengyihao's avatar
dengyihao 已提交
1845

dengyihao's avatar
dengyihao 已提交
1846
/*
 * Store newVal into *val unless *val already equals exp.
 *
 * Note that, despite the name, the write happens when the current value
 * DIFFERS from exp. Not atomic.
 *
 * @param val     value to update; must not be NULL.
 * @param exp     value that suppresses the update.
 * @param newVal  value written when *val != exp.
 */
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
  if (*val == exp) {
    return;
  }
  *val = newVal;
}
dengyihao's avatar
dengyihao 已提交
1851

dengyihao's avatar
dengyihao 已提交
1852
/*
 * Strip a leading serialized epset from a response, if it carries one.
 *
 * When pResp->info.hasEpSet is set and the payload decodes as an SEpSet, the
 * decoded epset is stored in *dst and pResp->pCont / pResp->contLen are
 * replaced by a fresh buffer holding only the bytes that follow the epset
 * (NULL / 0 when nothing follows). The old payload buffer is released.
 *
 * @param pResp  response to rewrite; may be NULL.
 * @param dst    receives the decoded epset on success.
 * @return true when an epset was extracted; false otherwise, in which case
 *         neither pResp nor *dst is modified.
 */
FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
  if (pResp == NULL || pResp->info.hasEpSet == 0) {
    return false;
  }

  // rebuild resp msg
  SEpSet epset;
  if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epset) < 0) {
    return false;
  }

  // measure the epset that was actually decoded, not the caller's previous one
  int32_t tlen = tSerializeSEpSet(NULL, 0, &epset);
  if (tlen < 0 || tlen > pResp->contLen) {
    // a negative remainder would turn into a huge memcpy size
    return false;
  }

  char*   buf = NULL;
  int32_t len = pResp->contLen - tlen;
  if (len != 0) {
    buf = rpcMallocCont(len);
    if (buf == NULL) {
      return false;
    }
    memcpy(buf, (char*)pResp->pCont + tlen, len);
  }
  rpcFreeCont(pResp->pCont);

  pResp->pCont = buf;
  pResp->contLen = len;

  *dst = epset;
  return true;
}
dengyihao's avatar
dengyihao 已提交
1877
/*
 * Move the context to the next endpoint unless every endpoint has already
 * been tried in this round.
 *
 * @return true when the in-use endpoint was advanced, false when
 *         epsetRetryCnt has reached the number of endpoints.
 */
static bool cliEpsetTryNextEp(STransConnCtx* pCtx) {
  if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
    return false;
  }
  EPSET_FORWARD_INUSE(&pCtx->epSet);
  return true;
}

/*
 * Update the retry context's epset from a response and report whether the
 * retry may go out immediately.
 *
 * - hasEpSet == false and an empty payload: just try the next endpoint.
 * - otherwise the payload is decoded as an epset:
 *     - undecodable: try the next endpoint;
 *     - different from the current one: adopt it and return false;
 *     - identical: try the next endpoint.
 *
 * @param pCtx      retry context whose epSet may be replaced or advanced.
 * @param pResp     response whose pCont/contLen may carry an epset.
 * @param hasEpSet  when true the payload is decoded even if contLen is 0.
 * @return true (no delay) when the in-use endpoint was advanced; false when a
 *         new epset was adopted or all endpoints were already tried.
 */
bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
  if (hasEpSet == false && pResp->contLen == 0) {
    return cliEpsetTryNextEp(pCtx);
  }

  SEpSet epSet;
  if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet) < 0) {
    tDebug("get invalid epset, epset equal, continue");
    return cliEpsetTryNextEp(pCtx);
  }

  if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
    tDebug("epset not equal, retry new epset");
    pCtx->epSet = epSet;
    return false;
  }

  if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
    return false;
  }
  tDebug("epset equal, continue");
  EPSET_FORWARD_INUSE(&pCtx->epSet);
  return true;
}
/*
 * Decide whether a response should trigger a retry and, if so, schedule it.
 *
 * The instance's retry() hook is asked first. On the first retry of a message
 * the retry parameters are copied from the instance into the message context.
 * A retry is refused once retryMaxTimeout (when not -1) has elapsed since the
 * first retry.
 *
 * When retrying, the response payload is freed, the connection is either
 * unreferenced (broken link / network unavailable) or returned to the pool,
 * the next interval is computed (0 when only switching endpoint, otherwise
 * retryMinInterval * retryStepFactor^(retryStep-1), capped at
 * retryMaxInterval), and the message is re-queued via cliSchedMsgToNextNode.
 *
 * @param pConn  connection the response arrived on.
 * @param pResp  response being examined; pCont is freed when true is returned.
 * @param pMsg   original request; pMsg->ctx carries the retry state.
 * @return true when the message was rescheduled, false when the caller should
 *         deliver the response.
 */
bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  STransConnCtx* pCtx = pMsg->ctx;
  int32_t        code = pResp->code;

  bool retry = pTransInst->retry != NULL ? pTransInst->retry(code, pResp->msgType - 1) : false;
  if (retry == false) {
    return false;
  }

  if (!pCtx->retryInit) {
    pCtx->retryMinInterval = pTransInst->retryMinInterval;
    pCtx->retryMaxInterval = pTransInst->retryMaxInterval;
    pCtx->retryStepFactor = pTransInst->retryStepFactor;
    pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet;
    pCtx->retryInitTimestamp = taosGetTimestampMs();
    pCtx->retryNextInterval = pCtx->retryMinInterval;
    pCtx->retryStep = 0;
    pCtx->retryInit = true;
    pCtx->retryCode = TSDB_CODE_SUCCESS;

    // already retry, not use handle specified by app;
    pMsg->msg.info.handle = 0;
  }

  // -1 disables the overall retry deadline
  if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
    return false;
  }

  // code, msgType

  // A:  epset,   leader, not self
  // B:  epset,   not know leader
  // C:  no epset, leader but not service

  bool noDelay = false;
  if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
    noDelay = cliResetEpset(pCtx, pResp, false);
    transFreeMsg(pResp->pCont);
    transUnrefCliHandle(pConn);
  } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
             code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED ||
             code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
             code == TSDB_CODE_APP_IS_STOPPING) {
    tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
    noDelay = cliResetEpset(pCtx, pResp, true);
    transFreeMsg(pResp->pCont);
    addConnToPool(pThrd->pool, pConn);
  } else if (code == TSDB_CODE_SYN_RESTORING) {
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
    noDelay = cliResetEpset(pCtx, pResp, true);
    addConnToPool(pThrd->pool, pConn);
    transFreeMsg(pResp->pCont);
  } else {
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
    noDelay = cliResetEpset(pCtx, pResp, false);
    addConnToPool(pThrd->pool, pConn);
    transFreeMsg(pResp->pCont);
  }

  if (code != TSDB_CODE_RPC_BROKEN_LINK && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_SUCCESS) {
    // save one internal code
    pCtx->retryCode = code;
  }

  if (noDelay == false) {
    pCtx->epsetRetryCnt = 1;
    pCtx->retryStep++;

    // pow() returns a double. Converting a double that does not fit in
    // int64_t is undefined, which becomes reachable when the step count keeps
    // growing (retryMaxTimeout == -1). Saturate the multiplier first; smaller
    // values are converted exactly as before.
    double  scale = pow(pCtx->retryStepFactor, pCtx->retryStep - 1);
    int64_t factor = (scale < (double)INT32_MAX) ? (int64_t)scale : (int64_t)INT32_MAX;

    pCtx->retryNextInterval = factor * pCtx->retryMinInterval;
    if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) {
      pCtx->retryNextInterval = pCtx->retryMaxInterval;
    }
  } else {
    pCtx->retryNextInterval = 0;
    pCtx->epsetRetryCnt++;
  }

  pMsg->sent = 0;
  cliSchedMsgToNextNode(pMsg, pThrd);
  return true;
}
dengyihao's avatar
dengyihao 已提交
2027
/*
 * Deliver a response to the application, or hand it to the retry logic.
 *
 * Without a message context the response goes straight to the instance
 * callback. Otherwise cliGenRetryRule() is consulted first; when it
 * reschedules the message nothing is delivered and -1 is returned.
 *
 * Before delivery the response code is adjusted:
 *   - if an internal code was recorded during retries and the final code is a
 *     link/network error, the recorded code is reported instead;
 *   - if all endpoints were tried (or at least one backoff step happened),
 *     NETWORK_UNAVAIL / BROKEN_LINK are mapped to their SOMENODE_* variants.
 *
 * A leading epset in the payload is extracted into the context. Synchronous
 * callers (pCtx->pSem set) get the response copied into pCtx->pRsp and the
 * semaphore posted; asynchronous callers get the instance callback, with the
 * context epset attached when one was extracted or cliIsEpsetUpdated() says so.
 *
 * @return 0 when the response was delivered, -1 when the message was retried.
 */
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  if (pMsg == NULL || pMsg->ctx == NULL) {
    tTrace("%s conn %p handle resp", pTransInst->label, pConn);
    pTransInst->cfp(pTransInst->parent, pResp, NULL);
    return 0;
  }

  STransConnCtx* pCtx = pMsg->ctx;

  if (cliGenRetryRule(pConn, pResp, pMsg)) {
    return -1;
  }

  if (pCtx->retryCode != TSDB_CODE_SUCCESS) {
    // return internal code app
    int32_t lastCode = pResp->code;
    bool    isLinkErr = (lastCode == TSDB_CODE_RPC_NETWORK_UNAVAIL) || (lastCode == TSDB_CODE_RPC_BROKEN_LINK) ||
                     (lastCode == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED);
    if (isLinkErr) {
      pResp->code = pCtx->retryCode;
    }
  }

  // check whole vnodes is offline on this vgroup
  bool allTried = (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) || (pCtx->retryStep > 0);
  if (allTried && pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    pResp->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
  } else if (allTried && pResp->code == TSDB_CODE_RPC_BROKEN_LINK) {
    pResp->code = TSDB_CODE_RPC_SOMENODE_BROKEN_LINK;
  }

  STraceId* trace = &pResp->info.traceId;
  bool      hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
  if (hasEpSet) {
    char tbuf[256] = {0};
    EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
    tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
  }

  if (pCtx->pSem == NULL) {
    tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);

    // a retry would have returned above, so only the epset decides the third argument
    SEpSet* pNewEpSet = NULL;
    if (hasEpSet == true || cliIsEpsetUpdated(pResp->code, pCtx)) {
      pNewEpSet = &pCtx->epSet;
    }
    pTransInst->cfp(pTransInst->parent, pResp, pNewEpSet);
    return 0;
  }

  tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
  if (pCtx->pRsp != NULL) {
    memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
  } else {
    tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
  }
  tsem_post(pCtx->pSem);
  pCtx->pRsp = NULL;

  return 0;
}
U
ubuntu 已提交
2093 2094

void transCloseClient(void* arg) {
U
ubuntu 已提交
2095
  SCliObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
2096
  for (int i = 0; i < cli->numOfThreads; i++) {
U
ubuntu 已提交
2097
    cliSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
2098
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
2099
  }
wafwerar's avatar
wafwerar 已提交
2100 2101
  taosMemoryFree(cli->pThreadObj);
  taosMemoryFree(cli);
dengyihao's avatar
dengyihao 已提交
2102
}
dengyihao's avatar
dengyihao 已提交
2103 2104 2105 2106 2107
void transRefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_INC((SCliConn*)handle);
dengyihao's avatar
dengyihao 已提交
2108
  tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
dengyihao's avatar
dengyihao 已提交
2109 2110 2111 2112 2113 2114 2115
  UNUSED(ref);
}
void transUnrefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SCliConn*)handle);
dengyihao's avatar
dengyihao 已提交
2116
  tTrace("%s conn %p ref:%d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
dengyihao's avatar
dengyihao 已提交
2117
  if (ref == 0) {
U
ubuntu 已提交
2118
    cliDestroyConn((SCliConn*)handle, true);
dengyihao's avatar
dengyihao 已提交
2119 2120
  }
}
dengyihao's avatar
dengyihao 已提交
2121
/*
 * Resolve the worker thread bound to an external handle.
 *
 * If the handle has no thread yet and a transport instance is supplied, a
 * thread is chosen round-robin and bound to the handle. The reference taken
 * on the handle is released on every path once it has been acquired.
 *
 * @param trans   transport instance used to pick a thread; may be NULL, in
 *                which case an unbound handle yields NULL.
 * @param handle  reference id of the external handle.
 * @return the bound worker thread, or NULL when the handle is unknown or no
 *         thread could be chosen.
 */
static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t handle) {
  SCliThrd*  pThrd = NULL;
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
  if (exh == NULL) {
    return NULL;
  }

  if (exh->pThrd == NULL && trans != NULL) {
    int idx = cliRBChoseIdx(trans);
    if (idx < 0) {
      // drop the reference acquired above before bailing out
      transReleaseExHandle(transGetRefMgt(), handle);
      return NULL;
    }
    exh->pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
  }

  pThrd = exh->pThrd;
  transReleaseExHandle(transGetRefMgt(), handle);
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
2138
/*
 * Select the worker thread that should process a request.
 *
 * A handle of 0 means no handle was supplied: a thread is picked round-robin.
 * Any other handle is resolved (and bound if necessary) through
 * transGetWorkThrdFromHandle().
 *
 * @return the worker thread, or NULL when none is available.
 */
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
  if (handle != 0) {
    return transGetWorkThrdFromHandle(trans, handle);
  }

  int idx = cliRBChoseIdx(trans);
  if (idx < 0) {
    return NULL;
  }

  SCliObj* pCli = (SCliObj*)trans->tcphandle;
  return pCli->pThreadObj[idx];
}
dengyihao's avatar
dengyihao 已提交
2147
int transReleaseCliHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
2148 2149 2150
  int  idx = -1;
  bool valid = false;

dengyihao's avatar
dengyihao 已提交
2151
  SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
dengyihao's avatar
dengyihao 已提交
2152
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2153
    return -1;
dengyihao's avatar
dengyihao 已提交
2154
  }
dengyihao's avatar
dengyihao 已提交
2155

dengyihao's avatar
dengyihao 已提交
2156
  STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527};
dengyihao's avatar
rm code  
dengyihao 已提交
2157
  TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
dengyihao's avatar
dengyihao 已提交
2158

dengyihao's avatar
dengyihao 已提交
2159 2160 2161
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  pCtx->ahandle = tmsg.info.ahandle;

dengyihao's avatar
dengyihao 已提交
2162
  SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2163
  cmsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
2164
  cmsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2165
  cmsg->type = Release;
dengyihao's avatar
dengyihao 已提交
2166
  cmsg->ctx = pCtx;
dengyihao's avatar
dengyihao 已提交
2167

dengyihao's avatar
dengyihao 已提交
2168 2169 2170
  STraceId* trace = &tmsg.info.traceId;
  tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);

dengyihao's avatar
dengyihao 已提交
2171
  if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
dengyihao's avatar
dengyihao 已提交
2172
    destroyCmsg(cmsg);
dengyihao's avatar
dengyihao 已提交
2173 2174
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2175
  return 0;
dengyihao's avatar
dengyihao 已提交
2176
}
dengyihao's avatar
dengyihao 已提交
2177

dengyihao's avatar
dengyihao 已提交
2178
/*
 * Queue an asynchronous request on a client worker thread.
 *
 * The payload pReq->pCont is freed here on every failure that is detected
 * before the message is built; on a queueing failure the whole message is
 * released through destroyCmsg().
 *
 * @param shandle  reference id of the transport instance.
 * @param pEpSet   endpoint set to send to; copied into the message context.
 * @param pReq     request; copied by value into the queued message.
 * @param ctx      optional application context, copied when non-NULL.
 * @return 0 on success;
 *         -1 when the instance is gone, memory is exhausted or queueing failed;
 *         TSDB_CODE_RPC_BROKEN_LINK when no worker thread is available;
 *         TSDB_CODE_RPC_MAX_SESSIONS when the per-destination limit is hit for
 *         a request that expects no response.
 */
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return -1;
  }

  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
  if (pThrd == NULL) {
    transFreeMsg(pReq->pCont);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return TSDB_CODE_RPC_BROKEN_LINK;
  }

  if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) {
    char     key[TSDB_FQDN_LEN + 64] = {0};
    char*    ip = EPSET_GET_INUSE_IP((SEpSet*)pEpSet);
    uint16_t port = EPSET_GET_INUSE_PORT((SEpSet*)pEpSet);
    CONN_CONSTRUCT_HASH_KEY(key, ip, port);

    int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key));
    if (val != NULL && *val >= pTransInst->connLimitNum) {
      transFreeMsg(pReq->pCont);
      transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
      return TSDB_CODE_RPC_MAX_SESSIONS;
    }
  }

  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());

  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  SCliMsg*       cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
  if (pCtx == NULL || cliMsg == NULL) {
    // same cleanup as the other early failures, plus whichever allocation succeeded
    taosMemoryFree(pCtx);
    taosMemoryFree(cliMsg);
    transFreeMsg(pReq->pCont);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return -1;
  }

  pCtx->epSet = *pEpSet;
  pCtx->ahandle = pReq->info.ahandle;
  pCtx->msgType = pReq->msgType;

  if (ctx != NULL) pCtx->appCtx = *ctx;

  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();
  cliMsg->type = Normal;
  cliMsg->refId = (int64_t)shandle;

  STraceId* trace = &pReq->info.traceId;
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
  if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) {
    destroyCmsg(cliMsg);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return -1;
  }
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
2232

dengyihao's avatar
dengyihao 已提交
2233
/*
 * Send a request and block until its response has been copied into *pRsp.
 *
 * A semaphore is attached to the message context; the calling thread waits on
 * it after the message has been queued, and it is posted by the response path
 * (see cliAppCb).
 *
 * @param shandle  reference id of the transport instance.
 * @param pEpSet   endpoint set to send to; copied into the message context.
 * @param pReq     request; copied by value into the queued message.
 * @param pRsp     receives the response.
 * @return 0 when the request was queued and the wait completed;
 *         -1 when the instance is gone or memory is exhausted;
 *         TSDB_CODE_RPC_BROKEN_LINK when no worker thread is available;
 *         otherwise the non-zero result of transAsyncSend().
 */
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return -1;
  }

  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
  if (pThrd == NULL) {
    transFreeMsg(pReq->pCont);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return TSDB_CODE_RPC_BROKEN_LINK;
  }

  // allocate everything up front so a failure needs no semaphore teardown
  tsem_t*        sem = taosMemoryCalloc(1, sizeof(tsem_t));
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  SCliMsg*       cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
  if (sem == NULL || pCtx == NULL || cliMsg == NULL) {
    taosMemoryFree(sem);
    taosMemoryFree(pCtx);
    taosMemoryFree(cliMsg);
    transFreeMsg(pReq->pCont);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return -1;
  }
  tsem_init(sem, 0, 0);

  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());

  pCtx->epSet = *pEpSet;
  pCtx->origEpSet = *pEpSet;
  pCtx->ahandle = pReq->info.ahandle;
  pCtx->msgType = pReq->msgType;
  pCtx->pSem = sem;
  pCtx->pRsp = pRsp;

  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();
  cliMsg->type = Normal;
  cliMsg->refId = (int64_t)shandle;

  STraceId* trace = &pReq->info.traceId;
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);

  int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
  if (ret != 0) {
    destroyCmsg(cliMsg);
  } else {
    tsem_wait(sem);
  }

  tsem_destroy(sem);
  taosMemoryFree(sem);
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
  return ret;
}
dengyihao's avatar
dengyihao 已提交
2284 2285 2286
/*
 *
 **/
dengyihao's avatar
dengyihao 已提交
2287
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
dengyihao's avatar
dengyihao 已提交
2288
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2289 2290 2291
  if (pTransInst == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2292 2293 2294

  SCvtAddr cvtAddr = {0};
  if (ip != NULL && fqdn != NULL) {
dengyihao's avatar
dengyihao 已提交
2295 2296
    tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip));
    tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn));
dengyihao's avatar
dengyihao 已提交
2297 2298
    cvtAddr.cvt = true;
  }
dengyihao's avatar
dengyihao 已提交
2299 2300
  for (int i = 0; i < pTransInst->numOfThreads; i++) {
    STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2301
    pCtx->cvtAddr = cvtAddr;
dengyihao's avatar
dengyihao 已提交
2302 2303 2304 2305

    SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
    cliMsg->ctx = pCtx;
    cliMsg->type = Update;
dengyihao's avatar
dengyihao 已提交
2306
    cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2307

dengyihao's avatar
dengyihao 已提交
2308
    SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
S
Shengliang Guan 已提交
2309
    tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid);
dengyihao's avatar
dengyihao 已提交
2310

dengyihao's avatar
dengyihao 已提交
2311 2312
    if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) {
      destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2313
      transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2314 2315
      return -1;
    }
dengyihao's avatar
dengyihao 已提交
2316
  }
dengyihao's avatar
dengyihao 已提交
2317
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2318
  return 0;
dengyihao's avatar
dengyihao 已提交
2319
}
dengyihao's avatar
dengyihao 已提交
2320 2321 2322 2323 2324

int64_t transAllocHandle() {
  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
  tDebug("pre alloc refId %" PRId64 "", exh->refId);
dengyihao's avatar
dengyihao 已提交
2325

dengyihao's avatar
dengyihao 已提交
2326 2327
  return exh->refId;
}
dengyihao's avatar
dengyihao 已提交
2328
#endif