transCli.c 84.3 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1
/** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
dengyihao's avatar
dengyihao 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "transComm.h"

dengyihao's avatar
dengyihao 已提交
17 18 19 20 21
typedef struct {
  int32_t numOfConn;
  queue   msgQ;
} SMsgList;

dengyihao's avatar
dengyihao 已提交
22
typedef struct SConnList {
dengyihao's avatar
dengyihao 已提交
23 24 25
  queue     conns;
  int32_t   size;
  SMsgList* list;
dengyihao's avatar
dengyihao 已提交
26 27
} SConnList;

dengyihao's avatar
dengyihao 已提交
28
typedef struct {
dengyihao's avatar
dengyihao 已提交
29 30 31 32 33 34
  queue   wq;
  int32_t len;

  int connMax;
  int connCnt;
  int batchLenLimit;
dengyihao's avatar
dengyihao 已提交
35
  int sending;
dengyihao's avatar
dengyihao 已提交
36

dengyihao's avatar
dengyihao 已提交
37 38 39
  char*    dst;
  char*    ip;
  uint16_t port;
dengyihao's avatar
dengyihao 已提交
40 41 42 43 44 45 46 47 48 49

} SCliBatchList;

typedef struct {
  queue          wq;
  queue          listq;
  int32_t        wLen;
  int32_t        batchSize;  //
  int32_t        batch;
  SCliBatchList* pList;
dengyihao's avatar
dengyihao 已提交
50
} SCliBatch;
dengyihao's avatar
dengyihao 已提交
51

dengyihao's avatar
dengyihao 已提交
52
typedef struct SCliConn {
dengyihao's avatar
dengyihao 已提交
53
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
54 55
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
56
  queue        wreqQueue;
dengyihao's avatar
dengyihao 已提交
57 58

  uv_timer_t* timer;  // read timer, forbidden
dengyihao's avatar
dengyihao 已提交
59

dengyihao's avatar
dengyihao 已提交
60 61 62 63
  void* hostThrd;

  SConnBuffer readBuf;
  STransQueue cliMsgs;
dengyihao's avatar
dengyihao 已提交
64 65 66

  queue      q;
  SConnList* list;
dengyihao's avatar
dengyihao 已提交
67 68

  STransCtx  ctx;
dengyihao's avatar
dengyihao 已提交
69 70
  bool       broken;  // link broken or not
  ConnStatus status;  //
dengyihao's avatar
dengyihao 已提交
71

dengyihao's avatar
dengyihao 已提交
72 73
  SCliBatch* pBatch;

dengyihao's avatar
dengyihao 已提交
74
  SDelayTask* task;
dengyihao's avatar
dengyihao 已提交
75

dengyihao's avatar
dengyihao 已提交
76
  char* dstAddr;
dengyihao's avatar
dengyihao 已提交
77 78
  char  src[32];
  char  dst[32];
dengyihao's avatar
dengyihao 已提交
79

dengyihao's avatar
dengyihao 已提交
80
  int64_t refId;
dengyihao's avatar
dengyihao 已提交
81
} SCliConn;
dengyihao's avatar
dengyihao 已提交
82

dengyihao's avatar
dengyihao 已提交
83
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
84
  STransConnCtx* ctx;
dengyihao's avatar
formate  
dengyihao 已提交
85
  STransMsg      msg;
dengyihao's avatar
dengyihao 已提交
86
  queue          q;
dengyihao's avatar
dengyihao 已提交
87
  STransMsgType  type;
dengyihao's avatar
dengyihao 已提交
88

dengyihao's avatar
dengyihao 已提交
89
  int64_t  refId;
dengyihao's avatar
dengyihao 已提交
90 91
  uint64_t st;
  int      sent;  //(0: no send, 1: alread sent)
dengyihao's avatar
dengyihao 已提交
92 93
} SCliMsg;

dengyihao's avatar
dengyihao 已提交
94
typedef struct SCliThrd {
dengyihao's avatar
dengyihao 已提交
95 96 97 98 99 100
  TdThread      thread;  // tid
  int64_t       pid;     // pid
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  uv_prepare_t* prepare;
  void*         pool;  // conn pool
dengyihao's avatar
dengyihao 已提交
101
  // timer handles
dengyihao's avatar
dengyihao 已提交
102
  SArray* timerList;
dengyihao's avatar
dengyihao 已提交
103
  // msg queue
dengyihao's avatar
dengyihao 已提交
104
  queue         msg;
wafwerar's avatar
wafwerar 已提交
105
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
106
  SDelayQueue*  delayQueue;
dengyihao's avatar
dengyihao 已提交
107
  SDelayQueue*  timeoutQueue;
dengyihao's avatar
dengyihao 已提交
108
  SDelayQueue*  waitConnQueue;
dengyihao's avatar
dengyihao 已提交
109 110
  uint64_t      nextTimeout;  // next timeout
  void*         pTransInst;   //
dengyihao's avatar
dengyihao 已提交
111

dengyihao's avatar
dengyihao 已提交
112
  int connCount;
dengyihao's avatar
dengyihao 已提交
113
  void (*destroyAhandleFp)(void* ahandle);
dengyihao's avatar
dengyihao 已提交
114 115
  SHashObj* fqdn2ipCache;
  SCvtAddr  cvtAddr;
dengyihao's avatar
dengyihao 已提交
116

dengyihao's avatar
dengyihao 已提交
117
  SHashObj* failFastCache;
dengyihao's avatar
dengyihao 已提交
118
  SHashObj* batchCache;
dengyihao's avatar
dengyihao 已提交
119

dengyihao's avatar
dengyihao 已提交
120 121
  SCliMsg* stopMsg;

dengyihao's avatar
dengyihao 已提交
122
  bool quit;
dengyihao's avatar
dengyihao 已提交
123 124 125

  int       newConnCount;
  SHashObj* msgCount;
dengyihao's avatar
dengyihao 已提交
126
} SCliThrd;
dengyihao's avatar
dengyihao 已提交
127

U
ubuntu 已提交
128
typedef struct SCliObj {
dengyihao's avatar
dengyihao 已提交
129 130 131 132
  char       label[TSDB_LABEL_LEN];
  int32_t    index;
  int        numOfThreads;
  SCliThrd** pThreadObj;
U
ubuntu 已提交
133
} SCliObj;
dengyihao's avatar
dengyihao 已提交
134

dengyihao's avatar
dengyihao 已提交
135 136 137 138 139 140 141
typedef struct {
  int32_t reinit;
  int64_t timestamp;
  int32_t count;
  int32_t threshold;
  int64_t interval;
} SFailFastItem;
dengyihao's avatar
dengyihao 已提交
142

dengyihao's avatar
dengyihao 已提交
143
// conn pool
dengyihao's avatar
dengyihao 已提交
144
// add expire timeout and capacity limit
dengyihao's avatar
dengyihao 已提交
145
static void*     createConnPool(int size);
dengyihao's avatar
dengyihao 已提交
146
static void*     destroyConnPool(SCliThrd* thread);
dengyihao's avatar
dengyihao 已提交
147
static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed);
dengyihao's avatar
dengyihao 已提交
148
static void      addConnToPool(void* pool, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
149
static void      doCloseIdleConn(void* param);
dengyihao's avatar
dengyihao 已提交
150

dengyihao's avatar
dengyihao 已提交
151 152
// register conn timer
static void cliConnTimeout(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
153 154
// register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
155
// register timer in each thread to clear expire conn
dengyihao's avatar
dengyihao 已提交
156
// static void cliTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
157
// alloc buffer for recv
dengyihao's avatar
dengyihao 已提交
158
static FORCE_INLINE void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
159
// callback after recv nbytes from socket
U
ubuntu 已提交
160
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
161
// callback after send data to socket
U
ubuntu 已提交
162
static void cliSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
163
// callback after conn to server
U
ubuntu 已提交
164 165
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
166
static void cliIdleCb(uv_idle_t* handle);
dengyihao's avatar
dengyihao 已提交
167
static void cliPrepareCb(uv_prepare_t* handle);
dengyihao's avatar
dengyihao 已提交
168

dengyihao's avatar
dengyihao 已提交
169
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd);
dengyihao's avatar
dengyihao 已提交
170
static void cliSendBatchCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
171 172

SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
dengyihao's avatar
dengyihao 已提交
173

dengyihao's avatar
dengyihao 已提交
174 175
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);

dengyihao's avatar
dengyihao 已提交
176
static int32_t allocConnRef(SCliConn* conn, bool update);
dengyihao's avatar
dengyihao 已提交
177

dengyihao's avatar
dengyihao 已提交
178
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
dengyihao's avatar
dengyihao 已提交
179

dengyihao's avatar
dengyihao 已提交
180
static SCliConn* cliCreateConn(SCliThrd* thrd);
U
ubuntu 已提交
181 182
static void      cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void      cliDestroy(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
183
static void      cliSend(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
184
static void      cliSendBatch(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
185
static void      cliDestroyConnMsgs(SCliConn* conn, bool destroy);
dengyihao's avatar
dengyihao 已提交
186

dengyihao's avatar
dengyihao 已提交
187 188
static void    doFreeTimeoutMsg(void* param);
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg);
dengyihao's avatar
dengyihao 已提交
189

dengyihao's avatar
dengyihao 已提交
190
// cli util func
dengyihao's avatar
dengyihao 已提交
191 192
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
dengyihao's avatar
dengyihao 已提交
193

dengyihao's avatar
dengyihao 已提交
194
static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
dengyihao's avatar
dengyihao 已提交
195

dengyihao's avatar
dengyihao 已提交
196 197 198
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn);
static FORCE_INLINE void     cliUpdateFqdnCache(SHashObj* cache, char* fqdn);

dengyihao's avatar
dengyihao 已提交
199
static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst);
dengyihao's avatar
dengyihao 已提交
200
// process data read from server, add decompress etc later
U
ubuntu 已提交
201
static void cliHandleResp(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
202
// handle except about conn
U
ubuntu 已提交
203
static void cliHandleExcept(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
204
static void cliReleaseUnfinishedMsg(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
205
static void cliHandleFastFail(SCliConn* pConn, int status);
dengyihao's avatar
dengyihao 已提交
206

dengyihao's avatar
dengyihao 已提交
207
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd);
208
// handle req from app
dengyihao's avatar
dengyihao 已提交
209 210 211 212 213 214
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
                                                                   cliHandleUpdate};
dengyihao's avatar
dengyihao 已提交
215 216
/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
/// NULL,cliHandleUpdate};
dengyihao's avatar
dengyihao 已提交
217

dengyihao's avatar
dengyihao 已提交
218 219
static FORCE_INLINE void destroyUserdata(STransMsg* userdata);
static FORCE_INLINE void destroyCmsg(void* cmsg);
dengyihao's avatar
dengyihao 已提交
220
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
dengyihao's avatar
dengyihao 已提交
221
static FORCE_INLINE int  cliRBChoseIdx(STrans* pTransInst);
dengyihao's avatar
dengyihao 已提交
222
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
223

dengyihao's avatar
dengyihao 已提交
224
// thread obj
dengyihao's avatar
dengyihao 已提交
225
static SCliThrd* createThrdObj(void* trans);
dengyihao's avatar
dengyihao 已提交
226
static void      destroyThrdObj(SCliThrd* pThrd);
dengyihao's avatar
dengyihao 已提交
227

dengyihao's avatar
dengyihao 已提交
228 229 230 231 232 233 234 235 236
static void cliWalkCb(uv_handle_t* handle, void* arg);

#define CLI_RELEASE_UV(loop)        \
  do {                              \
    uv_walk(loop, cliWalkCb, NULL); \
    uv_run(loop, UV_RUN_DEFAULT);   \
    uv_loop_close(loop);            \
  } while (0);

dengyihao's avatar
dengyihao 已提交
237
// snprintf may cause performance problem
dengyihao's avatar
dengyihao 已提交
238 239
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
  do {                                         \
dengyihao's avatar
dengyihao 已提交
240 241
    char*   t = key;                           \
    int16_t len = strlen(ip);                  \
dengyihao's avatar
dengyihao 已提交
242
    if (ip != NULL) memcpy(t, ip, len);        \
dengyihao's avatar
dengyihao 已提交
243 244
    t[len] = ':';                              \
    titoa(port, 10, &t[len + 1]);              \
dengyihao's avatar
dengyihao 已提交
245 246
  } while (0)

dengyihao's avatar
dengyihao 已提交
247 248
#define CONN_PERSIST_TIME(para)   ((para) <= 90000 ? 90000 : (para))
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
dengyihao's avatar
dengyihao 已提交
249

dengyihao's avatar
dengyihao 已提交
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle)                         \
  do {                                                                    \
    int i = 0, sz = transQueueSize(&conn->cliMsgs);                       \
    for (; i < sz; i++) {                                                 \
      pMsg = transQueueGet(&conn->cliMsgs, i);                            \
      if (pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \
        break;                                                            \
      }                                                                   \
    }                                                                     \
    if (i == sz) {                                                        \
      pMsg = NULL;                                                        \
      tDebug("msg not found, %" PRIu64 "", ahandle);                      \
    } else {                                                              \
      pMsg = transQueueRm(&conn->cliMsgs, i);                             \
      tDebug("msg found, %" PRIu64 "", ahandle);                          \
    }                                                                     \
dengyihao's avatar
dengyihao 已提交
266
  } while (0)
dengyihao's avatar
dengyihao 已提交
267

U
ubuntu 已提交
268 269 270 271 272 273 274 275 276 277 278 279 280
#define CONN_GET_NEXT_SENDMSG(conn)                 \
  do {                                              \
    int i = 0;                                      \
    do {                                            \
      pCliMsg = transQueueGet(&conn->cliMsgs, i++); \
      if (pCliMsg && 0 == pCliMsg->sent) {          \
        break;                                      \
      }                                             \
    } while (pCliMsg != NULL);                      \
    if (pCliMsg == NULL) {                          \
      goto _RETURN;                                 \
    }                                               \
  } while (0)
dengyihao's avatar
dengyihao 已提交
281

dengyihao's avatar
formate  
dengyihao 已提交
282 283
#define CONN_SET_PERSIST_BY_APP(conn) \
  do {                                \
dengyihao's avatar
dengyihao 已提交
284 285
    if (conn->status == ConnNormal) { \
      conn->status = ConnAcquire;     \
dengyihao's avatar
formate  
dengyihao 已提交
286 287 288
      transRefCliHandle(conn);        \
    }                                 \
  } while (0)
289

dengyihao's avatar
dengyihao 已提交
290 291 292 293
#define CONN_NO_PERSIST_BY_APP(conn) \
  (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define CONN_RELEASE_BY_SERVER(conn) \
  (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
294

S
Shengliang Guan 已提交
295 296
#define REQUEST_NO_RESP(msg)         ((msg)->info.noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg)   ((msg)->info.persistHandle == 1)
dengyihao's avatar
dengyihao 已提交
297
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
dengyihao's avatar
dengyihao 已提交
298

dengyihao's avatar
dengyihao 已提交
299
#define EPSET_IS_VALID(epSet)       ((epSet) != NULL && (epSet)->numOfEps >= 0 && (epSet)->inUse >= 0)
dengyihao's avatar
dengyihao 已提交
300
#define EPSET_GET_SIZE(epSet)       (epSet)->numOfEps
dengyihao's avatar
dengyihao 已提交
301
#define EPSET_GET_INUSE_IP(epSet)   ((epSet)->eps[(epSet)->inUse].fqdn)
dengyihao's avatar
dengyihao 已提交
302
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
dengyihao's avatar
dengyihao 已提交
303 304 305 306 307 308
#define EPSET_FORWARD_INUSE(epSet)                             \
  do {                                                         \
    if ((epSet)->numOfEps != 0) {                              \
      ++((epSet)->inUse);                                      \
      (epSet)->inUse = ((epSet)->inUse) % ((epSet)->numOfEps); \
    }                                                          \
dengyihao's avatar
dengyihao 已提交
309
  } while (0)
dengyihao's avatar
dengyihao 已提交
310

dengyihao's avatar
dengyihao 已提交
311 312 313 314 315 316 317 318 319 320 321
#define EPSET_DEBUG_STR(epSet, tbuf)                                                                                   \
  do {                                                                                                                 \
    int len = snprintf(tbuf, sizeof(tbuf), "epset:{");                                                                 \
    for (int i = 0; i < (epSet)->numOfEps; i++) {                                                                      \
      if (i == (epSet)->numOfEps - 1) {                                                                                \
        len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port);   \
      } else {                                                                                                         \
        len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d, ", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
      }                                                                                                                \
    }                                                                                                                  \
    len += snprintf(tbuf + len, sizeof(tbuf) - len, "}, inUse:%d", (epSet)->inUse);                                    \
dengyihao's avatar
dengyihao 已提交
322
  } while (0);
dengyihao's avatar
dengyihao 已提交
323

U
ubuntu 已提交
324
static void* cliWorkThread(void* arg);
dengyihao's avatar
dengyihao 已提交
325

dengyihao's avatar
dengyihao 已提交
326 327 328 329 330 331 332 333
static void cliReleaseUnfinishedMsg(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;

  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    SCliMsg* msg = transQueueGet(&conn->cliMsgs, i);
    if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) {
      if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
        conn->ctx.freeFunc(msg->ctx->ahandle);
dengyihao's avatar
dengyihao 已提交
334
      } else if (msg->ctx->ahandle != NULL && pThrd->destroyAhandleFp != NULL) {
dengyihao's avatar
dengyihao 已提交
335
        tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle);
dengyihao's avatar
dengyihao 已提交
336
        pThrd->destroyAhandleFp(msg->ctx->ahandle);
dengyihao's avatar
dengyihao 已提交
337 338 339 340
      }
    }
    destroyCmsg(msg);
  }
dengyihao's avatar
dengyihao 已提交
341
  transQueueClear(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
342
  memset(&conn->ctx, 0, sizeof(conn->ctx));
dengyihao's avatar
dengyihao 已提交
343
}
dengyihao's avatar
dengyihao 已提交
344
bool cliMaySendCachedMsg(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
345
  if (!transQueueEmpty(&conn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
346
    SCliMsg* pCliMsg = NULL;
U
ubuntu 已提交
347
    CONN_GET_NEXT_SENDMSG(conn);
dengyihao's avatar
dengyihao 已提交
348 349
    cliSend(conn);
    return true;
dengyihao's avatar
dengyihao 已提交
350
  }
dengyihao's avatar
dengyihao 已提交
351
  return false;
U
ubuntu 已提交
352 353
_RETURN:
  return false;
dengyihao's avatar
dengyihao 已提交
354
}
dengyihao's avatar
formate  
dengyihao 已提交
355
void cliHandleResp(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
356 357
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
358

dengyihao's avatar
dengyihao 已提交
359 360 361 362 363 364
  if (conn->timer) {
    if (uv_is_active((uv_handle_t*)conn->timer)) {
      tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
      uv_timer_stop(conn->timer);
    }
    taosArrayPush(pThrd->timerList, &conn->timer);
dengyihao's avatar
dengyihao 已提交
365
    conn->timer->data = NULL;
dengyihao's avatar
dengyihao 已提交
366
    conn->timer = NULL;
dengyihao's avatar
dengyihao 已提交
367 368
  }

dengyihao's avatar
opt rpc  
dengyihao 已提交
369
  STransMsgHead* pHead = NULL;
dengyihao's avatar
dengyihao 已提交
370 371 372

  int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
  if (msgLen <= 0) {
dengyihao's avatar
dengyihao 已提交
373
    taosMemoryFree(pHead);
dengyihao's avatar
dengyihao 已提交
374 375 376
    tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
    return;
  }
dengyihao's avatar
dengyihao 已提交
377 378 379 380

  if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
    tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
  }
dengyihao's avatar
dengyihao 已提交
381 382
  pHead->code = htonl(pHead->code);
  pHead->msgLen = htonl(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
383 384 385 386
  if (cliRecvReleaseReq(conn, pHead)) {
    return;
  }

dengyihao's avatar
dengyihao 已提交
387 388 389 390 391
  STransMsg transMsg = {0};
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = transContFromHead((char*)pHead);
  transMsg.code = pHead->code;
  transMsg.msgType = pHead->msgType;
S
Shengliang Guan 已提交
392
  transMsg.info.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
393
  transMsg.info.traceId = pHead->traceId;
dengyihao's avatar
dengyihao 已提交
394
  transMsg.info.hasEpSet = pHead->hasEpSet;
dengyihao's avatar
dengyihao 已提交
395
  transMsg.info.cliVer = htonl(pHead->compatibilityVer);
dengyihao's avatar
dengyihao 已提交
396

dengyihao's avatar
dengyihao 已提交
397 398 399 400
  SCliMsg*       pMsg = NULL;
  STransConnCtx* pCtx = NULL;
  if (CONN_NO_PERSIST_BY_APP(conn)) {
    pMsg = transQueuePop(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
401 402 403

    pCtx = pMsg ? pMsg->ctx : NULL;
    transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
404
    tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
U
ubuntu 已提交
405
  } else {
dengyihao's avatar
dengyihao 已提交
406 407 408
    uint64_t ahandle = (uint64_t)pHead->ahandle;
    CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
    if (pMsg == NULL) {
S
Shengliang Guan 已提交
409
      transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
dengyihao's avatar
dengyihao 已提交
410 411
      tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
             transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
S
Shengliang Guan 已提交
412
      if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
D
dapan1121 已提交
413
        transMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
S
Shengliang Guan 已提交
414
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
dengyihao's avatar
dengyihao 已提交
415 416
        tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
               transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
417 418
      }
    } else {
dengyihao's avatar
dengyihao 已提交
419
      pCtx = pMsg->ctx;
S
Shengliang Guan 已提交
420
      transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
421
      tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
422
    }
U
ubuntu 已提交
423
  }
dengyihao's avatar
dengyihao 已提交
424
  // buf's mem alread translated to transMsg.pCont
dengyihao's avatar
dengyihao 已提交
425
  if (!CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
426
    transMsg.info.handle = (void*)conn->refId;
dengyihao's avatar
dengyihao 已提交
427
    tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
428
  }
dengyihao's avatar
dengyihao 已提交
429

dengyihao's avatar
dengyihao 已提交
430
  STraceId* trace = &transMsg.info.traceId;
D
dapan1121 已提交
431
  tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
dengyihao's avatar
dengyihao 已提交
432
          TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));
dengyihao's avatar
dengyihao 已提交
433

dengyihao's avatar
dengyihao 已提交
434
  if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
435
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
436
    transFreeMsg(transMsg.pCont);
dengyihao's avatar
dengyihao 已提交
437 438
    return;
  }
S
Shengliang Guan 已提交
439
  if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
dengyihao's avatar
dengyihao 已提交
440
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
441
    transFreeMsg(transMsg.pCont);
dengyihao's avatar
dengyihao 已提交
442 443
    return;
  }
dengyihao's avatar
dengyihao 已提交
444

dengyihao's avatar
dengyihao 已提交
445
  if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
dengyihao's avatar
dengyihao 已提交
446 447 448
    if (cliAppCb(conn, &transMsg, pMsg) != 0) {
      return;
    }
dengyihao's avatar
dengyihao 已提交
449
  }
dengyihao's avatar
dengyihao 已提交
450 451
  destroyCmsg(pMsg);

dengyihao's avatar
dengyihao 已提交
452
  if (cliMaySendCachedMsg(conn) == true) {
dengyihao's avatar
dengyihao 已提交
453 454
    return;
  }
dengyihao's avatar
dengyihao 已提交
455

U
ubuntu 已提交
456
  if (CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
457
    return addConnToPool(pThrd->pool, conn);
dengyihao's avatar
dengyihao 已提交
458
  }
dengyihao's avatar
test  
dengyihao 已提交
459

dengyihao's avatar
dengyihao 已提交
460
  uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
dengyihao's avatar
dengyihao 已提交
461
}
U
ubuntu 已提交
462

dengyihao's avatar
dengyihao 已提交
463
void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
dengyihao's avatar
dengyihao 已提交
464
  if (transQueueEmpty(&pConn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
465
    if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
dengyihao's avatar
dengyihao 已提交
466
      tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
467
      if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn);
dengyihao's avatar
formate  
dengyihao 已提交
468 469 470
      transUnrefCliHandle(pConn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
471
  }
dengyihao's avatar
dengyihao 已提交
472 473 474
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
  bool      once = false;
D
dapan1121 已提交
475
  do {
dengyihao's avatar
dengyihao 已提交
476
    SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);
dengyihao's avatar
enh log  
dengyihao 已提交
477

D
dapan1121 已提交
478 479
    if (pMsg == NULL && once) {
      break;
dengyihao's avatar
dengyihao 已提交
480
    }
dengyihao's avatar
enh log  
dengyihao 已提交
481 482 483 484 485 486

    if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg)) {
      destroyCmsg(pMsg);
      break;
    }

dengyihao's avatar
dengyihao 已提交
487
    STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
U
ubuntu 已提交
488

dengyihao's avatar
dengyihao 已提交
489
    STransMsg transMsg = {0};
dengyihao's avatar
dengyihao 已提交
490
    transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code;
dengyihao's avatar
dengyihao 已提交
491
    transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
S
Shengliang Guan 已提交
492
    transMsg.info.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
493
    transMsg.info.cliVer = pTransInst->compatibilityVer;
dengyihao's avatar
dengyihao 已提交
494

dengyihao's avatar
dengyihao 已提交
495
    if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
S
Shengliang Guan 已提交
496
      transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
dengyihao's avatar
dengyihao 已提交
497
      tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
dengyihao's avatar
dengyihao 已提交
498
             TMSG_INFO(transMsg.msgType));
S
Shengliang Guan 已提交
499
      if (transMsg.info.ahandle == NULL) {
dengyihao's avatar
dengyihao 已提交
500 501 502
        int32_t msgType = 0;
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, &msgType);
        transMsg.msgType = msgType;
dengyihao's avatar
dengyihao 已提交
503
        tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
S
Shengliang Guan 已提交
504
               transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
505
      }
dengyihao's avatar
dengyihao 已提交
506
    } else {
dengyihao's avatar
dengyihao 已提交
507
      transMsg.info.ahandle = (pMsg != NULL && pMsg->type != Release && pCtx) ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
508 509 510
    }

    if (pCtx == NULL || pCtx->pSem == NULL) {
S
Shengliang Guan 已提交
511
      if (transMsg.info.ahandle == NULL) {
dengyihao's avatar
dengyihao 已提交
512 513 514 515 516
        if (pMsg == NULL || REQUEST_NO_RESP(&pMsg->msg) || pMsg->type == Release) {
          destroyCmsg(pMsg);
          once = true;
          continue;
        }
U
ubuntu 已提交
517
      }
dengyihao's avatar
dengyihao 已提交
518
    }
dengyihao's avatar
enh log  
dengyihao 已提交
519

dengyihao's avatar
dengyihao 已提交
520
    if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
dengyihao's avatar
dengyihao 已提交
521 522 523
      if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
        return;
      }
dengyihao's avatar
dengyihao 已提交
524 525
    }
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
526
    tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));
D
dapan1121 已提交
527
  } while (!transQueueEmpty(&pConn->cliMsgs));
dengyihao's avatar
dengyihao 已提交
528
  if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn);
dengyihao's avatar
dengyihao 已提交
529
  transUnrefCliHandle(pConn);
dengyihao's avatar
dengyihao 已提交
530
}
dengyihao's avatar
dengyihao 已提交
531
void cliHandleExcept(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
532
  tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
533 534
  cliHandleExceptImpl(conn, -1);
}
dengyihao's avatar
dengyihao 已提交
535

dengyihao's avatar
dengyihao 已提交
536 537
void cliConnTimeout(uv_timer_t* handle) {
  SCliConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
538 539
  SCliThrd* pThrd = conn->hostThrd;

dengyihao's avatar
dengyihao 已提交
540
  tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
541

dengyihao's avatar
dengyihao 已提交
542
  uv_timer_stop(handle);
dengyihao's avatar
dengyihao 已提交
543 544 545
  handle->data = NULL;
  taosArrayPush(pThrd->timerList, &conn->timer);
  conn->timer = NULL;
dengyihao's avatar
dengyihao 已提交
546

dengyihao's avatar
dengyihao 已提交
547
  cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
dengyihao's avatar
dengyihao 已提交
548
  cliHandleFastFail(conn, UV_ECANCELED);
dengyihao's avatar
dengyihao 已提交
549
}
dengyihao's avatar
dengyihao 已提交
550 551 552 553
void cliReadTimeoutCb(uv_timer_t* handle) {
  // set up timeout cb
  SCliConn* conn = handle->data;
  tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
554
  uv_read_stop(conn->stream);
dengyihao's avatar
dengyihao 已提交
555 556
  cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT);
}
U
ubuntu 已提交
557 558

void* createConnPool(int size) {
559 560
  // thread local, no lock
  return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
561
}
dengyihao's avatar
dengyihao 已提交
562 563
void* destroyConnPool(SCliThrd* pThrd) {
  void*      pool = pThrd->pool;
dengyihao's avatar
dengyihao 已提交
564
  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
dengyihao's avatar
dengyihao 已提交
565
  while (connList != NULL) {
dengyihao's avatar
dengyihao 已提交
566 567
    while (!QUEUE_IS_EMPTY(&connList->conns)) {
      queue*    h = QUEUE_HEAD(&connList->conns);
dengyihao's avatar
dengyihao 已提交
568
      SCliConn* c = QUEUE_DATA(h, SCliConn, q);
U
ubuntu 已提交
569
      cliDestroyConn(c, true);
dengyihao's avatar
dengyihao 已提交
570
    }
dengyihao's avatar
dengyihao 已提交
571 572 573 574 575 576 577 578 579 580 581 582 583 584 585

    SMsgList* msglist = connList->list;
    while (!QUEUE_IS_EMPTY(&msglist->msgQ)) {
      queue* h = QUEUE_HEAD(&msglist->msgQ);
      QUEUE_REMOVE(h);

      SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);

      transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
      pMsg->ctx->task = NULL;

      doNotifyApp(pMsg, pThrd);
    }
    taosMemoryFree(msglist);

dengyihao's avatar
dengyihao 已提交
586
    connList = taosHashIterate((SHashObj*)pool, connList);
dengyihao's avatar
dengyihao 已提交
587
  }
dengyihao's avatar
dengyihao 已提交
588
  taosHashCleanup(pool);
589
  return NULL;
dengyihao's avatar
dengyihao 已提交
590 591
}

dengyihao's avatar
dengyihao 已提交
592
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
dengyihao's avatar
dengyihao 已提交
593
  void*      pool = pThrd->pool;
dengyihao's avatar
dengyihao 已提交
594
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
dengyihao's avatar
dengyihao 已提交
595
  STrans*    pTranInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
596
  if (plist == NULL) {
dengyihao's avatar
dengyihao 已提交
597
    SConnList list = {0};
dengyihao's avatar
dengyihao 已提交
598 599
    taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
    plist = taosHashGet(pool, key, strlen(key) + 1);
dengyihao's avatar
dengyihao 已提交
600

dengyihao's avatar
dengyihao 已提交
601 602 603 604
    SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
    QUEUE_INIT(&nList->msgQ);
    nList->numOfConn++;

dengyihao's avatar
dengyihao 已提交
605 606
    QUEUE_INIT(&plist->conns);
    plist->list = nList;
dengyihao's avatar
dengyihao 已提交
607 608
  }

dengyihao's avatar
dengyihao 已提交
609 610 611 612
  if (QUEUE_IS_EMPTY(&plist->conns)) {
    if (plist->list->numOfConn >= pTranInst->connLimitNum) {
      *exceed = true;
    }
dengyihao's avatar
dengyihao 已提交
613 614 615
    return NULL;
  }

dengyihao's avatar
dengyihao 已提交
616
  queue* h = QUEUE_TAIL(&plist->conns);
dengyihao's avatar
dengyihao 已提交
617
  QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
618
  plist->size -= 1;
dengyihao's avatar
dengyihao 已提交
619

dengyihao's avatar
dengyihao 已提交
620 621 622 623 624 625 626 627 628 629 630 631 632 633
  SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
  conn->status = ConnNormal;
  QUEUE_INIT(&conn->q);

  if (conn->task != NULL) {
    transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
    conn->task = NULL;
  }
  return conn;
}

static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
  void*      pool = pThrd->pool;
  STrans*    pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
634
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
dengyihao's avatar
dengyihao 已提交
635
  if (plist == NULL) {
dengyihao's avatar
dengyihao 已提交
636
    SConnList list = {0};
dengyihao's avatar
dengyihao 已提交
637 638
    taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
    plist = taosHashGet(pool, key, strlen(key) + 1);
dengyihao's avatar
dengyihao 已提交
639

dengyihao's avatar
dengyihao 已提交
640 641
    SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
    QUEUE_INIT(&nList->msgQ);
dengyihao's avatar
dengyihao 已提交
642 643
    nList->numOfConn++;

dengyihao's avatar
dengyihao 已提交
644 645
    QUEUE_INIT(&plist->conns);
    plist->list = nList;
dengyihao's avatar
dengyihao 已提交
646 647
  }

dengyihao's avatar
dengyihao 已提交
648
  STraceId* trace = &(*pMsg)->msg.info.traceId;
dengyihao's avatar
dengyihao 已提交
649
  // no avaliable conn in pool
dengyihao's avatar
dengyihao 已提交
650
  if (QUEUE_IS_EMPTY(&plist->conns)) {
dengyihao's avatar
dengyihao 已提交
651
    SMsgList* list = plist->list;
dengyihao's avatar
dengyihao 已提交
652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669
    if ((list)->numOfConn >= pTransInst->connLimitNum) {
      STraceId* trace = &(*pMsg)->msg.info.traceId;
      STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
      arg->param1 = *pMsg;
      arg->param2 = pThrd;
      (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);

      tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));

      QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
      *pMsg = NULL;
    } else {
      // send msg in delay queue
      if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) {
        STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
        arg->param1 = *pMsg;
        arg->param2 = pThrd;
        (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
dengyihao's avatar
dengyihao 已提交
670 671
        tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label,
                TMSG_INFO((*pMsg)->msg.msgType));
dengyihao's avatar
dengyihao 已提交
672 673 674 675 676 677 678

        QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
        queue* h = QUEUE_HEAD(&(list)->msgQ);
        QUEUE_REMOVE(h);
        SCliMsg* ans = QUEUE_DATA(h, SCliMsg, q);

        *pMsg = ans;
dengyihao's avatar
dengyihao 已提交
679 680 681

        trace = &(*pMsg)->msg.info.traceId;
        tGTrace("%s msg %s pop from delay queue, start to send", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));
dengyihao's avatar
dengyihao 已提交
682 683 684 685
        transDQCancel(pThrd->waitConnQueue, ans->ctx->task);
      }
      list->numOfConn++;
    }
dengyihao's avatar
dengyihao 已提交
686
    tTrace("%s numOfConn: %d, limit: %d", pTransInst->label, list->numOfConn, pTransInst->connLimitNum);
dengyihao's avatar
dengyihao 已提交
687 688
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
689

dengyihao's avatar
dengyihao 已提交
690
  queue* h = QUEUE_TAIL(&plist->conns);
dengyihao's avatar
dengyihao 已提交
691
  plist->size -= 1;
dengyihao's avatar
dengyihao 已提交
692 693
  QUEUE_REMOVE(h);

dengyihao's avatar
dengyihao 已提交
694
  SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
dengyihao's avatar
dengyihao 已提交
695
  conn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
696
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
697

dengyihao's avatar
dengyihao 已提交
698 699 700 701
  if (conn->task != NULL) {
    transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
    conn->task = NULL;
  }
dengyihao's avatar
dengyihao 已提交
702
  return conn;
dengyihao's avatar
dengyihao 已提交
703
}
dengyihao's avatar
dengyihao 已提交
704 705 706 707 708 709
static void addConnToPool(void* pool, SCliConn* conn) {
  if (conn->status == ConnInPool) {
    return;
  }
  allocConnRef(conn, true);

dengyihao's avatar
dengyihao 已提交
710
  SCliThrd* thrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
711 712 713 714 715 716
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
    taosArrayPush(thrd->timerList, &conn->timer);
    conn->timer->data = NULL;
    conn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
717 718 719 720 721
  if (T_REF_VAL_GET(conn) > 1) {
    transUnrefCliHandle(conn);
  }

  cliDestroyConnMsgs(conn, false);
dengyihao's avatar
dengyihao 已提交
722

dengyihao's avatar
dengyihao 已提交
723
  if (conn->list == NULL) {
dengyihao's avatar
dengyihao 已提交
724
    conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr) + 1);
dengyihao's avatar
dengyihao 已提交
725
  }
dengyihao's avatar
dengyihao 已提交
726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746

  SConnList* pList = conn->list;
  SMsgList*  msgList = pList->list;
  if (!QUEUE_IS_EMPTY(&msgList->msgQ)) {
    queue* h = QUEUE_HEAD(&(msgList)->msgQ);
    QUEUE_REMOVE(h);

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);

    transDQCancel(thrd->waitConnQueue, pMsg->ctx->task);
    pMsg->ctx->task = NULL;

    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
    transQueuePush(&conn->cliMsgs, pMsg);

    conn->status = ConnNormal;
    cliSend(conn);
    return;
  }

  conn->status = ConnInPool;
dengyihao's avatar
dengyihao 已提交
747
  QUEUE_PUSH(&conn->list->conns, &conn->q);
dengyihao's avatar
dengyihao 已提交
748
  conn->list->size += 1;
dengyihao's avatar
dengyihao 已提交
749

dengyihao's avatar
dengyihao 已提交
750
  if (conn->list->size >= 20) {
dengyihao's avatar
dengyihao 已提交
751 752
    STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
    arg->param1 = conn;
dengyihao's avatar
dengyihao 已提交
753
    arg->param2 = thrd;
dengyihao's avatar
dengyihao 已提交
754 755

    STrans* pTransInst = thrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
756 757
    conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
  }
dengyihao's avatar
dengyihao 已提交
758
}
dengyihao's avatar
dengyihao 已提交
759
static int32_t allocConnRef(SCliConn* conn, bool update) {
dengyihao's avatar
dengyihao 已提交
760
  if (update) {
dengyihao's avatar
dengyihao 已提交
761
    transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
762
    transRemoveExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
763
    conn->refId = -1;
dengyihao's avatar
dengyihao 已提交
764 765 766 767
  }
  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
768
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
dengyihao's avatar
dengyihao 已提交
769
  conn->refId = exh->refId;
770 771 772 773

  if (conn->refId == -1) {
    taosMemoryFree(exh);
  }
dengyihao's avatar
dengyihao 已提交
774 775 776 777 778
  return 0;
}

static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
  if (update) {
dengyihao's avatar
dengyihao 已提交
779
    transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
780 781 782 783 784 785 786 787 788 789 790 791 792
    transRemoveExHandle(transGetRefMgt(), conn->refId);
    conn->refId = -1;
  }
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
  if (exh == NULL) {
    return -1;
  }
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
  conn->refId = exh->refId;

  transReleaseExHandle(transGetRefMgt(), handle);
  return 0;
dengyihao's avatar
dengyihao 已提交
793
}
dengyihao's avatar
dengyihao 已提交
794

dengyihao's avatar
dengyihao 已提交
795
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
796
  SCliConn*    conn = handle->data;
dengyihao's avatar
dengyihao 已提交
797
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
798
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
799
}
U
ubuntu 已提交
800
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
801
  // impl later
dengyihao's avatar
dengyihao 已提交
802 803 804
  if (handle->data == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
805 806
  SCliConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
807
  if (nread > 0) {
dengyihao's avatar
dengyihao 已提交
808
    pBuf->len += nread;
dengyihao's avatar
dengyihao 已提交
809
    while (transReadComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
810
      tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
811 812 813 814 815 816
      if (pBuf->invalid) {
        cliHandleExcept(conn);
        break;
      } else {
        cliHandleResp(conn);
      }
dengyihao's avatar
dengyihao 已提交
817
    }
dengyihao's avatar
dengyihao 已提交
818 819
    return;
  }
dengyihao's avatar
dengyihao 已提交
820

821
  if (nread == 0) {
dengyihao's avatar
dengyihao 已提交
822 823 824
    // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
    // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
    // read(2).
dengyihao's avatar
dengyihao 已提交
825
    tTrace("%s conn %p read empty", CONN_GET_INST_LABEL(conn), conn);
826 827
    return;
  }
dengyihao's avatar
fix bug  
dengyihao 已提交
828
  if (nread < 0) {
dengyihao's avatar
dengyihao 已提交
829 830
    tDebug("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread),
           T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
831
    conn->broken = true;
U
ubuntu 已提交
832
    cliHandleExcept(conn);
dengyihao's avatar
dengyihao 已提交
833
  }
dengyihao's avatar
dengyihao 已提交
834
}
dengyihao's avatar
dengyihao 已提交
835

dengyihao's avatar
dengyihao 已提交
836
static SCliConn* cliCreateConn(SCliThrd* pThrd) {
wafwerar's avatar
wafwerar 已提交
837
  SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn));
dengyihao's avatar
dengyihao 已提交
838
  // read/write stream handle
G
gccgdb1234 已提交
839
  conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
dengyihao's avatar
dengyihao 已提交
840 841 842
  uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
  conn->stream->data = conn;

dengyihao's avatar
dengyihao 已提交
843 844 845 846 847 848 849 850
  uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
  if (timer == NULL) {
    timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
    tDebug("no available timer, create a timer %p", timer);
    uv_timer_init(pThrd->loop, timer);
  }
  timer->data = conn;
  conn->timer = timer;
dengyihao's avatar
dengyihao 已提交
851

dengyihao's avatar
dengyihao 已提交
852
  conn->connReq.data = conn;
dengyihao's avatar
dengyihao 已提交
853 854
  transReqQueueInit(&conn->wreqQueue);

dengyihao's avatar
dengyihao 已提交
855
  transQueueInit(&conn->cliMsgs, NULL);
dengyihao's avatar
opt rpc  
dengyihao 已提交
856 857

  transInitBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
858
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
859
  conn->hostThrd = pThrd;
dengyihao's avatar
dengyihao 已提交
860
  conn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
861
  conn->broken = false;
dengyihao's avatar
dengyihao 已提交
862
  transRefCliHandle(conn);
dengyihao's avatar
dengyihao 已提交
863

dengyihao's avatar
dengyihao 已提交
864
  atomic_add_fetch_32(&pThrd->connCount, 1);
dengyihao's avatar
dengyihao 已提交
865
  allocConnRef(conn, false);
dengyihao's avatar
dengyihao 已提交
866

dengyihao's avatar
dengyihao 已提交
867 868
  return conn;
}
U
ubuntu 已提交
869
static void cliDestroyConn(SCliConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
870
  SCliThrd* pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
871
  tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
872
  conn->broken = true;
dengyihao's avatar
dengyihao 已提交
873 874
  QUEUE_REMOVE(&conn->q);
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
875

dengyihao's avatar
dengyihao 已提交
876 877
  conn->broken = true;

dengyihao's avatar
dengyihao 已提交
878 879 880
  if (conn->list != NULL) {
    SConnList* connList = conn->list;
    connList->list->numOfConn--;
dengyihao's avatar
dengyihao 已提交
881
    connList->size--;
dengyihao's avatar
dengyihao 已提交
882
  } else {
dengyihao's avatar
dengyihao 已提交
883
    SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr) + 1);
dengyihao's avatar
dengyihao 已提交
884
    if (connList != NULL) connList->list->numOfConn--;
dengyihao's avatar
dengyihao 已提交
885 886
  }
  conn->list = NULL;
dengyihao's avatar
dengyihao 已提交
887
  pThrd->newConnCount--;
dengyihao's avatar
dengyihao 已提交
888

dengyihao's avatar
dengyihao 已提交
889
  transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
890
  transRemoveExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
opt rpc  
dengyihao 已提交
891
  conn->refId = -1;
dengyihao's avatar
dengyihao 已提交
892

dengyihao's avatar
dengyihao 已提交
893 894 895 896 897 898 899
  if (conn->task != NULL) {
    transDQCancel(pThrd->timeoutQueue, conn->task);
    conn->task = NULL;
  }
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
    conn->timer->data = NULL;
dengyihao's avatar
dengyihao 已提交
900
    taosArrayPush(pThrd->timerList, &conn->timer);
dengyihao's avatar
dengyihao 已提交
901 902
    conn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
903

dengyihao's avatar
dengyihao 已提交
904
  if (clear) {
dengyihao's avatar
dengyihao 已提交
905
    if (!uv_is_closing((uv_handle_t*)conn->stream)) {
dengyihao's avatar
dengyihao 已提交
906
      uv_read_stop(conn->stream);
dengyihao's avatar
dengyihao 已提交
907 908
      uv_close((uv_handle_t*)conn->stream, cliDestroy);
    }
909
  }
dengyihao's avatar
dengyihao 已提交
910
}
U
ubuntu 已提交
911
static void cliDestroy(uv_handle_t* handle) {
dengyihao's avatar
dengyihao 已提交
912 913 914
  if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
915
  SCliConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
916
  SCliThrd* pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
917 918
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
dengyihao's avatar
dengyihao 已提交
919 920
    taosArrayPush(pThrd->timerList, &conn->timer);
    conn->timer->data = NULL;
dengyihao's avatar
dengyihao 已提交
921 922
    conn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
923

dengyihao's avatar
dengyihao 已提交
924 925
  atomic_sub_fetch_32(&pThrd->connCount, 1);

dengyihao's avatar
dengyihao 已提交
926
  transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
927
  transRemoveExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
928
  taosMemoryFree(conn->dstAddr);
wafwerar's avatar
wafwerar 已提交
929
  taosMemoryFree(conn->stream);
dengyihao's avatar
dengyihao 已提交
930 931 932

  cliDestroyConnMsgs(conn, true);

dengyihao's avatar
dengyihao 已提交
933
  tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
934
  transReqQueueClear(&conn->wreqQueue);
dengyihao's avatar
dengyihao 已提交
935
  transDestroyBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
936

wafwerar's avatar
wafwerar 已提交
937
  taosMemoryFree(conn);
dengyihao's avatar
dengyihao 已提交
938
}
dengyihao's avatar
dengyihao 已提交
939
static bool cliHandleNoResp(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
940 941
  bool res = false;
  if (!transQueueEmpty(&conn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
942
    SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, 0);
dengyihao's avatar
dengyihao 已提交
943
    if (REQUEST_NO_RESP(&pMsg->msg)) {
dengyihao's avatar
dengyihao 已提交
944
      transQueuePop(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
945 946 947 948 949
      destroyCmsg(pMsg);
      res = true;
    }
    if (res == true) {
      if (cliMaySendCachedMsg(conn) == false) {
dengyihao's avatar
dengyihao 已提交
950
        SCliThrd* thrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
951
        addConnToPool(thrd->pool, conn);
dengyihao's avatar
dengyihao 已提交
952 953 954
        res = false;
      } else {
        res = true;
dengyihao's avatar
dengyihao 已提交
955 956 957 958 959
      }
    }
  }
  return res;
}
U
ubuntu 已提交
960
static void cliSendCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
961 962
  SCliConn* pConn = transReqQueueRemove(req);
  if (pConn == NULL) return;
dengyihao's avatar
dengyihao 已提交
963

dengyihao's avatar
dengyihao 已提交
964 965 966
  SCliMsg* pMsg = !transQueueEmpty(&pConn->cliMsgs) ? transQueueGet(&pConn->cliMsgs, 0) : NULL;
  if (pMsg != NULL) {
    int64_t cost = taosGetTimestampUs() - pMsg->st;
dengyihao's avatar
dengyihao 已提交
967
    if (cost > 1000 * 20) {
dengyihao's avatar
dengyihao 已提交
968 969 970 971
      tWarn("%s conn %p send cost:%dus, send exception", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
    }
  }

dengyihao's avatar
dengyihao 已提交
972
  if (status == 0) {
D
dapan1121 已提交
973
    tDebug("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
974
  } else {
dengyihao's avatar
dengyihao 已提交
975 976 977 978
    if (!uv_is_closing((uv_handle_t*)&pConn->stream)) {
      tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
      cliHandleExcept(pConn);
    }
dengyihao's avatar
dengyihao 已提交
979 980
    return;
  }
dengyihao's avatar
dengyihao 已提交
981
  if (cliHandleNoResp(pConn) == true) {
dengyihao's avatar
dengyihao 已提交
982
    tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
983 984
    return;
  }
dengyihao's avatar
dengyihao 已提交
985
  uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
dengyihao's avatar
dengyihao 已提交
986
}
dengyihao's avatar
dengyihao 已提交
987 988 989 990
void cliSendBatch(SCliConn* pConn) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

dengyihao's avatar
dengyihao 已提交
991 992
  SCliBatch* pBatch = pConn->pBatch;
  int32_t    wLen = pBatch->wLen;
dengyihao's avatar
dengyihao 已提交
993

dengyihao's avatar
dengyihao 已提交
994
  pBatch->pList->connCnt += 1;
dengyihao's avatar
dengyihao 已提交
995 996 997 998

  uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
  int       i = 0;

dengyihao's avatar
dengyihao 已提交
999 1000
  queue* h = NULL;
  QUEUE_FOREACH(h, &pBatch->wq) {
dengyihao's avatar
dengyihao 已提交
1001 1002 1003 1004 1005 1006 1007 1008 1009
    SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q);

    STransConnCtx* pCtx = pCliMsg->ctx;

    STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
    if (pMsg->pCont == 0) {
      pMsg->pCont = (void*)rpcMallocCont(0);
      pMsg->contLen = 0;
    }
dengyihao's avatar
dengyihao 已提交
1010

dengyihao's avatar
dengyihao 已提交
1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023
    int            msgLen = transMsgLenFromCont(pMsg->contLen);
    STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);

    if (pHead->comp == 0) {
      pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
      pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
      pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
      pHead->msgType = pMsg->msgType;
      pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
      pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
      memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
      pHead->traceId = pMsg->info.traceId;
      pHead->magicNum = htonl(TRANS_MAGIC_NUM);
dengyihao's avatar
dengyihao 已提交
1024 1025
      pHead->version = TRANS_VER;
      pHead->compatibilityVer = htonl(pTransInst->compatibilityVer);
dengyihao's avatar
dengyihao 已提交
1026 1027 1028
    }
    pHead->timestamp = taosHton64(taosGetTimestampUs());

dengyihao's avatar
dengyihao 已提交
1029 1030 1031 1032 1033 1034 1035 1036
    if (pHead->comp == 0) {
      if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
        msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
        pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
      }
    } else {
      msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
    }
dengyihao's avatar
dengyihao 已提交
1037 1038 1039 1040 1041
    wb[i++] = uv_buf_init((char*)pHead, msgLen);
  }

  uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
  req->data = pConn;
dengyihao's avatar
dengyihao 已提交
1042
  tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn,
dengyihao's avatar
dengyihao 已提交
1043
         pBatch->wLen, pBatch->batchSize);
dengyihao's avatar
dengyihao 已提交
1044 1045 1046
  uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb);
  taosMemoryFree(wb);
}
U
ubuntu 已提交
1047
void cliSend(SCliConn* pConn) {
dengyihao's avatar
dengyihao 已提交
1048 1049 1050 1051 1052
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  if (transQueueEmpty(&pConn->cliMsgs)) {
    tError("%s conn %p not msg to send", pTransInst->label, pConn);
dengyihao's avatar
dengyihao 已提交
1053
    cliHandleExcept(pConn);
dengyihao's avatar
dengyihao 已提交
1054 1055
    return;
  }
dengyihao's avatar
dengyihao 已提交
1056 1057

  SCliMsg* pCliMsg = NULL;
U
ubuntu 已提交
1058
  CONN_GET_NEXT_SENDMSG(pConn);
dengyihao's avatar
dengyihao 已提交
1059 1060
  pCliMsg->sent = 1;

dengyihao's avatar
dengyihao 已提交
1061
  STransConnCtx* pCtx = pCliMsg->ctx;
dengyihao's avatar
dengyihao 已提交
1062

U
ubuntu 已提交
1063
  STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
dengyihao's avatar
dengyihao 已提交
1064 1065 1066 1067
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
dengyihao's avatar
dengyihao 已提交
1068

dengyihao's avatar
dengyihao 已提交
1069
  int            msgLen = transMsgLenFromCont(pMsg->contLen);
dengyihao's avatar
dengyihao 已提交
1070
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
dengyihao's avatar
dengyihao 已提交
1071

dengyihao's avatar
dengyihao 已提交
1072 1073 1074 1075 1076 1077 1078 1079 1080 1081
  if (pHead->comp == 0) {
    pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
    pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
    pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
    pHead->msgType = pMsg->msgType;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
    memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
    pHead->traceId = pMsg->info.traceId;
    pHead->magicNum = htonl(TRANS_MAGIC_NUM);
dengyihao's avatar
dengyihao 已提交
1082 1083
    pHead->version = TRANS_VER;
    pHead->compatibilityVer = htonl(pTransInst->compatibilityVer);
dengyihao's avatar
dengyihao 已提交
1084
  }
dengyihao's avatar
dengyihao 已提交
1085
  pHead->timestamp = taosHton64(taosGetTimestampUs());
dengyihao's avatar
dengyihao 已提交
1086

dengyihao's avatar
dengyihao 已提交
1087 1088 1089
  if (pHead->persist == 1) {
    CONN_SET_PERSIST_BY_APP(pConn);
  }
dengyihao's avatar
dengyihao 已提交
1090

dengyihao's avatar
dengyihao 已提交
1091
  STraceId* trace = &pMsg->info.traceId;
dengyihao's avatar
dengyihao 已提交
1092

dengyihao's avatar
dengyihao 已提交
1093
  if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
dengyihao's avatar
dengyihao 已提交
1094
    uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
dengyihao's avatar
dengyihao 已提交
1095 1096
    if (timer == NULL) {
      timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
dengyihao's avatar
dengyihao 已提交
1097
      tDebug("no available timer, create a timer %p", timer);
dengyihao's avatar
dengyihao 已提交
1098 1099 1100 1101 1102
      uv_timer_init(pThrd->loop, timer);
    }
    timer->data = pConn;
    pConn->timer = timer;

dengyihao's avatar
dengyihao 已提交
1103 1104 1105
    tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
    uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
  }
dengyihao's avatar
dengyihao 已提交
1106

dengyihao's avatar
dengyihao 已提交
1107 1108 1109 1110 1111 1112 1113
  if (pHead->comp == 0) {
    if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
      msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
      pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    }
  } else {
    msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
dengyihao's avatar
dengyihao 已提交
1114
  }
dengyihao's avatar
dengyihao 已提交
1115

D
dapan1121 已提交
1116
  tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
dengyihao's avatar
dengyihao 已提交
1117
          TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);
dengyihao's avatar
dengyihao 已提交
1118

dengyihao's avatar
dengyihao 已提交
1119
  uv_buf_t    wb = uv_buf_init((char*)pHead, msgLen);
dengyihao's avatar
dengyihao 已提交
1120
  uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
dengyihao's avatar
dengyihao 已提交
1121 1122 1123

  int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
1124
    tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType),
dengyihao's avatar
dengyihao 已提交
1125 1126 1127
            uv_err_name(status));
    cliHandleExcept(pConn);
  }
U
ubuntu 已提交
1128 1129
  return;
_RETURN:
dengyihao's avatar
dengyihao 已提交
1130
  return;
dengyihao's avatar
dengyihao 已提交
1131
}
dengyihao's avatar
dengyihao 已提交
1132 1133

static void cliDestroyBatch(SCliBatch* pBatch) {
dengyihao's avatar
dengyihao 已提交
1134
  if (pBatch == NULL) return;
dengyihao's avatar
dengyihao 已提交
1135
  while (!QUEUE_IS_EMPTY(&pBatch->wq)) {
dengyihao's avatar
dengyihao 已提交
1136 1137
    queue* h = QUEUE_HEAD(&pBatch->wq);
    QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
1138

dengyihao's avatar
dengyihao 已提交
1139
    SCliMsg* p = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
1140 1141
    destroyCmsg(p);
  }
dengyihao's avatar
dengyihao 已提交
1142 1143
  SCliBatchList* p = pBatch->pList;
  p->sending -= 1;
dengyihao's avatar
dengyihao 已提交
1144 1145
  taosMemoryFree(pBatch);
}
dengyihao's avatar
dengyihao 已提交
1146
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1147 1148 1149 1150 1151
  if (pThrd->quit == true) {
    cliDestroyBatch(pBatch);
    return;
  }

dengyihao's avatar
dengyihao 已提交
1152
  if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
dengyihao's avatar
dengyihao 已提交
1153 1154
    return;
  }
dengyihao's avatar
dengyihao 已提交
1155 1156
  STrans*        pTransInst = pThrd->pTransInst;
  SCliBatchList* pList = pBatch->pList;
dengyihao's avatar
dengyihao 已提交
1157

dengyihao's avatar
dengyihao 已提交
1158 1159 1160
  char key[TSDB_FQDN_LEN + 64] = {0};
  CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port);

dengyihao's avatar
dengyihao 已提交
1161 1162
  bool      exceed = false;
  SCliConn* conn = getConnFromPool(pThrd, key, &exceed);
dengyihao's avatar
dengyihao 已提交
1163

dengyihao's avatar
dengyihao 已提交
1164 1165 1166
  if (conn == NULL && exceed) {
    tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pTransInst->label, pBatch->wLen,
           pBatch->batchSize, pTransInst->connLimitNum);
dengyihao's avatar
dengyihao 已提交
1167
    cliDestroyBatch(pBatch);
dengyihao's avatar
dengyihao 已提交
1168 1169
    return;
  }
dengyihao's avatar
dengyihao 已提交
1170 1171
  if (conn == NULL) {
    conn = cliCreateConn(pThrd);
dengyihao's avatar
dengyihao 已提交
1172
    conn->pBatch = pBatch;
dengyihao's avatar
dengyihao 已提交
1173
    conn->dstAddr = taosStrdup(pList->dst);
dengyihao's avatar
dengyihao 已提交
1174

dengyihao's avatar
dengyihao 已提交
1175
    uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip);
dengyihao's avatar
dengyihao 已提交
1176 1177 1178 1179 1180 1181
    if (ipaddr == 0xffffffff) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

dengyihao's avatar
dengyihao 已提交
1182
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1183 1184 1185 1186 1187
      return;
    }
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = ipaddr;
dengyihao's avatar
dengyihao 已提交
1188
    addr.sin_port = (uint16_t)htons(pList->port);
dengyihao's avatar
dengyihao 已提交
1189

dengyihao's avatar
dengyihao 已提交
1190
    tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst);
dengyihao's avatar
dengyihao 已提交
1191
    pThrd->newConnCount++;
dengyihao's avatar
dengyihao 已提交
1192
    int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
dengyihao's avatar
dengyihao 已提交
1193
    if (fd == -1) {
dengyihao's avatar
dengyihao 已提交
1194 1195 1196
      tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
             tstrerror(TAOS_SYSTEM_ERROR(errno)));
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1197 1198 1199 1200
      return;
    }
    int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
    if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
1201 1202
      tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1203 1204
      return;
    }
dengyihao's avatar
dengyihao 已提交
1205
    ret = transSetConnOption((uv_tcp_t*)conn->stream, 20);
dengyihao's avatar
dengyihao 已提交
1206
    if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
1207 1208
      tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1209 1210 1211 1212 1213 1214 1215 1216 1217
      return;
    }

    ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
    if (ret != 0) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;
dengyihao's avatar
dengyihao 已提交
1218 1219

      cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
dengyihao's avatar
dengyihao 已提交
1220
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1221 1222 1223 1224 1225 1226
      return;
    }
    uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
    return;
  }

dengyihao's avatar
dengyihao 已提交
1227
  conn->pBatch = pBatch;
dengyihao's avatar
dengyihao 已提交
1228
  cliSendBatch(conn);
dengyihao's avatar
dengyihao 已提交
1229 1230
}
static void cliSendBatchCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
1231 1232 1233
  SCliConn*  conn = req->data;
  SCliThrd*  thrd = conn->hostThrd;
  SCliBatch* p = conn->pBatch;
dengyihao's avatar
dengyihao 已提交
1234

dengyihao's avatar
dengyihao 已提交
1235
  SCliBatchList* pBatchList = p->pList;
dengyihao's avatar
dengyihao 已提交
1236
  SCliBatch*     nxtBatch = cliGetHeadFromList(pBatchList);
dengyihao's avatar
dengyihao 已提交
1237 1238
  pBatchList->connCnt -= 1;

dengyihao's avatar
dengyihao 已提交
1239 1240 1241
  conn->pBatch = NULL;

  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
1242
    tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
dengyihao's avatar
dengyihao 已提交
1243
           p->wLen, p->batchSize, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
1244 1245 1246

    if (!uv_is_closing((uv_handle_t*)&conn->stream)) cliHandleExcept(conn);

dengyihao's avatar
dengyihao 已提交
1247
    cliHandleBatchReq(nxtBatch, thrd);
dengyihao's avatar
dengyihao 已提交
1248
  } else {
dengyihao's avatar
dengyihao 已提交
1249
    tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
dengyihao's avatar
dengyihao 已提交
1250
           p->batchSize);
dengyihao's avatar
dengyihao 已提交
1251
    if (!uv_is_closing((uv_handle_t*)&conn->stream) && conn->broken == false) {
dengyihao's avatar
dengyihao 已提交
1252 1253 1254 1255 1256 1257
      if (nxtBatch != NULL) {
        conn->pBatch = nxtBatch;
        cliSendBatch(conn);
      } else {
        addConnToPool(thrd->pool, conn);
      }
dengyihao's avatar
dengyihao 已提交
1258
    } else {
dengyihao's avatar
dengyihao 已提交
1259 1260
      cliDestroyBatch(nxtBatch);
      // conn release by other callback
dengyihao's avatar
dengyihao 已提交
1261
    }
dengyihao's avatar
dengyihao 已提交
1262
  }
dengyihao's avatar
dengyihao 已提交
1263 1264 1265

  cliDestroyBatch(p);
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
1266
}
dengyihao's avatar
dengyihao 已提交
1267
static void cliHandleFastFail(SCliConn* pConn, int status) {
dengyihao's avatar
dengyihao 已提交
1268
  SCliThrd* pThrd = pConn->hostThrd;
dengyihao's avatar
dengyihao 已提交
1269
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1270 1271 1272

  if (status == -1) status = ENETUNREACH;

dengyihao's avatar
dengyihao 已提交
1273 1274 1275 1276 1277
  if (pConn->pBatch == NULL) {
    SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);

    STraceId* trace = &pMsg->msg.info.traceId;
    tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
dengyihao's avatar
dengyihao 已提交
1278
            TMSG_INFO(pMsg->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status));
dengyihao's avatar
dengyihao 已提交
1279 1280 1281

    if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
        (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
dengyihao's avatar
dengyihao 已提交
1282
      SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1);
dengyihao's avatar
dengyihao 已提交
1283 1284 1285 1286 1287 1288 1289 1290 1291
      int64_t        cTimestamp = taosGetTimestampMs();
      if (item != NULL) {
        int32_t elapse = cTimestamp - item->timestamp;
        if (elapse >= 0 && elapse <= pTransInst->failFastInterval) {
          item->count++;
        } else {
          item->count = 1;
          item->timestamp = cTimestamp;
        }
dengyihao's avatar
dengyihao 已提交
1292
      } else {
dengyihao's avatar
dengyihao 已提交
1293
        SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
dengyihao's avatar
dengyihao 已提交
1294
        taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1, &item, sizeof(SFailFastItem));
dengyihao's avatar
dengyihao 已提交
1295 1296
      }
    }
dengyihao's avatar
dengyihao 已提交
1297
  } else {
dengyihao's avatar
dengyihao 已提交
1298
    tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
dengyihao's avatar
dengyihao 已提交
1299
           pConn, pConn->dstAddr, uv_strerror(status));
dengyihao's avatar
dengyihao 已提交
1300 1301
    cliDestroyBatch(pConn->pBatch);
    pConn->pBatch = NULL;
dengyihao's avatar
dengyihao 已提交
1302 1303 1304
  }
  cliHandleExcept(pConn);
}
dengyihao's avatar
dengyihao 已提交
1305

dengyihao's avatar
dengyihao 已提交
1306 1307 1308
void cliConnCb(uv_connect_t* req, int status) {
  SCliConn* pConn = req->data;
  SCliThrd* pThrd = pConn->hostThrd;
dengyihao's avatar
dengyihao 已提交
1309 1310 1311 1312 1313 1314 1315 1316 1317 1318
  bool      timeout = false;

  if (pConn->timer == NULL) {
    timeout = true;
  } else {
    uv_timer_stop(pConn->timer);
    pConn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &pConn->timer);
    pConn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
1319 1320

  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
1321
    cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr);
dengyihao's avatar
dengyihao 已提交
1322 1323 1324 1325 1326
    if (timeout == false) {
      cliHandleFastFail(pConn, status);
    } else if (timeout == true) {
      // already deal by timeout
    }
1327
    return;
dengyihao's avatar
dengyihao 已提交
1328
  }
dengyihao's avatar
dengyihao 已提交
1329

dengyihao's avatar
dengyihao 已提交
1330 1331
  struct sockaddr peername, sockname;
  int             addrlen = sizeof(peername);
dengyihao's avatar
dengyihao 已提交
1332
  uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
dengyihao's avatar
dengyihao 已提交
1333
  transSockInfo2Str(&peername, pConn->dst);
dengyihao's avatar
dengyihao 已提交
1334

dengyihao's avatar
dengyihao 已提交
1335 1336
  addrlen = sizeof(sockname);
  uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
dengyihao's avatar
dengyihao 已提交
1337
  transSockInfo2Str(&sockname, pConn->src);
dengyihao's avatar
dengyihao 已提交
1338

dengyihao's avatar
dengyihao 已提交
1339
  tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
1340 1341 1342 1343 1344
  if (pConn->pBatch != NULL) {
    cliSendBatch(pConn);
  } else {
    cliSend(pConn);
  }
dengyihao's avatar
dengyihao 已提交
1345 1346
}

dengyihao's avatar
dengyihao 已提交
1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
  STransConnCtx* pCtx = pMsg->ctx;
  STrans*        pTransInst = pThrd->pTransInst;

  STransMsg transMsg = {0};
  transMsg.contLen = 0;
  transMsg.pCont = NULL;
  transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS;
  transMsg.msgType = pMsg->msg.msgType + 1;
  transMsg.info.ahandle = pMsg->ctx->ahandle;
  transMsg.info.traceId = pMsg->msg.info.traceId;
  transMsg.info.hasEpSet = false;
dengyihao's avatar
dengyihao 已提交
1359
  transMsg.info.cliVer = pTransInst->compatibilityVer;
dengyihao's avatar
dengyihao 已提交
1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370
  if (pCtx->pSem != NULL) {
    if (pCtx->pRsp == NULL) {
    } else {
      memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg));
    }
  } else {
    pTransInst->cfp(pTransInst->parent, &transMsg, NULL);
  }

  destroyCmsg(pMsg);
}
dengyihao's avatar
dengyihao 已提交
1371
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1372 1373 1374 1375 1376
  if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) {
    pThrd->stopMsg = pMsg;
    return;
  }
  pThrd->stopMsg = NULL;
dengyihao's avatar
dengyihao 已提交
1377
  pThrd->quit = true;
U
ubuntu 已提交
1378
  tDebug("cli work thread %p start to quit", pThrd);
dengyihao's avatar
dengyihao 已提交
1379
  destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1380

dengyihao's avatar
dengyihao 已提交
1381
  destroyConnPool(pThrd);
dengyihao's avatar
dengyihao 已提交
1382
  uv_walk(pThrd->loop, cliWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
1383
}
dengyihao's avatar
dengyihao 已提交
1384
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1385
  int64_t    refId = (int64_t)(pMsg->msg.info.handle);
dengyihao's avatar
dengyihao 已提交
1386
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1387
  if (exh == NULL) {
dengyihao's avatar
dengyihao 已提交
1388
    tDebug("%" PRId64 " already released", refId);
dengyihao's avatar
dengyihao 已提交
1389 1390
    destroyCmsg(pMsg);
    return;
dengyihao's avatar
dengyihao 已提交
1391 1392 1393
  }

  SCliConn* conn = exh->handle;
dengyihao's avatar
dengyihao 已提交
1394
  transReleaseExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1395
  tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
1396

dengyihao's avatar
dengyihao 已提交
1397 1398
  if (T_REF_VAL_GET(conn) == 2) {
    transUnrefCliHandle(conn);
dengyihao's avatar
dengyihao 已提交
1399 1400
    if (!transQueuePush(&conn->cliMsgs, pMsg)) {
      return;
dengyihao's avatar
dengyihao 已提交
1401 1402
    }
    cliSend(conn);
dengyihao's avatar
dengyihao 已提交
1403 1404 1405
  } else {
    tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1406 1407
  }
}
dengyihao's avatar
dengyihao 已提交
1408
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1409
  STransConnCtx* pCtx = pMsg->ctx;
dengyihao's avatar
dengyihao 已提交
1410
  pThrd->cvtAddr = pCtx->cvtAddr;
dengyihao's avatar
dengyihao 已提交
1411 1412
  destroyCmsg(pMsg);
}
U
ubuntu 已提交
1413

dengyihao's avatar
dengyihao 已提交
1414 1415
SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
  STransConnCtx* pCtx = (*pMsg)->ctx;
dengyihao's avatar
dengyihao 已提交
1416 1417
  SCliConn*      conn = NULL;

dengyihao's avatar
dengyihao 已提交
1418
  int64_t refId = (int64_t)((*pMsg)->msg.info.handle);
dengyihao's avatar
dengyihao 已提交
1419
  if (refId != 0) {
dengyihao's avatar
dengyihao 已提交
1420
    SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1421
    if (exh == NULL) {
dengyihao's avatar
dengyihao 已提交
1422
      tError("failed to get conn, refId: %" PRId64 "", refId);
dengyihao's avatar
dengyihao 已提交
1423 1424
      *ignore = true;
      return NULL;
dengyihao's avatar
dengyihao 已提交
1425 1426
    } else {
      conn = exh->handle;
dengyihao's avatar
dengyihao 已提交
1427
      if (conn == NULL) {
dengyihao's avatar
dengyihao 已提交
1428
        conn = getConnFromPool2(pThrd, addr, pMsg);
dengyihao's avatar
dengyihao 已提交
1429
        if (conn != NULL) specifyConnRef(conn, true, refId);
dengyihao's avatar
dengyihao 已提交
1430
      }
dengyihao's avatar
dengyihao 已提交
1431
      transReleaseExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1432 1433 1434
    }
    return conn;
  };
dengyihao's avatar
dengyihao 已提交
1435

dengyihao's avatar
dengyihao 已提交
1436
  conn = getConnFromPool2(pThrd, addr, pMsg);
dengyihao's avatar
dengyihao 已提交
1437
  if (conn != NULL) {
dengyihao's avatar
dengyihao 已提交
1438
    tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
dengyihao's avatar
dengyihao 已提交
1439
  } else {
dengyihao's avatar
dengyihao 已提交
1440
    tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
dengyihao's avatar
dengyihao 已提交
1441
  }
dengyihao's avatar
dengyihao 已提交
1442 1443
  return conn;
}
dengyihao's avatar
dengyihao 已提交
1444
FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
dengyihao's avatar
dengyihao 已提交
1445 1446 1447
  if (pCvtAddr->cvt == false) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1448 1449 1450
  if (pEpSet->numOfEps == 1 && strncmp(pEpSet->eps[0].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) {
    memset(pEpSet->eps[0].fqdn, 0, TSDB_FQDN_LEN);
    memcpy(pEpSet->eps[0].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN);
dengyihao's avatar
dengyihao 已提交
1451 1452
  }
}
dengyihao's avatar
dengyihao 已提交
1453

dengyihao's avatar
dengyihao 已提交
1454
FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
dengyihao's avatar
dengyihao 已提交
1455
  if (code != 0) return false;
dengyihao's avatar
dengyihao 已提交
1456
  // if (pCtx->retryCnt == 0) return false;
dengyihao's avatar
dengyihao 已提交
1457 1458 1459
  if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
  return true;
}
dengyihao's avatar
dengyihao 已提交
1460
FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
dengyihao's avatar
dengyihao 已提交
1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471
  if (pMsg == NULL) return -1;

  memset(pResp, 0, sizeof(STransMsg));

  pResp->code = TSDB_CODE_RPC_BROKEN_LINK;
  pResp->msgType = pMsg->msg.msgType + 1;
  pResp->info.ahandle = pMsg->ctx ? pMsg->ctx->ahandle : NULL;
  pResp->info.traceId = pMsg->msg.info.traceId;

  return 0;
}
dengyihao's avatar
dengyihao 已提交
1472 1473
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
  uint32_t  addr = 0;
dengyihao's avatar
dengyihao 已提交
1474
  uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1);
dengyihao's avatar
dengyihao 已提交
1475 1476
  if (v == NULL) {
    addr = taosGetIpv4FromFqdn(fqdn);
1477 1478 1479 1480
    if (addr == 0xffffffff) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr());
      return addr;
dengyihao's avatar
dengyihao 已提交
1481 1482
    }

dengyihao's avatar
dengyihao 已提交
1483
    taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
dengyihao's avatar
dengyihao 已提交
1484 1485 1486 1487 1488 1489 1490
  } else {
    addr = *v;
  }
  return addr;
}
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
  // impl later
dengyihao's avatar
dengyihao 已提交
1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501
  uint32_t addr = taosGetIpv4FromFqdn(fqdn);
  if (addr != 0xffffffff) {
    uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1);
    if (addr != *v) {
      char old[64] = {0}, new[64] = {0};
      tinet_ntoa(old, *v);
      tinet_ntoa(new, addr);
      tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new);
      taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
    }
  }
dengyihao's avatar
dengyihao 已提交
1502 1503
  return;
}
dengyihao's avatar
dengyihao 已提交
1504

dengyihao's avatar
dengyihao 已提交
1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518
static void cliMayUpdateFqdnCache(SHashObj* cache, char* dst) {
  if (dst == NULL) return;

  int16_t i = 0, len = strlen(dst);
  for (i = len - 1; i >= 0; i--) {
    if (dst[i] == ':') break;
  }
  if (i > 0) {
    char fqdn[TSDB_FQDN_LEN + 1] = {0};
    memcpy(fqdn, dst, i);
    cliUpdateFqdnCache(cache, fqdn);
  }
}

dengyihao's avatar
dengyihao 已提交
1519 1520 1521 1522 1523
static void doFreeTimeoutMsg(void* param) {
  STaskArg* arg = param;
  SCliMsg*  pMsg = arg->param1;
  SCliThrd* pThrd = arg->param2;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1524

dengyihao's avatar
dengyihao 已提交
1525 1526 1527 1528 1529 1530
  QUEUE_REMOVE(&pMsg->q);
  STraceId* trace = &pMsg->msg.info.traceId;
  tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
  doNotifyApp(pMsg, pThrd);
  taosMemoryFree(arg);
}
dengyihao's avatar
dengyihao 已提交
1531
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1532
  STrans* pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1533

dengyihao's avatar
dengyihao 已提交
1534 1535
  cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
  if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) {
dengyihao's avatar
dengyihao 已提交
1536
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1537 1538
    return;
  }
dengyihao's avatar
dengyihao 已提交
1539

dengyihao's avatar
dengyihao 已提交
1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551
  if (rpcDebugFlag & DEBUG_TRACE) {
    if (tmsgIsValid(pMsg->msg.msgType)) {
      char buf[128] = {0};
      sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType));
      int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
      if (NULL == 0) {
        int localCount = 1;
        taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
      } else {
        int localCount = *count + 1;
        taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
      }
dengyihao's avatar
dengyihao 已提交
1552 1553
    }
  }
dengyihao's avatar
dengyihao 已提交
1554

dengyihao's avatar
dengyihao 已提交
1555 1556
  char*    fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
  uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
dengyihao's avatar
dengyihao 已提交
1557 1558
  char     addr[TSDB_FQDN_LEN + 64] = {0};
  CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port);
dengyihao's avatar
dengyihao 已提交
1559

dengyihao's avatar
dengyihao 已提交
1560
  bool      ignore = false;
dengyihao's avatar
dengyihao 已提交
1561
  SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr);
dengyihao's avatar
dengyihao 已提交
1562
  if (ignore == true) {
dengyihao's avatar
dengyihao 已提交
1563
    // persist conn already release by server
dengyihao's avatar
dengyihao 已提交
1564 1565
    STransMsg resp;
    cliBuildExceptResp(pMsg, &resp);
dengyihao's avatar
dengyihao 已提交
1566 1567 1568
    // refactorr later
    resp.info.cliVer = pTransInst->compatibilityVer;

dengyihao's avatar
dengyihao 已提交
1569 1570 1571
    if (pMsg->type != Release) {
      pTransInst->cfp(pTransInst->parent, &resp, NULL);
    }
dengyihao's avatar
dengyihao 已提交
1572
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1573 1574
    return;
  }
dengyihao's avatar
dengyihao 已提交
1575
  if (conn == NULL && pMsg == NULL) {
dengyihao's avatar
dengyihao 已提交
1576 1577
    return;
  }
dengyihao's avatar
dengyihao 已提交
1578
  STraceId* trace = &pMsg->msg.info.traceId;
dengyihao's avatar
dengyihao 已提交
1579

dengyihao's avatar
dengyihao 已提交
1580
  if (conn != NULL) {
dengyihao's avatar
dengyihao 已提交
1581
    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
dengyihao's avatar
dengyihao 已提交
1582
    transQueuePush(&conn->cliMsgs, pMsg);
U
ubuntu 已提交
1583
    cliSend(conn);
dengyihao's avatar
dengyihao 已提交
1584
  } else {
U
ubuntu 已提交
1585
    conn = cliCreateConn(pThrd);
dengyihao's avatar
dengyihao 已提交
1586 1587 1588 1589

    int64_t refId = (int64_t)pMsg->msg.info.handle;
    if (refId != 0) specifyConnRef(conn, true, refId);

dengyihao's avatar
dengyihao 已提交
1590
    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
dengyihao's avatar
dengyihao 已提交
1591
    transQueuePush(&conn->cliMsgs, pMsg);
dengyihao's avatar
dengyihao 已提交
1592

dengyihao's avatar
dengyihao 已提交
1593
    conn->dstAddr = taosStrdup(addr);
dengyihao's avatar
dengyihao 已提交
1594

dengyihao's avatar
dengyihao 已提交
1595
    uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
1596 1597 1598 1599 1600 1601 1602 1603 1604
    if (ipaddr == 0xffffffff) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

      cliHandleExcept(conn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
1605

dengyihao's avatar
dengyihao 已提交
1606
    struct sockaddr_in addr;
1607
    addr.sin_family = AF_INET;
1608
    addr.sin_addr.s_addr = ipaddr;
dengyihao's avatar
dengyihao 已提交
1609
    addr.sin_port = (uint16_t)htons(port);
dengyihao's avatar
dengyihao 已提交
1610

dengyihao's avatar
dengyihao 已提交
1611
    tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr);
dengyihao's avatar
dengyihao 已提交
1612
    pThrd->newConnCount++;
dengyihao's avatar
dengyihao 已提交
1613
    int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626
    if (fd == -1) {
      tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
              tstrerror(TAOS_SYSTEM_ERROR(errno)));
      cliHandleExcept(conn);
      errno = 0;
      return;
    }
    int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
    if (ret != 0) {
      tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
1627
    ret = transSetConnOption((uv_tcp_t*)conn->stream, tsKeepAliveIdle);
1628 1629 1630 1631 1632
    if (ret != 0) {
      tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
1633

1634
    ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
dengyihao's avatar
dengyihao 已提交
1635
    if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
1636 1637 1638 1639 1640
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

dengyihao's avatar
dengyihao 已提交
1641
      cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
dengyihao's avatar
dengyihao 已提交
1642
      cliHandleFastFail(conn, ret);
dengyihao's avatar
dengyihao 已提交
1643 1644
      return;
    }
dengyihao's avatar
dengyihao 已提交
1645
    uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
dengyihao's avatar
dengyihao 已提交
1646
  }
dengyihao's avatar
dengyihao 已提交
1647
  tGTrace("%s conn %p ready", pTransInst->label, conn);
dengyihao's avatar
dengyihao 已提交
1648
}
dengyihao's avatar
dengyihao 已提交
1649

dengyihao's avatar
dengyihao 已提交
1650
static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1651
  int count = 0;
dengyihao's avatar
dengyihao 已提交
1652

dengyihao's avatar
dengyihao 已提交
1653 1654
  while (!QUEUE_IS_EMPTY(wq)) {
    queue* h = QUEUE_HEAD(wq);
dengyihao's avatar
dengyihao 已提交
1655 1656 1657
    QUEUE_REMOVE(h);

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
1658 1659 1660 1661 1662

    if (pMsg->type == Quit) {
      pThrd->stopMsg = pMsg;
      continue;
    }
dengyihao's avatar
dengyihao 已提交
1663 1664 1665 1666 1667 1668 1669 1670
    (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);

    count++;
  }
  if (count >= 2) {
    tTrace("cli process batch size:%d", count);
  }
}
dengyihao's avatar
dengyihao 已提交
1671
SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
dengyihao's avatar
dengyihao 已提交
1672
  if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) {
dengyihao's avatar
dengyihao 已提交
1673 1674 1675 1676
    return NULL;
  }
  queue* hr = QUEUE_HEAD(&pList->wq);
  QUEUE_REMOVE(hr);
dengyihao's avatar
dengyihao 已提交
1677
  pList->sending += 1;
dengyihao's avatar
dengyihao 已提交
1678 1679 1680 1681 1682 1683

  pList->len -= 1;

  SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq);
  return batch;
}
dengyihao's avatar
dengyihao 已提交
1684

dengyihao's avatar
dengyihao 已提交
1685
static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1686 1687
  STrans* pInst = pThrd->pTransInst;

dengyihao's avatar
dengyihao 已提交
1688
  int count = 0;
dengyihao's avatar
dengyihao 已提交
1689 1690
  while (!QUEUE_IS_EMPTY(wq)) {
    queue* h = QUEUE_HEAD(wq);
dengyihao's avatar
dengyihao 已提交
1691
    QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
1692 1693

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
1694 1695 1696 1697 1698 1699

    if (pMsg->type == Quit) {
      pThrd->stopMsg = pMsg;
      continue;
    }

dengyihao's avatar
dengyihao 已提交
1700
    if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
dengyihao's avatar
dengyihao 已提交
1701 1702 1703 1704 1705 1706 1707
      STransConnCtx* pCtx = pMsg->ctx;

      char*    ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
      uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
      char     key[TSDB_FQDN_LEN + 64] = {0};
      CONN_CONSTRUCT_HASH_KEY(key, ip, port);

dengyihao's avatar
dengyihao 已提交
1708 1709 1710 1711 1712
      // SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key));
      SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key));
      if (ppBatchList == NULL || *ppBatchList == NULL) {
        SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
        QUEUE_INIT(&pBatchList->wq);
dengyihao's avatar
dengyihao 已提交
1713
        pBatchList->connMax = pInst->connLimitNum;
dengyihao's avatar
dengyihao 已提交
1714
        pBatchList->connCnt = 0;
dengyihao's avatar
dengyihao 已提交
1715
        pBatchList->batchLenLimit = pInst->batchSize;
dengyihao's avatar
dengyihao 已提交
1716
        pBatchList->len += 1;
dengyihao's avatar
dengyihao 已提交
1717

1718 1719
        pBatchList->ip = taosStrdup(ip);
        pBatchList->dst = taosStrdup(key);
dengyihao's avatar
dengyihao 已提交
1720 1721
        pBatchList->port = port;

dengyihao's avatar
dengyihao 已提交
1722 1723
        SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
        QUEUE_INIT(&pBatch->wq);
dengyihao's avatar
dengyihao 已提交
1724 1725
        QUEUE_INIT(&pBatch->listq);

dengyihao's avatar
dengyihao 已提交
1726 1727 1728
        QUEUE_PUSH(&pBatch->wq, h);
        pBatch->wLen += 1;
        pBatch->batchSize += pMsg->msg.contLen;
dengyihao's avatar
dengyihao 已提交
1729
        pBatch->pList = pBatchList;
dengyihao's avatar
dengyihao 已提交
1730

dengyihao's avatar
dengyihao 已提交
1731
        QUEUE_PUSH(&pBatchList->wq, &pBatch->listq);
dengyihao's avatar
dengyihao 已提交
1732

dengyihao's avatar
dengyihao 已提交
1733
        taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatchList, sizeof(void*));
dengyihao's avatar
dengyihao 已提交
1734
      } else {
dengyihao's avatar
dengyihao 已提交
1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755
        if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
          SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
          QUEUE_INIT(&pBatch->wq);
          QUEUE_INIT(&pBatch->listq);

          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->wLen += 1;
          pBatch->batchSize = pMsg->msg.contLen;
          pBatch->pList = *ppBatchList;

          QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
          (*ppBatchList)->len += 1;

          continue;
        }

        queue*     hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
        SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
        if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) {
          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->batchSize += pMsg->msg.contLen;
dengyihao's avatar
dengyihao 已提交
1756
          pBatch->wLen += 1;
dengyihao's avatar
dengyihao 已提交
1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769
        } else {
          SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
          QUEUE_INIT(&pBatch->wq);
          QUEUE_INIT(&pBatch->listq);

          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->wLen += 1;
          pBatch->batchSize += pMsg->msg.contLen;
          pBatch->pList = *ppBatchList;

          QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
          (*ppBatchList)->len += 1;
        }
dengyihao's avatar
dengyihao 已提交
1770
      }
dengyihao's avatar
dengyihao 已提交
1771
      continue;
dengyihao's avatar
dengyihao 已提交
1772
    }
dengyihao's avatar
dengyihao 已提交
1773
    (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
1774
    count++;
dengyihao's avatar
dengyihao 已提交
1775
  }
dengyihao's avatar
dengyihao 已提交
1776

dengyihao's avatar
dengyihao 已提交
1777
  void** pIter = taosHashIterate(pThrd->batchCache, NULL);
dengyihao's avatar
dengyihao 已提交
1778
  while (pIter != NULL) {
dengyihao's avatar
dengyihao 已提交
1779
    SCliBatchList* batchList = (SCliBatchList*)(*pIter);
dengyihao's avatar
dengyihao 已提交
1780 1781 1782
    SCliBatch*     batch = cliGetHeadFromList(batchList);
    if (batch != NULL) {
      cliHandleBatchReq(batch, pThrd);
dengyihao's avatar
dengyihao 已提交
1783
    }
dengyihao's avatar
dengyihao 已提交
1784
    pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
dengyihao's avatar
dengyihao 已提交
1785 1786
  }

dengyihao's avatar
dengyihao 已提交
1787
  if (count >= 2) {
S
Shengliang Guan 已提交
1788
    tTrace("cli process batch size:%d", count);
dengyihao's avatar
dengyihao 已提交
1789
  }
dengyihao's avatar
dengyihao 已提交
1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802
}

static void cliAsyncCb(uv_async_t* handle) {
  SAsyncItem* item = handle->data;
  SCliThrd*   pThrd = item->pThrd;
  STrans*     pTransInst = pThrd->pTransInst;

  // batch process to avoid to lock/unlock frequently
  queue wq;
  taosThreadMutexLock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  taosThreadMutexUnlock(&item->mtx);

dengyihao's avatar
dengyihao 已提交
1803 1804 1805 1806 1807 1808 1809 1810 1811
  if (rpcDebugFlag & DEBUG_TRACE) {
    void* pIter = taosHashIterate(pThrd->msgCount, NULL);
    while (pIter != NULL) {
      int*   count = pIter;
      size_t len = 0;
      char*  key = taosHashGetKey(pIter, &len);
      if (*count != 0) {
        tDebug("key: %s count: %d", key, *count);
      }
dengyihao's avatar
dengyihao 已提交
1812

dengyihao's avatar
dengyihao 已提交
1813 1814 1815
      pIter = taosHashIterate(pThrd->msgCount, pIter);
    }
    tDebug("all conn count: %d", pThrd->newConnCount);
dengyihao's avatar
dengyihao 已提交
1816
  }
dengyihao's avatar
dengyihao 已提交
1817

dengyihao's avatar
dengyihao 已提交
1818
  int8_t supportBatch = pTransInst->supportBatch;
dengyihao's avatar
dengyihao 已提交
1819
  if (supportBatch == 0) {
dengyihao's avatar
dengyihao 已提交
1820
    cliNoBatchDealReq(&wq, pThrd);
dengyihao's avatar
dengyihao 已提交
1821
  } else if (supportBatch == 1) {
dengyihao's avatar
dengyihao 已提交
1822
    cliBatchDealReq(&wq, pThrd);
dengyihao's avatar
dengyihao 已提交
1823
  }
dengyihao's avatar
dengyihao 已提交
1824

dengyihao's avatar
dengyihao 已提交
1825
  if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
1826
}
dengyihao's avatar
dengyihao 已提交
1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852
static void cliPrepareCb(uv_prepare_t* handle) {
  SCliThrd* thrd = handle->data;
  tTrace("prepare work start");

  SAsyncPool* pool = thrd->asyncPool;
  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
    SAsyncItem* item = async->data;

    queue wq;
    taosThreadMutexLock(&item->mtx);
    QUEUE_MOVE(&item->qmsg, &wq);
    taosThreadMutexUnlock(&item->mtx);

    int count = 0;
    while (!QUEUE_IS_EMPTY(&wq)) {
      queue* h = QUEUE_HEAD(&wq);
      QUEUE_REMOVE(h);

      SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
      (*cliAsyncHandle[pMsg->type])(pMsg, thrd);
      count++;
    }
  }
  tTrace("prepare work end");
  if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
dengyihao's avatar
dengyihao 已提交
1853
}
dengyihao's avatar
dengyihao 已提交
1854

dengyihao's avatar
dengyihao 已提交
1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878
void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
  transCtxCleanup(&conn->ctx);
  cliReleaseUnfinishedMsg(conn);
  if (destroy == 1) {
    transQueueDestroy(&conn->cliMsgs);
  } else {
    transQueueClear(&conn->cliMsgs);
  }
}

void cliIteraConnMsgs(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    SCliMsg* cmsg = transQueueGet(&conn->cliMsgs, i);
    if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) {
      continue;
    }

    STransMsg resp = {0};
    if (-1 == cliBuildExceptResp(cmsg, &resp)) {
      continue;
    }
dengyihao's avatar
dengyihao 已提交
1879
    resp.info.cliVer = pTransInst->compatibilityVer;
dengyihao's avatar
dengyihao 已提交
1880 1881 1882 1883 1884
    pTransInst->cfp(pTransInst->parent, &resp, NULL);

    cmsg->ctx->ahandle = NULL;
  }
}
dengyihao's avatar
dengyihao 已提交
1885 1886 1887
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
  if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
    uint64_t ahandle = pHead->ahandle;
dengyihao's avatar
dengyihao 已提交
1888
    tDebug("ahandle = %" PRIu64 "", ahandle);
dengyihao's avatar
dengyihao 已提交
1889 1890
    SCliMsg* pMsg = NULL;
    CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
dengyihao's avatar
dengyihao 已提交
1891

dengyihao's avatar
dengyihao 已提交
1892 1893
    transClearBuffer(&conn->readBuf);
    transFreeMsg(transContFromHead((char*)pHead));
dengyihao's avatar
dengyihao 已提交
1894 1895 1896 1897

    for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->cliMsgs); i++) {
      SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i);
      if (cliMsg->type == Release) {
dengyihao's avatar
dengyihao 已提交
1898
        ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req");
dengyihao's avatar
dengyihao 已提交
1899 1900
        return true;
      }
dengyihao's avatar
dengyihao 已提交
1901
    }
dengyihao's avatar
dengyihao 已提交
1902 1903 1904

    cliIteraConnMsgs(conn);

dengyihao's avatar
dengyihao 已提交
1905 1906
    tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1907

dengyihao's avatar
dengyihao 已提交
1908 1909 1910 1911 1912 1913
    addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
    return true;
  }
  return false;
}

U
ubuntu 已提交
1914
static void* cliWorkThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
1915
  SCliThrd* pThrd = (SCliThrd*)arg;
dengyihao's avatar
dengyihao 已提交
1916
  pThrd->pid = taosGetSelfPthreadId();
G
gccgdb1234 已提交
1917
  setThreadName("trans-cli-work");
dengyihao's avatar
dengyihao 已提交
1918
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
dengyihao's avatar
dengyihao 已提交
1919 1920

  tDebug("thread quit-thread:%08" PRId64, pThrd->pid);
1921
  return NULL;
dengyihao's avatar
dengyihao 已提交
1922 1923
}

U
ubuntu 已提交
1924
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
1925
  SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj));
dengyihao's avatar
dengyihao 已提交
1926

dengyihao's avatar
dengyihao 已提交
1927
  STrans* pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
1928
  memcpy(cli->label, label, TSDB_LABEL_LEN);
dengyihao's avatar
dengyihao 已提交
1929
  cli->numOfThreads = numOfThreads;
dengyihao's avatar
dengyihao 已提交
1930
  cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*));
dengyihao's avatar
dengyihao 已提交
1931 1932

  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
1933
    SCliThrd* pThrd = createThrdObj(shandle);
dengyihao's avatar
dengyihao 已提交
1934
    if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
1935
      goto _err;
dengyihao's avatar
dengyihao 已提交
1936 1937 1938
    }

    int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
dengyihao's avatar
dengyihao 已提交
1939 1940 1941
    if (err != 0) {
      goto _err;
    } else {
S
Shengliang Guan 已提交
1942
      tDebug("success to create tranport-cli thread:%d", i);
dengyihao's avatar
dengyihao 已提交
1943
    }
dengyihao's avatar
dengyihao 已提交
1944
    cli->pThreadObj[i] = pThrd;
dengyihao's avatar
dengyihao 已提交
1945 1946
  }
  return cli;
dengyihao's avatar
dengyihao 已提交
1947 1948 1949 1950 1951

_err:
  taosMemoryFree(cli->pThreadObj);
  taosMemoryFree(cli);
  return NULL;
dengyihao's avatar
dengyihao 已提交
1952
}
dengyihao's avatar
dengyihao 已提交
1953

dengyihao's avatar
dengyihao 已提交
1954
static FORCE_INLINE void destroyUserdata(STransMsg* userdata) {
dengyihao's avatar
dengyihao 已提交
1955 1956 1957 1958 1959 1960
  if (userdata->pCont == NULL) {
    return;
  }
  transFreeMsg(userdata->pCont);
  userdata->pCont = NULL;
}
dengyihao's avatar
dengyihao 已提交
1961

dengyihao's avatar
dengyihao 已提交
1962
static FORCE_INLINE void destroyCmsg(void* arg) {
dengyihao's avatar
dengyihao 已提交
1963
  SCliMsg* pMsg = arg;
dengyihao's avatar
dengyihao 已提交
1964 1965 1966
  if (pMsg == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1967

dengyihao's avatar
dengyihao 已提交
1968 1969
  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
wafwerar's avatar
wafwerar 已提交
1970
  taosMemoryFree(pMsg);
dengyihao's avatar
dengyihao 已提交
1971
}
dengyihao's avatar
dengyihao 已提交
1972

dengyihao's avatar
dengyihao 已提交
1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
  if (param == NULL) return;

  STaskArg* arg = param;
  SCliMsg*  pMsg = arg->param1;
  SCliThrd* pThrd = arg->param2;

  tDebug("destroy Ahandle A");
  if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
    tDebug("destroy Ahandle B");
    pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
  }
  tDebug("destroy Ahandle C");

  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
  taosMemoryFree(pMsg);
}

dengyihao's avatar
dengyihao 已提交
1992 1993 1994
static SCliThrd* createThrdObj(void* trans) {
  STrans* pTransInst = trans;

dengyihao's avatar
dengyihao 已提交
1995
  SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd));
U
ubuntu 已提交
1996

dengyihao's avatar
dengyihao 已提交
1997
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
1998
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
1999

wafwerar's avatar
wafwerar 已提交
2000
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
2001 2002 2003 2004 2005 2006 2007 2008
  int err = uv_loop_init(pThrd->loop);
  if (err != 0) {
    tError("failed to init uv_loop, reason:%s", uv_err_name(err));
    taosMemoryFree(pThrd->loop);
    taosThreadMutexDestroy(&pThrd->msgMtx);
    taosMemoryFree(pThrd);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
2009 2010 2011 2012 2013
  if (pTransInst->supportBatch) {
    pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 4, pThrd, cliAsyncCb);
  } else {
    pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb);
  }
dengyihao's avatar
dengyihao 已提交
2014
  if (pThrd->asyncPool == NULL) {
dengyihao's avatar
ref log  
dengyihao 已提交
2015
    tError("failed to init async pool");
dengyihao's avatar
dengyihao 已提交
2016 2017 2018 2019 2020 2021
    uv_loop_close(pThrd->loop);
    taosMemoryFree(pThrd->loop);
    taosThreadMutexDestroy(&pThrd->msgMtx);
    taosMemoryFree(pThrd);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
2022 2023 2024 2025

  pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
  uv_prepare_init(pThrd->loop, pThrd->prepare);
  pThrd->prepare->data = pThrd;
dengyihao's avatar
dengyihao 已提交
2026
  // uv_prepare_start(pThrd->prepare, cliPrepareCb);
dengyihao's avatar
dengyihao 已提交
2027

dengyihao's avatar
dengyihao 已提交
2028
  int32_t timerSize = 64;
dengyihao's avatar
dengyihao 已提交
2029 2030 2031 2032 2033 2034 2035
  pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
  for (int i = 0; i < timerSize; i++) {
    uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
    uv_timer_init(pThrd->loop, timer);
    taosArrayPush(pThrd->timerList, &timer);
  }

dengyihao's avatar
dengyihao 已提交
2036
  pThrd->pool = createConnPool(4);
dengyihao's avatar
dengyihao 已提交
2037
  transDQCreate(pThrd->loop, &pThrd->delayQueue);
dengyihao's avatar
dengyihao 已提交
2038

dengyihao's avatar
dengyihao 已提交
2039 2040
  transDQCreate(pThrd->loop, &pThrd->timeoutQueue);

dengyihao's avatar
dengyihao 已提交
2041 2042
  transDQCreate(pThrd->loop, &pThrd->waitConnQueue);

dengyihao's avatar
dengyihao 已提交
2043 2044 2045
  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
  pThrd->pTransInst = trans;

dengyihao's avatar
dengyihao 已提交
2046
  pThrd->destroyAhandleFp = pTransInst->destroyFp;
dengyihao's avatar
dengyihao 已提交
2047
  pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
2048 2049
  pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);

dengyihao's avatar
dengyihao 已提交
2050
  pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
2051

dengyihao's avatar
dengyihao 已提交
2052
  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
2053 2054

  pThrd->newConnCount = 0;
dengyihao's avatar
dengyihao 已提交
2055
  pThrd->msgCount = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
2056 2057
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
2058
static void destroyThrdObj(SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
2059 2060 2061
  if (pThrd == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
2062

wafwerar's avatar
wafwerar 已提交
2063
  taosThreadJoin(pThrd->thread, NULL);
dengyihao's avatar
dengyihao 已提交
2064
  CLI_RELEASE_UV(pThrd->loop);
wafwerar's avatar
wafwerar 已提交
2065
  taosThreadMutexDestroy(&pThrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
2066
  TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
dengyihao's avatar
dengyihao 已提交
2067
  transAsyncPoolDestroy(pThrd->asyncPool);
dengyihao's avatar
dengyihao 已提交
2068

dengyihao's avatar
dengyihao 已提交
2069
  transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
dengyihao's avatar
dengyihao 已提交
2070
  transDQDestroy(pThrd->timeoutQueue, NULL);
dengyihao's avatar
dengyihao 已提交
2071
  transDQDestroy(pThrd->waitConnQueue, NULL);
dengyihao's avatar
dengyihao 已提交
2072

dengyihao's avatar
dengyihao 已提交
2073
  tDebug("thread destroy %" PRId64, pThrd->pid);
dengyihao's avatar
dengyihao 已提交
2074 2075 2076 2077 2078
  for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
    uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i);
    taosMemoryFree(timer);
  }
  taosArrayDestroy(pThrd->timerList);
dengyihao's avatar
dengyihao 已提交
2079
  taosMemoryFree(pThrd->prepare);
wafwerar's avatar
wafwerar 已提交
2080
  taosMemoryFree(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
2081
  taosHashCleanup(pThrd->fqdn2ipCache);
dengyihao's avatar
dengyihao 已提交
2082
  taosHashCleanup(pThrd->failFastCache);
dengyihao's avatar
dengyihao 已提交
2083 2084 2085

  void** pIter = taosHashIterate(pThrd->batchCache, NULL);
  while (pIter != NULL) {
dengyihao's avatar
dengyihao 已提交
2086 2087 2088 2089 2090 2091 2092 2093
    SCliBatchList* pBatchList = (SCliBatchList*)(*pIter);
    while (!QUEUE_IS_EMPTY(&pBatchList->wq)) {
      queue* h = QUEUE_HEAD(&pBatchList->wq);
      QUEUE_REMOVE(h);

      SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq);
      cliDestroyBatch(pBatch);
    }
dengyihao's avatar
dengyihao 已提交
2094 2095
    taosMemoryFree(pBatchList->ip);
    taosMemoryFree(pBatchList->dst);
dengyihao's avatar
dengyihao 已提交
2096
    taosMemoryFree(pBatchList);
dengyihao's avatar
dengyihao 已提交
2097

dengyihao's avatar
dengyihao 已提交
2098 2099
    pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
  }
dengyihao's avatar
dengyihao 已提交
2100
  taosHashCleanup(pThrd->batchCache);
dengyihao's avatar
dengyihao 已提交
2101
  taosHashCleanup(pThrd->msgCount);
wafwerar's avatar
wafwerar 已提交
2102
  taosMemoryFree(pThrd);
dengyihao's avatar
dengyihao 已提交
2103
}
dengyihao's avatar
dengyihao 已提交
2104

dengyihao's avatar
dengyihao 已提交
2105
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
2106
  //
wafwerar's avatar
wafwerar 已提交
2107
  taosMemoryFree(ctx);
dengyihao's avatar
dengyihao 已提交
2108
}
dengyihao's avatar
dengyihao 已提交
2109

dengyihao's avatar
dengyihao 已提交
2110
void cliSendQuit(SCliThrd* thrd) {
dengyihao's avatar
dengyihao 已提交
2111
  // cli can stop gracefully
wafwerar's avatar
wafwerar 已提交
2112
  SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2113
  msg->type = Quit;
dengyihao's avatar
dengyihao 已提交
2114
  transAsyncSend(thrd->asyncPool, &msg->q);
dengyihao's avatar
dengyihao 已提交
2115
  atomic_store_8(&thrd->asyncPool->stop, 1);
dengyihao's avatar
dengyihao 已提交
2116
}
dengyihao's avatar
dengyihao 已提交
2117 2118
void cliWalkCb(uv_handle_t* handle, void* arg) {
  if (!uv_is_closing(handle)) {
dengyihao's avatar
dengyihao 已提交
2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130
    if (uv_handle_get_type(handle) == UV_TIMER) {
      // SCliConn* pConn = handle->data;
      //  if (pConn != NULL && pConn->timer != NULL) {
      //    SCliThrd* pThrd = pConn->hostThrd;
      //    uv_timer_stop((uv_timer_t*)handle);
      //    handle->data = NULL;
      //    taosArrayPush(pThrd->timerList, &pConn->timer);
      //    pConn->timer = NULL;
      //  }
    } else {
      uv_read_stop((uv_stream_t*)handle);
    }
dengyihao's avatar
dengyihao 已提交
2131
    uv_close(handle, cliDestroy);
dengyihao's avatar
dengyihao 已提交
2132 2133
  }
}
dengyihao's avatar
dengyihao 已提交
2134

dengyihao's avatar
dengyihao 已提交
2135
FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) {
dengyihao's avatar
dengyihao 已提交
2136
  int32_t index = pTransInst->index;
dengyihao's avatar
dengyihao 已提交
2137 2138 2139
  if (pTransInst->numOfThreads == 0) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2140 2141 2142 2143
  /*
   * no lock, and to avoid CPU load imbalance, set limit pTransInst->numOfThreads * 2000;
   */
  if (pTransInst->index++ >= pTransInst->numOfThreads * 2000) {
U
ubuntu 已提交
2144 2145 2146 2147
    pTransInst->index = 0;
  }
  return index % pTransInst->numOfThreads;
}
dengyihao's avatar
dengyihao 已提交
2148
static FORCE_INLINE void doDelayTask(void* param) {
dengyihao's avatar
dengyihao 已提交
2149
  STaskArg* arg = param;
dengyihao's avatar
dengyihao 已提交
2150
  cliHandleReq((SCliMsg*)arg->param1, (SCliThrd*)arg->param2);
dengyihao's avatar
dengyihao 已提交
2151 2152
  taosMemoryFree(arg);
}
dengyihao's avatar
dengyihao 已提交
2153

dengyihao's avatar
dengyihao 已提交
2154 2155 2156
static void doCloseIdleConn(void* param) {
  STaskArg* arg = param;
  SCliConn* conn = arg->param1;
dengyihao's avatar
dengyihao 已提交
2157
  tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
2158
  conn->task = NULL;
dengyihao's avatar
dengyihao 已提交
2159 2160
  cliDestroyConn(conn, true);
  taosMemoryFree(arg);
dengyihao's avatar
dengyihao 已提交
2161 2162
}

dengyihao's avatar
dengyihao 已提交
2163
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
2164
  STrans*        pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
2165 2166
  STransConnCtx* pCtx = pMsg->ctx;

dengyihao's avatar
dengyihao 已提交
2167 2168 2169 2170 2171 2172 2173
  if (rpcDebugFlag & DEBUG_DEBUG) {
    STraceId* trace = &pMsg->msg.info.traceId;
    char      tbuf[256] = {0};
    EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
    tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
            pCtx->retryStep, pCtx->retryNextInterval);
  }
dengyihao's avatar
dengyihao 已提交
2174 2175 2176 2177

  STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
  arg->param1 = pMsg;
  arg->param2 = pThrd;
dengyihao's avatar
dengyihao 已提交
2178 2179

  transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval);
dengyihao's avatar
dengyihao 已提交
2180
}
dengyihao's avatar
dengyihao 已提交
2181

dengyihao's avatar
dengyihao 已提交
2182
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
dengyihao's avatar
dengyihao 已提交
2183 2184 2185 2186
  if (*val != exp) {
    *val = newVal;
  }
}
dengyihao's avatar
dengyihao 已提交
2187

dengyihao's avatar
dengyihao 已提交
2188
FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
dengyihao's avatar
dengyihao 已提交
2189 2190 2191 2192 2193 2194
  if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
    return false;
  }
  // rebuild resp msg
  SEpSet epset;
  if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epset) < 0) {
dengyihao's avatar
dengyihao 已提交
2195 2196 2197 2198
    return false;
  }
  int32_t tlen = tSerializeSEpSet(NULL, 0, dst);

dengyihao's avatar
dengyihao 已提交
2199 2200 2201 2202 2203 2204 2205
  char*   buf = NULL;
  int32_t len = pResp->contLen - tlen;
  if (len != 0) {
    buf = rpcMallocCont(len);
    memcpy(buf, (char*)pResp->pCont + tlen, len);
  }
  rpcFreeCont(pResp->pCont);
dengyihao's avatar
dengyihao 已提交
2206 2207

  pResp->pCont = buf;
dengyihao's avatar
dengyihao 已提交
2208 2209
  pResp->contLen = len;

dengyihao's avatar
dengyihao 已提交
2210
  epsetAssign(dst, &epset);
dengyihao's avatar
dengyihao 已提交
2211 2212
  return true;
}
dengyihao's avatar
dengyihao 已提交
2213
bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
dengyihao's avatar
dengyihao 已提交
2214
  bool noDelay = true;
dengyihao's avatar
dengyihao 已提交
2215 2216 2217 2218 2219 2220 2221
  if (hasEpSet == false) {
    if (pResp->contLen == 0) {
      if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
        noDelay = false;
      } else {
        EPSET_FORWARD_INUSE(&pCtx->epSet);
      }
dengyihao's avatar
dengyihao 已提交
2222 2223 2224 2225 2226 2227 2228 2229 2230 2231
    } else if (pResp->contLen != 0) {
      SEpSet  epSet;
      int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
      if (valid < 0) {
        tDebug("get invalid epset, epset equal, continue");
        if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
          noDelay = false;
        } else {
          EPSET_FORWARD_INUSE(&pCtx->epSet);
        }
dengyihao's avatar
dengyihao 已提交
2232
      } else {
dengyihao's avatar
dengyihao 已提交
2233 2234
        if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
          tDebug("epset not equal, retry new epset");
dengyihao's avatar
dengyihao 已提交
2235
          epsetAssign(&pCtx->epSet, &epSet);
dengyihao's avatar
dengyihao 已提交
2236 2237 2238 2239 2240 2241 2242 2243 2244
          noDelay = false;
        } else {
          if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
            noDelay = false;
          } else {
            tDebug("epset equal, continue");
            EPSET_FORWARD_INUSE(&pCtx->epSet);
          }
        }
dengyihao's avatar
dengyihao 已提交
2245
      }
dengyihao's avatar
dengyihao 已提交
2246 2247
    }
  } else {
dengyihao's avatar
dengyihao 已提交
2248
    SEpSet  epSet;
dengyihao's avatar
dengyihao 已提交
2249 2250
    int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
    if (valid < 0) {
dengyihao's avatar
dengyihao 已提交
2251 2252 2253 2254 2255 2256
      tDebug("get invalid epset, epset equal, continue");
      if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
        noDelay = false;
      } else {
        EPSET_FORWARD_INUSE(&pCtx->epSet);
      }
dengyihao's avatar
dengyihao 已提交
2257
    } else {
dengyihao's avatar
dengyihao 已提交
2258 2259
      if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
        tDebug("epset not equal, retry new epset");
dengyihao's avatar
dengyihao 已提交
2260
        epsetAssign(&pCtx->epSet, &epSet);
dengyihao's avatar
dengyihao 已提交
2261
        noDelay = false;
dengyihao's avatar
dengyihao 已提交
2262
      } else {
dengyihao's avatar
dengyihao 已提交
2263 2264 2265 2266 2267 2268
        if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
          noDelay = false;
        } else {
          tDebug("epset equal, continue");
          EPSET_FORWARD_INUSE(&pCtx->epSet);
        }
dengyihao's avatar
dengyihao 已提交
2269
      }
dengyihao's avatar
dengyihao 已提交
2270 2271 2272 2273 2274
    }
  }
  return noDelay;
}
bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
2275 2276
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
2277

dengyihao's avatar
dengyihao 已提交
2278 2279
  STransConnCtx* pCtx = pMsg->ctx;
  int32_t        code = pResp->code;
dengyihao's avatar
dengyihao 已提交
2280

dengyihao's avatar
dengyihao 已提交
2281
  bool retry = pTransInst->retry != NULL ? pTransInst->retry(code, pResp->msgType - 1) : false;
dengyihao's avatar
dengyihao 已提交
2282 2283 2284
  if (retry == false) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
2285 2286 2287 2288 2289 2290 2291 2292

  if (!pCtx->retryInit) {
    pCtx->retryMinInterval = pTransInst->retryMinInterval;
    pCtx->retryMaxInterval = pTransInst->retryMaxInterval;
    pCtx->retryStepFactor = pTransInst->retryStepFactor;
    pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet;
    pCtx->retryInitTimestamp = taosGetTimestampMs();
    pCtx->retryNextInterval = pCtx->retryMinInterval;
dengyihao's avatar
dengyihao 已提交
2293
    pCtx->retryStep = 0;
dengyihao's avatar
dengyihao 已提交
2294
    pCtx->retryInit = true;
dengyihao's avatar
dengyihao 已提交
2295
    pCtx->retryCode = TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
2296 2297 2298

    // already retry, not use handle specified by app;
    pMsg->msg.info.handle = 0;
dengyihao's avatar
dengyihao 已提交
2299
  }
dengyihao's avatar
dengyihao 已提交
2300

dengyihao's avatar
dengyihao 已提交
2301 2302
  if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
    return false;
dengyihao's avatar
dengyihao 已提交
2303
  }
dengyihao's avatar
dengyihao 已提交
2304

dengyihao's avatar
dengyihao 已提交
2305 2306 2307 2308 2309 2310
  // code, msgType

  // A:  epset,   leader, not self
  // B:  epset,   not know leader
  // C:  no epset, leader but not serivce

dengyihao's avatar
dengyihao 已提交
2311 2312
  bool noDelay = false;
  if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
dengyihao's avatar
dengyihao 已提交
2313
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2314
    noDelay = cliResetEpset(pCtx, pResp, false);
dengyihao's avatar
dengyihao 已提交
2315 2316
    transFreeMsg(pResp->pCont);
    transUnrefCliHandle(pConn);
dengyihao's avatar
dengyihao 已提交
2317
  } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
2318 2319
             code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED ||
             code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
dengyihao's avatar
dengyihao 已提交
2320
             code == TSDB_CODE_APP_IS_STOPPING || code == TSDB_CODE_VND_STOPPED) {
dengyihao's avatar
dengyihao 已提交
2321
    tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2322
    noDelay = cliResetEpset(pCtx, pResp, true);
dengyihao's avatar
dengyihao 已提交
2323 2324
    transFreeMsg(pResp->pCont);
    addConnToPool(pThrd->pool, pConn);
dengyihao's avatar
dengyihao 已提交
2325
  } else if (code == TSDB_CODE_SYN_RESTORING) {
dengyihao's avatar
dengyihao 已提交
2326
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2327
    noDelay = cliResetEpset(pCtx, pResp, true);
dengyihao's avatar
dengyihao 已提交
2328 2329
    addConnToPool(pThrd->pool, pConn);
    transFreeMsg(pResp->pCont);
dengyihao's avatar
dengyihao 已提交
2330
  } else {
dengyihao's avatar
dengyihao 已提交
2331
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2332 2333 2334
    noDelay = cliResetEpset(pCtx, pResp, false);
    addConnToPool(pThrd->pool, pConn);
    transFreeMsg(pResp->pCont);
dengyihao's avatar
dengyihao 已提交
2335
  }
dengyihao's avatar
dengyihao 已提交
2336 2337 2338 2339
  if (code != TSDB_CODE_RPC_BROKEN_LINK && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_SUCCESS) {
    // save one internal code
    pCtx->retryCode = code;
  }
dengyihao's avatar
dengyihao 已提交
2340

dengyihao's avatar
dengyihao 已提交
2341 2342 2343
  if (noDelay == false) {
    pCtx->epsetRetryCnt = 1;
    pCtx->retryStep++;
dengyihao's avatar
dengyihao 已提交
2344

dengyihao's avatar
dengyihao 已提交
2345 2346 2347 2348 2349 2350 2351 2352
    int64_t factor = pow(pCtx->retryStepFactor, pCtx->retryStep - 1);
    pCtx->retryNextInterval = factor * pCtx->retryMinInterval;
    if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) {
      pCtx->retryNextInterval = pCtx->retryMaxInterval;
    }
  } else {
    pCtx->retryNextInterval = 0;
    pCtx->epsetRetryCnt++;
dengyihao's avatar
dengyihao 已提交
2353
  }
dengyihao's avatar
dengyihao 已提交
2354

dengyihao's avatar
dengyihao 已提交
2355
  pMsg->sent = 0;
dengyihao's avatar
dengyihao 已提交
2356
  cliSchedMsgToNextNode(pMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
2357
  return true;
dengyihao's avatar
dengyihao 已提交
2358
}
dengyihao's avatar
dengyihao 已提交
2359
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
2360 2361
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
2362

dengyihao's avatar
dengyihao 已提交
2363
  if (pMsg == NULL || pMsg->ctx == NULL) {
dengyihao's avatar
dengyihao 已提交
2364
    tTrace("%s conn %p handle resp", pTransInst->label, pConn);
dengyihao's avatar
dengyihao 已提交
2365 2366 2367
    pTransInst->cfp(pTransInst->parent, pResp, NULL);
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
2368

dengyihao's avatar
dengyihao 已提交
2369
  STransConnCtx* pCtx = pMsg->ctx;
dengyihao's avatar
dengyihao 已提交
2370

dengyihao's avatar
dengyihao 已提交
2371 2372 2373
  bool retry = cliGenRetryRule(pConn, pResp, pMsg);
  if (retry == true) {
    return -1;
dengyihao's avatar
dengyihao 已提交
2374
  }
dengyihao's avatar
dengyihao 已提交
2375

dengyihao's avatar
dengyihao 已提交
2376 2377 2378
  if (pCtx->retryCode != TSDB_CODE_SUCCESS) {
    int32_t code = pResp->code;
    // return internal code app
dengyihao's avatar
dengyihao 已提交
2379 2380
    if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
        code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
dengyihao's avatar
dengyihao 已提交
2381 2382 2383 2384
      pResp->code = pCtx->retryCode;
    }
  }

2385
  // check whole vnodes is offline on this vgroup
2386 2387
  if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps || pCtx->retryStep > 0) {
    if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
A
Alex Duan 已提交
2388
      pResp->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
2389
    } else if (pResp->code == TSDB_CODE_RPC_BROKEN_LINK) {
A
Alex Duan 已提交
2390
      pResp->code = TSDB_CODE_RPC_SOMENODE_BROKEN_LINK;
2391 2392 2393
    }
  }

dengyihao's avatar
dengyihao 已提交
2394 2395
  STraceId* trace = &pResp->info.traceId;
  bool      hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
dengyihao's avatar
dengyihao 已提交
2396
  if (hasEpSet) {
dengyihao's avatar
dengyihao 已提交
2397 2398 2399 2400 2401
    if (rpcDebugFlag & DEBUG_TRACE) {
      char tbuf[256] = {0};
      EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
      tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
    }
dengyihao's avatar
dengyihao 已提交
2402
  }
dengyihao's avatar
dengyihao 已提交
2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414
  if (rpcDebugFlag & DEBUG_TRACE) {
    if (tmsgIsValid(pResp->msgType - 1)) {
      char buf[128] = {0};
      sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1));
      int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
      if (NULL == 0) {
        int localCount = 0;
        taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
      } else {
        int localCount = *count - 1;
        taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
      }
dengyihao's avatar
dengyihao 已提交
2415 2416
    }
  }
dengyihao's avatar
dengyihao 已提交
2417
  if (pCtx->pSem != NULL) {
dengyihao's avatar
dengyihao 已提交
2418
    tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
2419
    if (pCtx->pRsp == NULL) {
dengyihao's avatar
dengyihao 已提交
2420
      tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
2421 2422 2423
    } else {
      memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
    }
dengyihao's avatar
dengyihao 已提交
2424
    tsem_post(pCtx->pSem);
2425
    pCtx->pRsp = NULL;
dengyihao's avatar
dengyihao 已提交
2426
  } else {
dengyihao's avatar
dengyihao 已提交
2427
    tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
2428
    if (retry == false && hasEpSet == true) {
dengyihao's avatar
dengyihao 已提交
2429
      pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
dengyihao's avatar
dengyihao 已提交
2430
    } else {
dengyihao's avatar
dengyihao 已提交
2431
      if (!cliIsEpsetUpdated(pResp->code, pCtx)) {
dengyihao's avatar
dengyihao 已提交
2432 2433 2434 2435
        pTransInst->cfp(pTransInst->parent, pResp, NULL);
      } else {
        pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
      }
dengyihao's avatar
dengyihao 已提交
2436
    }
dengyihao's avatar
dengyihao 已提交
2437
  }
dengyihao's avatar
dengyihao 已提交
2438
  return 0;
dengyihao's avatar
dengyihao 已提交
2439
}
U
ubuntu 已提交
2440 2441

void transCloseClient(void* arg) {
U
ubuntu 已提交
2442
  SCliObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
2443
  for (int i = 0; i < cli->numOfThreads; i++) {
U
ubuntu 已提交
2444
    cliSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
2445
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
2446
  }
wafwerar's avatar
wafwerar 已提交
2447 2448
  taosMemoryFree(cli->pThreadObj);
  taosMemoryFree(cli);
dengyihao's avatar
dengyihao 已提交
2449
}
dengyihao's avatar
dengyihao 已提交
2450 2451 2452 2453 2454
void transRefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_INC((SCliConn*)handle);
dengyihao's avatar
dengyihao 已提交
2455
  tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
dengyihao's avatar
dengyihao 已提交
2456 2457 2458 2459 2460 2461 2462
  UNUSED(ref);
}
void transUnrefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SCliConn*)handle);
dengyihao's avatar
dengyihao 已提交
2463
  tTrace("%s conn %p ref:%d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
dengyihao's avatar
dengyihao 已提交
2464
  if (ref == 0) {
U
ubuntu 已提交
2465
    cliDestroyConn((SCliConn*)handle, true);
dengyihao's avatar
dengyihao 已提交
2466 2467
  }
}
dengyihao's avatar
dengyihao 已提交
2468
static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t handle) {
dengyihao's avatar
dengyihao 已提交
2469
  SCliThrd*  pThrd = NULL;
dengyihao's avatar
dengyihao 已提交
2470
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
dengyihao's avatar
dengyihao 已提交
2471 2472 2473
  if (exh == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
2474

dengyihao's avatar
dengyihao 已提交
2475 2476 2477 2478 2479 2480
  if (exh->pThrd == NULL && trans != NULL) {
    int idx = cliRBChoseIdx(trans);
    if (idx < 0) return NULL;
    exh->pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
  }

dengyihao's avatar
dengyihao 已提交
2481
  pThrd = exh->pThrd;
dengyihao's avatar
dengyihao 已提交
2482
  transReleaseExHandle(transGetRefMgt(), handle);
dengyihao's avatar
dengyihao 已提交
2483 2484
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
2485
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
dengyihao's avatar
dengyihao 已提交
2486
  if (handle == 0) {
dengyihao's avatar
dengyihao 已提交
2487
    int idx = cliRBChoseIdx(trans);
dengyihao's avatar
dengyihao 已提交
2488
    if (idx < 0) return NULL;
dengyihao's avatar
dengyihao 已提交
2489 2490
    return ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
  }
dengyihao's avatar
dengyihao 已提交
2491
  SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle);
D
dapan1121 已提交
2492
  return pThrd;
dengyihao's avatar
dengyihao 已提交
2493
}
dengyihao's avatar
dengyihao 已提交
2494
int transReleaseCliHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
2495 2496 2497
  int  idx = -1;
  bool valid = false;

dengyihao's avatar
dengyihao 已提交
2498
  SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
dengyihao's avatar
dengyihao 已提交
2499
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2500
    return -1;
dengyihao's avatar
dengyihao 已提交
2501
  }
dengyihao's avatar
dengyihao 已提交
2502

dengyihao's avatar
dengyihao 已提交
2503
  STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527};
dengyihao's avatar
rm code  
dengyihao 已提交
2504
  TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
dengyihao's avatar
dengyihao 已提交
2505

dengyihao's avatar
dengyihao 已提交
2506 2507 2508
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  pCtx->ahandle = tmsg.info.ahandle;

dengyihao's avatar
dengyihao 已提交
2509
  SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2510
  cmsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
2511
  cmsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2512
  cmsg->type = Release;
dengyihao's avatar
dengyihao 已提交
2513
  cmsg->ctx = pCtx;
dengyihao's avatar
dengyihao 已提交
2514

dengyihao's avatar
dengyihao 已提交
2515 2516 2517
  STraceId* trace = &tmsg.info.traceId;
  tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);

dengyihao's avatar
dengyihao 已提交
2518
  if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
dengyihao's avatar
dengyihao 已提交
2519
    destroyCmsg(cmsg);
dengyihao's avatar
dengyihao 已提交
2520 2521
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2522
  return 0;
dengyihao's avatar
dengyihao 已提交
2523
}
dengyihao's avatar
dengyihao 已提交
2524

dengyihao's avatar
dengyihao 已提交
2525
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
2526 2527 2528 2529 2530
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2531

dengyihao's avatar
dengyihao 已提交
2532
  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
dengyihao's avatar
dengyihao 已提交
2533
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2534
    transFreeMsg(pReq->pCont);
dengyihao's avatar
dengyihao 已提交
2535
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2536
    return TSDB_CODE_RPC_BROKEN_LINK;
dengyihao's avatar
dengyihao 已提交
2537 2538
  }

dengyihao's avatar
dengyihao 已提交
2539
  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
wafwerar's avatar
wafwerar 已提交
2540
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2541 2542 2543
  epsetAssign(&pCtx->epSet, pEpSet);
  epsetAssign(&pCtx->origEpSet, pEpSet);

S
Shengliang Guan 已提交
2544
  pCtx->ahandle = pReq->info.ahandle;
dengyihao's avatar
dengyihao 已提交
2545
  pCtx->msgType = pReq->msgType;
dengyihao's avatar
dengyihao 已提交
2546

H
Haojun Liao 已提交
2547
  if (ctx != NULL) pCtx->appCtx = *ctx;
dengyihao's avatar
dengyihao 已提交
2548

wafwerar's avatar
wafwerar 已提交
2549
  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2550
  cliMsg->ctx = pCtx;
dengyihao's avatar
dengyihao 已提交
2551
  cliMsg->msg = *pReq;
dengyihao's avatar
dengyihao 已提交
2552
  cliMsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2553
  cliMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
2554
  cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2555

dengyihao's avatar
dengyihao 已提交
2556
  STraceId* trace = &pReq->info.traceId;
dengyihao's avatar
dengyihao 已提交
2557 2558
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
dengyihao's avatar
dengyihao 已提交
2559 2560
  if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) {
    destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2561
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2562 2563
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2564
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2565
  return 0;
dengyihao's avatar
dengyihao 已提交
2566
}
dengyihao's avatar
dengyihao 已提交
2567

dengyihao's avatar
dengyihao 已提交
2568
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
dengyihao's avatar
dengyihao 已提交
2569 2570 2571 2572 2573
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2574

dengyihao's avatar
dengyihao 已提交
2575 2576
  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2577
    transFreeMsg(pReq->pCont);
dengyihao's avatar
dengyihao 已提交
2578
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2579
    return TSDB_CODE_RPC_BROKEN_LINK;
dengyihao's avatar
dengyihao 已提交
2580
  }
dengyihao's avatar
dengyihao 已提交
2581

dengyihao's avatar
dengyihao 已提交
2582 2583
  tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
  tsem_init(sem, 0, 0);
dengyihao's avatar
dengyihao 已提交
2584

dengyihao's avatar
dengyihao 已提交
2585 2586
  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());

wafwerar's avatar
wafwerar 已提交
2587
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2588 2589
  epsetAssign(&pCtx->epSet, pEpSet);
  epsetAssign(&pCtx->origEpSet, pEpSet);
S
Shengliang Guan 已提交
2590
  pCtx->ahandle = pReq->info.ahandle;
dengyihao's avatar
dengyihao 已提交
2591
  pCtx->msgType = pReq->msgType;
dengyihao's avatar
dengyihao 已提交
2592
  pCtx->pSem = sem;
dengyihao's avatar
dengyihao 已提交
2593 2594
  pCtx->pRsp = pRsp;

wafwerar's avatar
wafwerar 已提交
2595
  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2596 2597 2598
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2599
  cliMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
2600
  cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2601

dengyihao's avatar
dengyihao 已提交
2602
  STraceId* trace = &pReq->info.traceId;
dengyihao's avatar
dengyihao 已提交
2603 2604
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
dengyihao's avatar
dengyihao 已提交
2605

dengyihao's avatar
dengyihao 已提交
2606 2607
  int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
  if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
2608
    destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2609
    goto _RETURN;
dengyihao's avatar
dengyihao 已提交
2610
  }
dengyihao's avatar
dengyihao 已提交
2611
  tsem_wait(sem);
dengyihao's avatar
dengyihao 已提交
2612 2613

_RETURN:
dengyihao's avatar
dengyihao 已提交
2614 2615
  tsem_destroy(sem);
  taosMemoryFree(sem);
dengyihao's avatar
dengyihao 已提交
2616
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2617
  return ret;
dengyihao's avatar
dengyihao 已提交
2618
}
dengyihao's avatar
dengyihao 已提交
2619 2620 2621
/*
 *
 **/
dengyihao's avatar
dengyihao 已提交
2622
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
dengyihao's avatar
dengyihao 已提交
2623
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2624 2625 2626
  if (pTransInst == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2627 2628 2629

  SCvtAddr cvtAddr = {0};
  if (ip != NULL && fqdn != NULL) {
dengyihao's avatar
dengyihao 已提交
2630 2631
    tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip));
    tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn));
dengyihao's avatar
dengyihao 已提交
2632 2633
    cvtAddr.cvt = true;
  }
dengyihao's avatar
dengyihao 已提交
2634 2635
  for (int i = 0; i < pTransInst->numOfThreads; i++) {
    STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2636
    pCtx->cvtAddr = cvtAddr;
dengyihao's avatar
dengyihao 已提交
2637 2638 2639 2640

    SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
    cliMsg->ctx = pCtx;
    cliMsg->type = Update;
dengyihao's avatar
dengyihao 已提交
2641
    cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2642

dengyihao's avatar
dengyihao 已提交
2643
    SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
S
Shengliang Guan 已提交
2644
    tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid);
dengyihao's avatar
dengyihao 已提交
2645

dengyihao's avatar
dengyihao 已提交
2646 2647
    if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) {
      destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2648
      transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2649 2650
      return -1;
    }
dengyihao's avatar
dengyihao 已提交
2651
  }
dengyihao's avatar
dengyihao 已提交
2652
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2653
  return 0;
dengyihao's avatar
dengyihao 已提交
2654
}
dengyihao's avatar
dengyihao 已提交
2655 2656 2657 2658 2659

int64_t transAllocHandle() {
  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
  tDebug("pre alloc refId %" PRId64 "", exh->refId);
dengyihao's avatar
dengyihao 已提交
2660

dengyihao's avatar
dengyihao 已提交
2661 2662
  return exh->refId;
}