transCli.c 83.4 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1
/** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
dengyihao's avatar
dengyihao 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "transComm.h"

dengyihao's avatar
dengyihao 已提交
17 18 19 20 21
typedef struct {
  int32_t numOfConn;
  queue   msgQ;
} SMsgList;

dengyihao's avatar
dengyihao 已提交
22
typedef struct SConnList {
dengyihao's avatar
dengyihao 已提交
23 24 25
  queue     conns;
  int32_t   size;
  SMsgList* list;
dengyihao's avatar
dengyihao 已提交
26 27
} SConnList;

dengyihao's avatar
dengyihao 已提交
28
typedef struct {
dengyihao's avatar
dengyihao 已提交
29 30 31 32 33 34
  queue   wq;
  int32_t len;

  int connMax;
  int connCnt;
  int batchLenLimit;
dengyihao's avatar
dengyihao 已提交
35
  int sending;
dengyihao's avatar
dengyihao 已提交
36

dengyihao's avatar
dengyihao 已提交
37 38 39
  char*    dst;
  char*    ip;
  uint16_t port;
dengyihao's avatar
dengyihao 已提交
40 41 42 43 44 45 46 47 48 49

} SCliBatchList;

typedef struct {
  queue          wq;
  queue          listq;
  int32_t        wLen;
  int32_t        batchSize;  //
  int32_t        batch;
  SCliBatchList* pList;
dengyihao's avatar
dengyihao 已提交
50
} SCliBatch;
dengyihao's avatar
dengyihao 已提交
51

dengyihao's avatar
dengyihao 已提交
52
typedef struct SCliConn {
dengyihao's avatar
dengyihao 已提交
53
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
54 55
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
56
  queue        wreqQueue;
dengyihao's avatar
dengyihao 已提交
57 58

  uv_timer_t* timer;  // read timer, forbidden
dengyihao's avatar
dengyihao 已提交
59

dengyihao's avatar
dengyihao 已提交
60 61 62 63
  void* hostThrd;

  SConnBuffer readBuf;
  STransQueue cliMsgs;
dengyihao's avatar
dengyihao 已提交
64 65 66

  queue      q;
  SConnList* list;
dengyihao's avatar
dengyihao 已提交
67 68

  STransCtx  ctx;
dengyihao's avatar
dengyihao 已提交
69 70
  bool       broken;  // link broken or not
  ConnStatus status;  //
dengyihao's avatar
dengyihao 已提交
71

dengyihao's avatar
dengyihao 已提交
72 73
  SCliBatch* pBatch;

dengyihao's avatar
dengyihao 已提交
74
  SDelayTask* task;
dengyihao's avatar
dengyihao 已提交
75

dengyihao's avatar
dengyihao 已提交
76 77 78
  char* ip;
  char  src[32];
  char  dst[32];
dengyihao's avatar
dengyihao 已提交
79

dengyihao's avatar
dengyihao 已提交
80
  int64_t refId;
dengyihao's avatar
dengyihao 已提交
81
} SCliConn;
dengyihao's avatar
dengyihao 已提交
82

dengyihao's avatar
dengyihao 已提交
83
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
84
  STransConnCtx* ctx;
dengyihao's avatar
formate  
dengyihao 已提交
85
  STransMsg      msg;
dengyihao's avatar
dengyihao 已提交
86
  queue          q;
dengyihao's avatar
dengyihao 已提交
87
  STransMsgType  type;
dengyihao's avatar
dengyihao 已提交
88

dengyihao's avatar
dengyihao 已提交
89
  int64_t  refId;
dengyihao's avatar
dengyihao 已提交
90 91
  uint64_t st;
  int      sent;  //(0: no send, 1: alread sent)
dengyihao's avatar
dengyihao 已提交
92 93
} SCliMsg;

dengyihao's avatar
dengyihao 已提交
94
typedef struct SCliThrd {
dengyihao's avatar
dengyihao 已提交
95 96 97 98 99 100
  TdThread      thread;  // tid
  int64_t       pid;     // pid
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  uv_prepare_t* prepare;
  void*         pool;  // conn pool
dengyihao's avatar
dengyihao 已提交
101
  // timer handles
dengyihao's avatar
dengyihao 已提交
102
  SArray* timerList;
dengyihao's avatar
dengyihao 已提交
103
  // msg queue
dengyihao's avatar
dengyihao 已提交
104
  queue         msg;
wafwerar's avatar
wafwerar 已提交
105
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
106
  SDelayQueue*  delayQueue;
dengyihao's avatar
dengyihao 已提交
107
  SDelayQueue*  timeoutQueue;
dengyihao's avatar
dengyihao 已提交
108
  SDelayQueue*  waitConnQueue;
dengyihao's avatar
dengyihao 已提交
109 110
  uint64_t      nextTimeout;  // next timeout
  void*         pTransInst;   //
dengyihao's avatar
dengyihao 已提交
111

dengyihao's avatar
dengyihao 已提交
112
  int connCount;
dengyihao's avatar
dengyihao 已提交
113
  void (*destroyAhandleFp)(void* ahandle);
dengyihao's avatar
dengyihao 已提交
114 115
  SHashObj* fqdn2ipCache;
  SCvtAddr  cvtAddr;
dengyihao's avatar
dengyihao 已提交
116

dengyihao's avatar
dengyihao 已提交
117
  SHashObj* failFastCache;
dengyihao's avatar
dengyihao 已提交
118
  SHashObj* batchCache;
dengyihao's avatar
dengyihao 已提交
119

dengyihao's avatar
dengyihao 已提交
120 121
  SCliMsg* stopMsg;

dengyihao's avatar
dengyihao 已提交
122
  bool quit;
dengyihao's avatar
dengyihao 已提交
123 124 125

  int       newConnCount;
  SHashObj* msgCount;
dengyihao's avatar
dengyihao 已提交
126
} SCliThrd;
dengyihao's avatar
dengyihao 已提交
127

U
ubuntu 已提交
128
typedef struct SCliObj {
dengyihao's avatar
dengyihao 已提交
129 130 131 132
  char       label[TSDB_LABEL_LEN];
  int32_t    index;
  int        numOfThreads;
  SCliThrd** pThreadObj;
U
ubuntu 已提交
133
} SCliObj;
dengyihao's avatar
dengyihao 已提交
134

dengyihao's avatar
dengyihao 已提交
135 136 137 138 139 140 141
typedef struct {
  int32_t reinit;
  int64_t timestamp;
  int32_t count;
  int32_t threshold;
  int64_t interval;
} SFailFastItem;
dengyihao's avatar
dengyihao 已提交
142

dengyihao's avatar
dengyihao 已提交
143
// conn pool
dengyihao's avatar
dengyihao 已提交
144
// add expire timeout and capacity limit
dengyihao's avatar
dengyihao 已提交
145
static void*     createConnPool(int size);
dengyihao's avatar
dengyihao 已提交
146
static void*     destroyConnPool(SCliThrd* thread);
dengyihao's avatar
dengyihao 已提交
147
static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed);
dengyihao's avatar
dengyihao 已提交
148
static void      addConnToPool(void* pool, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
149
static void      doCloseIdleConn(void* param);
dengyihao's avatar
dengyihao 已提交
150

dengyihao's avatar
dengyihao 已提交
151 152
// register conn timer
static void cliConnTimeout(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
153 154
// register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
155
// register timer in each thread to clear expire conn
dengyihao's avatar
dengyihao 已提交
156
// static void cliTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
157
// alloc buffer for recv
dengyihao's avatar
dengyihao 已提交
158
static FORCE_INLINE void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
159
// callback after recv nbytes from socket
U
ubuntu 已提交
160
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
161
// callback after send data to socket
U
ubuntu 已提交
162
static void cliSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
163
// callback after conn to server
U
ubuntu 已提交
164 165
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
166
static void cliIdleCb(uv_idle_t* handle);
dengyihao's avatar
dengyihao 已提交
167
static void cliPrepareCb(uv_prepare_t* handle);
dengyihao's avatar
dengyihao 已提交
168

dengyihao's avatar
dengyihao 已提交
169
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd);
dengyihao's avatar
dengyihao 已提交
170
static void cliSendBatchCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
171 172

SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
dengyihao's avatar
dengyihao 已提交
173

dengyihao's avatar
dengyihao 已提交
174 175
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);

dengyihao's avatar
dengyihao 已提交
176
static int32_t allocConnRef(SCliConn* conn, bool update);
dengyihao's avatar
dengyihao 已提交
177

dengyihao's avatar
dengyihao 已提交
178
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
dengyihao's avatar
dengyihao 已提交
179

dengyihao's avatar
dengyihao 已提交
180
static SCliConn* cliCreateConn(SCliThrd* thrd);
U
ubuntu 已提交
181 182
static void      cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void      cliDestroy(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
183
static void      cliSend(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
184
static void      cliSendBatch(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
185
static void      cliDestroyConnMsgs(SCliConn* conn, bool destroy);
dengyihao's avatar
dengyihao 已提交
186

dengyihao's avatar
dengyihao 已提交
187 188
static void    doFreeTimeoutMsg(void* param);
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg);
dengyihao's avatar
dengyihao 已提交
189

dengyihao's avatar
dengyihao 已提交
190
// cli util func
dengyihao's avatar
dengyihao 已提交
191 192
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
dengyihao's avatar
dengyihao 已提交
193

dengyihao's avatar
dengyihao 已提交
194
static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
dengyihao's avatar
dengyihao 已提交
195

dengyihao's avatar
dengyihao 已提交
196 197 198
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn);
static FORCE_INLINE void     cliUpdateFqdnCache(SHashObj* cache, char* fqdn);

dengyihao's avatar
dengyihao 已提交
199
// process data read from server, add decompress etc later
U
ubuntu 已提交
200
static void cliHandleResp(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
201
// handle except about conn
U
ubuntu 已提交
202
static void cliHandleExcept(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
203
static void cliReleaseUnfinishedMsg(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
204
static void cliHandleFastFail(SCliConn* pConn, int status);
dengyihao's avatar
dengyihao 已提交
205

dengyihao's avatar
dengyihao 已提交
206
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd);
207
// handle req from app
dengyihao's avatar
dengyihao 已提交
208 209 210 211 212 213
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
                                                                   cliHandleUpdate};
dengyihao's avatar
dengyihao 已提交
214 215
/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
/// NULL,cliHandleUpdate};
dengyihao's avatar
dengyihao 已提交
216

dengyihao's avatar
dengyihao 已提交
217 218
static FORCE_INLINE void destroyUserdata(STransMsg* userdata);
static FORCE_INLINE void destroyCmsg(void* cmsg);
dengyihao's avatar
dengyihao 已提交
219
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
dengyihao's avatar
dengyihao 已提交
220
static FORCE_INLINE int  cliRBChoseIdx(STrans* pTransInst);
dengyihao's avatar
dengyihao 已提交
221
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
222

dengyihao's avatar
dengyihao 已提交
223
// thread obj
dengyihao's avatar
dengyihao 已提交
224
static SCliThrd* createThrdObj(void* trans);
dengyihao's avatar
dengyihao 已提交
225
static void      destroyThrdObj(SCliThrd* pThrd);
dengyihao's avatar
dengyihao 已提交
226

dengyihao's avatar
dengyihao 已提交
227 228 229 230 231 232 233 234 235
static void cliWalkCb(uv_handle_t* handle, void* arg);

#define CLI_RELEASE_UV(loop)        \
  do {                              \
    uv_walk(loop, cliWalkCb, NULL); \
    uv_run(loop, UV_RUN_DEFAULT);   \
    uv_loop_close(loop);            \
  } while (0);

dengyihao's avatar
dengyihao 已提交
236
// snprintf may cause performance problem
dengyihao's avatar
dengyihao 已提交
237 238
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
  do {                                         \
dengyihao's avatar
dengyihao 已提交
239 240
    char*   t = key;                           \
    int16_t len = strlen(ip);                  \
dengyihao's avatar
dengyihao 已提交
241
    if (ip != NULL) memcpy(t, ip, len);        \
dengyihao's avatar
dengyihao 已提交
242 243
    t[len] = ':';                              \
    titoa(port, 10, &t[len + 1]);              \
dengyihao's avatar
dengyihao 已提交
244 245
  } while (0)

dengyihao's avatar
dengyihao 已提交
246 247
#define CONN_PERSIST_TIME(para)   ((para) <= 90000 ? 90000 : (para))
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
dengyihao's avatar
dengyihao 已提交
248

dengyihao's avatar
dengyihao 已提交
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle)                         \
  do {                                                                    \
    int i = 0, sz = transQueueSize(&conn->cliMsgs);                       \
    for (; i < sz; i++) {                                                 \
      pMsg = transQueueGet(&conn->cliMsgs, i);                            \
      if (pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \
        break;                                                            \
      }                                                                   \
    }                                                                     \
    if (i == sz) {                                                        \
      pMsg = NULL;                                                        \
      tDebug("msg not found, %" PRIu64 "", ahandle);                      \
    } else {                                                              \
      pMsg = transQueueRm(&conn->cliMsgs, i);                             \
      tDebug("msg found, %" PRIu64 "", ahandle);                          \
    }                                                                     \
dengyihao's avatar
dengyihao 已提交
265
  } while (0)
dengyihao's avatar
dengyihao 已提交
266

U
ubuntu 已提交
267 268 269 270 271 272 273 274 275 276 277 278 279
#define CONN_GET_NEXT_SENDMSG(conn)                 \
  do {                                              \
    int i = 0;                                      \
    do {                                            \
      pCliMsg = transQueueGet(&conn->cliMsgs, i++); \
      if (pCliMsg && 0 == pCliMsg->sent) {          \
        break;                                      \
      }                                             \
    } while (pCliMsg != NULL);                      \
    if (pCliMsg == NULL) {                          \
      goto _RETURN;                                 \
    }                                               \
  } while (0)
dengyihao's avatar
dengyihao 已提交
280

dengyihao's avatar
formate  
dengyihao 已提交
281 282
#define CONN_SET_PERSIST_BY_APP(conn) \
  do {                                \
dengyihao's avatar
dengyihao 已提交
283 284
    if (conn->status == ConnNormal) { \
      conn->status = ConnAcquire;     \
dengyihao's avatar
formate  
dengyihao 已提交
285 286 287
      transRefCliHandle(conn);        \
    }                                 \
  } while (0)
288

dengyihao's avatar
dengyihao 已提交
289 290 291 292
#define CONN_NO_PERSIST_BY_APP(conn) \
  (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define CONN_RELEASE_BY_SERVER(conn) \
  (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
293

S
Shengliang Guan 已提交
294 295
#define REQUEST_NO_RESP(msg)         ((msg)->info.noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg)   ((msg)->info.persistHandle == 1)
dengyihao's avatar
dengyihao 已提交
296
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
dengyihao's avatar
dengyihao 已提交
297

dengyihao's avatar
dengyihao 已提交
298
#define EPSET_IS_VALID(epSet)       ((epSet) != NULL && (epSet)->numOfEps >= 0 && (epSet)->inUse >= 0)
dengyihao's avatar
dengyihao 已提交
299
#define EPSET_GET_SIZE(epSet)       (epSet)->numOfEps
dengyihao's avatar
dengyihao 已提交
300
#define EPSET_GET_INUSE_IP(epSet)   ((epSet)->eps[(epSet)->inUse].fqdn)
dengyihao's avatar
dengyihao 已提交
301
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
dengyihao's avatar
dengyihao 已提交
302 303 304 305 306 307
#define EPSET_FORWARD_INUSE(epSet)                             \
  do {                                                         \
    if ((epSet)->numOfEps != 0) {                              \
      ++((epSet)->inUse);                                      \
      (epSet)->inUse = ((epSet)->inUse) % ((epSet)->numOfEps); \
    }                                                          \
dengyihao's avatar
dengyihao 已提交
308
  } while (0)
dengyihao's avatar
dengyihao 已提交
309

dengyihao's avatar
dengyihao 已提交
310 311 312 313 314 315 316 317 318 319 320
#define EPSET_DEBUG_STR(epSet, tbuf)                                                                                   \
  do {                                                                                                                 \
    int len = snprintf(tbuf, sizeof(tbuf), "epset:{");                                                                 \
    for (int i = 0; i < (epSet)->numOfEps; i++) {                                                                      \
      if (i == (epSet)->numOfEps - 1) {                                                                                \
        len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port);   \
      } else {                                                                                                         \
        len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d, ", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
      }                                                                                                                \
    }                                                                                                                  \
    len += snprintf(tbuf + len, sizeof(tbuf) - len, "}, inUse:%d", (epSet)->inUse);                                    \
dengyihao's avatar
dengyihao 已提交
321
  } while (0);
dengyihao's avatar
dengyihao 已提交
322

U
ubuntu 已提交
323
static void* cliWorkThread(void* arg);
dengyihao's avatar
dengyihao 已提交
324

dengyihao's avatar
dengyihao 已提交
325 326 327 328 329 330 331 332
static void cliReleaseUnfinishedMsg(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;

  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    SCliMsg* msg = transQueueGet(&conn->cliMsgs, i);
    if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) {
      if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
        conn->ctx.freeFunc(msg->ctx->ahandle);
dengyihao's avatar
dengyihao 已提交
333
      } else if (msg->ctx->ahandle != NULL && pThrd->destroyAhandleFp != NULL) {
dengyihao's avatar
dengyihao 已提交
334
        tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle);
dengyihao's avatar
dengyihao 已提交
335
        pThrd->destroyAhandleFp(msg->ctx->ahandle);
dengyihao's avatar
dengyihao 已提交
336 337 338 339
      }
    }
    destroyCmsg(msg);
  }
dengyihao's avatar
dengyihao 已提交
340
  transQueueClear(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
341
  memset(&conn->ctx, 0, sizeof(conn->ctx));
dengyihao's avatar
dengyihao 已提交
342
}
dengyihao's avatar
dengyihao 已提交
343
bool cliMaySendCachedMsg(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
344
  if (!transQueueEmpty(&conn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
345
    SCliMsg* pCliMsg = NULL;
U
ubuntu 已提交
346
    CONN_GET_NEXT_SENDMSG(conn);
dengyihao's avatar
dengyihao 已提交
347 348
    cliSend(conn);
    return true;
dengyihao's avatar
dengyihao 已提交
349
  }
dengyihao's avatar
dengyihao 已提交
350
  return false;
U
ubuntu 已提交
351 352
_RETURN:
  return false;
dengyihao's avatar
dengyihao 已提交
353
}
dengyihao's avatar
formate  
dengyihao 已提交
354
void cliHandleResp(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
355 356
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
357

dengyihao's avatar
dengyihao 已提交
358 359 360 361 362 363
  if (conn->timer) {
    if (uv_is_active((uv_handle_t*)conn->timer)) {
      tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
      uv_timer_stop(conn->timer);
    }
    taosArrayPush(pThrd->timerList, &conn->timer);
dengyihao's avatar
dengyihao 已提交
364
    conn->timer->data = NULL;
dengyihao's avatar
dengyihao 已提交
365
    conn->timer = NULL;
dengyihao's avatar
dengyihao 已提交
366 367
  }

dengyihao's avatar
opt rpc  
dengyihao 已提交
368
  STransMsgHead* pHead = NULL;
dengyihao's avatar
dengyihao 已提交
369 370 371

  int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
  if (msgLen <= 0) {
dengyihao's avatar
dengyihao 已提交
372
    taosMemoryFree(pHead);
dengyihao's avatar
dengyihao 已提交
373 374 375
    tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
    return;
  }
dengyihao's avatar
dengyihao 已提交
376 377 378 379

  if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
    tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
  }
dengyihao's avatar
dengyihao 已提交
380 381
  pHead->code = htonl(pHead->code);
  pHead->msgLen = htonl(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
382 383 384 385
  if (cliRecvReleaseReq(conn, pHead)) {
    return;
  }

dengyihao's avatar
dengyihao 已提交
386 387 388 389 390
  STransMsg transMsg = {0};
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = transContFromHead((char*)pHead);
  transMsg.code = pHead->code;
  transMsg.msgType = pHead->msgType;
S
Shengliang Guan 已提交
391
  transMsg.info.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
392
  transMsg.info.traceId = pHead->traceId;
dengyihao's avatar
dengyihao 已提交
393
  transMsg.info.hasEpSet = pHead->hasEpSet;
dengyihao's avatar
dengyihao 已提交
394

dengyihao's avatar
dengyihao 已提交
395 396 397 398
  SCliMsg*       pMsg = NULL;
  STransConnCtx* pCtx = NULL;
  if (CONN_NO_PERSIST_BY_APP(conn)) {
    pMsg = transQueuePop(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
399 400 401

    pCtx = pMsg ? pMsg->ctx : NULL;
    transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
402
    tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
U
ubuntu 已提交
403
  } else {
dengyihao's avatar
dengyihao 已提交
404 405 406
    uint64_t ahandle = (uint64_t)pHead->ahandle;
    CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
    if (pMsg == NULL) {
S
Shengliang Guan 已提交
407
      transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
dengyihao's avatar
dengyihao 已提交
408 409
      tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
             transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
S
Shengliang Guan 已提交
410
      if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
D
dapan1121 已提交
411
        transMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
S
Shengliang Guan 已提交
412
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
dengyihao's avatar
dengyihao 已提交
413 414
        tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
               transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
415 416
      }
    } else {
dengyihao's avatar
dengyihao 已提交
417
      pCtx = pMsg->ctx;
S
Shengliang Guan 已提交
418
      transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
419
      tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
420
    }
U
ubuntu 已提交
421
  }
dengyihao's avatar
dengyihao 已提交
422
  // buf's mem alread translated to transMsg.pCont
dengyihao's avatar
dengyihao 已提交
423
  if (!CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
424
    transMsg.info.handle = (void*)conn->refId;
dengyihao's avatar
dengyihao 已提交
425
    tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
426
  }
dengyihao's avatar
dengyihao 已提交
427

dengyihao's avatar
dengyihao 已提交
428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
  // if (TMSG_INFO(pHead->msgType - 1) != 0) {
  //   char buf[128] = {0};
  //   sprintf(buf, "%s", TMSG_INFO(pHead->msgType - 1));
  //   int* count = taosHashGet(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1)));
  //   if (NULL == 0) {
  //     int localCount = 1;
  //     taosHashPut(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1)), &localCount,
  //                 sizeof(localCount));
  //   } else {
  //     int localCount = *count - 1;
  //     taosHashPut(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1)), &localCount,
  //                 sizeof(localCount));
  //   }
  // }

dengyihao's avatar
dengyihao 已提交
443
  STraceId* trace = &transMsg.info.traceId;
dengyihao's avatar
dengyihao 已提交
444
  tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
dengyihao's avatar
dengyihao 已提交
445
          TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));
dengyihao's avatar
dengyihao 已提交
446

dengyihao's avatar
dengyihao 已提交
447
  if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
448
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
449
    transFreeMsg(transMsg.pCont);
dengyihao's avatar
dengyihao 已提交
450 451
    return;
  }
S
Shengliang Guan 已提交
452
  if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
dengyihao's avatar
dengyihao 已提交
453
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
454
    transFreeMsg(transMsg.pCont);
dengyihao's avatar
dengyihao 已提交
455 456
    return;
  }
dengyihao's avatar
dengyihao 已提交
457

dengyihao's avatar
dengyihao 已提交
458
  if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
dengyihao's avatar
dengyihao 已提交
459 460 461
    if (cliAppCb(conn, &transMsg, pMsg) != 0) {
      return;
    }
dengyihao's avatar
dengyihao 已提交
462
  }
dengyihao's avatar
dengyihao 已提交
463 464
  destroyCmsg(pMsg);

dengyihao's avatar
dengyihao 已提交
465
  if (cliMaySendCachedMsg(conn) == true) {
dengyihao's avatar
dengyihao 已提交
466 467
    return;
  }
dengyihao's avatar
dengyihao 已提交
468

U
ubuntu 已提交
469
  if (CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
470
    return addConnToPool(pThrd->pool, conn);
dengyihao's avatar
dengyihao 已提交
471
  }
dengyihao's avatar
test  
dengyihao 已提交
472

dengyihao's avatar
dengyihao 已提交
473
  uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
dengyihao's avatar
dengyihao 已提交
474
}
U
ubuntu 已提交
475

dengyihao's avatar
dengyihao 已提交
476
void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
dengyihao's avatar
dengyihao 已提交
477
  if (transQueueEmpty(&pConn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
478
    if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
dengyihao's avatar
dengyihao 已提交
479
      tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
formate  
dengyihao 已提交
480 481 482
      transUnrefCliHandle(pConn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
483
  }
dengyihao's avatar
dengyihao 已提交
484 485 486
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
  bool      once = false;
D
dapan1121 已提交
487
  do {
dengyihao's avatar
dengyihao 已提交
488
    SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);
dengyihao's avatar
enh log  
dengyihao 已提交
489

D
dapan1121 已提交
490 491
    if (pMsg == NULL && once) {
      break;
dengyihao's avatar
dengyihao 已提交
492
    }
dengyihao's avatar
enh log  
dengyihao 已提交
493 494 495 496 497 498

    if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg)) {
      destroyCmsg(pMsg);
      break;
    }

dengyihao's avatar
dengyihao 已提交
499
    STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
U
ubuntu 已提交
500

dengyihao's avatar
dengyihao 已提交
501
    STransMsg transMsg = {0};
dengyihao's avatar
dengyihao 已提交
502
    transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code;
dengyihao's avatar
dengyihao 已提交
503
    transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
S
Shengliang Guan 已提交
504
    transMsg.info.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
505

dengyihao's avatar
dengyihao 已提交
506
    if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
S
Shengliang Guan 已提交
507
      transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
dengyihao's avatar
dengyihao 已提交
508
      tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
dengyihao's avatar
dengyihao 已提交
509
             TMSG_INFO(transMsg.msgType));
S
Shengliang Guan 已提交
510
      if (transMsg.info.ahandle == NULL) {
dengyihao's avatar
dengyihao 已提交
511 512 513
        int32_t msgType = 0;
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, &msgType);
        transMsg.msgType = msgType;
dengyihao's avatar
dengyihao 已提交
514
        tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
S
Shengliang Guan 已提交
515
               transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
516
      }
dengyihao's avatar
dengyihao 已提交
517
    } else {
dengyihao's avatar
dengyihao 已提交
518
      transMsg.info.ahandle = (pMsg != NULL && pMsg->type != Release && pCtx) ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
519 520 521
    }

    if (pCtx == NULL || pCtx->pSem == NULL) {
S
Shengliang Guan 已提交
522
      if (transMsg.info.ahandle == NULL) {
dengyihao's avatar
dengyihao 已提交
523 524 525 526 527
        if (pMsg == NULL || REQUEST_NO_RESP(&pMsg->msg) || pMsg->type == Release) {
          destroyCmsg(pMsg);
          once = true;
          continue;
        }
U
ubuntu 已提交
528
      }
dengyihao's avatar
dengyihao 已提交
529
    }
dengyihao's avatar
enh log  
dengyihao 已提交
530

dengyihao's avatar
dengyihao 已提交
531
    if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
dengyihao's avatar
dengyihao 已提交
532 533 534
      if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
        return;
      }
dengyihao's avatar
dengyihao 已提交
535 536
    }
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
537
    tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));
D
dapan1121 已提交
538
  } while (!transQueueEmpty(&pConn->cliMsgs));
dengyihao's avatar
dengyihao 已提交
539
  transUnrefCliHandle(pConn);
dengyihao's avatar
dengyihao 已提交
540
}
dengyihao's avatar
dengyihao 已提交
541
void cliHandleExcept(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
542
  tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
543 544
  cliHandleExceptImpl(conn, -1);
}
dengyihao's avatar
dengyihao 已提交
545

dengyihao's avatar
dengyihao 已提交
546 547
void cliConnTimeout(uv_timer_t* handle) {
  SCliConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
548 549
  SCliThrd* pThrd = conn->hostThrd;

dengyihao's avatar
dengyihao 已提交
550
  tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
551

dengyihao's avatar
dengyihao 已提交
552
  uv_timer_stop(handle);
dengyihao's avatar
dengyihao 已提交
553 554 555
  handle->data = NULL;
  taosArrayPush(pThrd->timerList, &conn->timer);
  conn->timer = NULL;
dengyihao's avatar
dengyihao 已提交
556 557

  cliHandleFastFail(conn, UV_ECANCELED);
dengyihao's avatar
dengyihao 已提交
558
}
dengyihao's avatar
dengyihao 已提交
559 560 561 562
void cliReadTimeoutCb(uv_timer_t* handle) {
  // set up timeout cb
  SCliConn* conn = handle->data;
  tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
563
  uv_read_stop(conn->stream);
dengyihao's avatar
dengyihao 已提交
564 565
  cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT);
}
U
ubuntu 已提交
566 567

void* createConnPool(int size) {
568 569
  // thread local, no lock
  return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
570
}
dengyihao's avatar
dengyihao 已提交
571 572
void* destroyConnPool(SCliThrd* pThrd) {
  void*      pool = pThrd->pool;
dengyihao's avatar
dengyihao 已提交
573
  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
dengyihao's avatar
dengyihao 已提交
574
  while (connList != NULL) {
dengyihao's avatar
dengyihao 已提交
575 576
    while (!QUEUE_IS_EMPTY(&connList->conns)) {
      queue*    h = QUEUE_HEAD(&connList->conns);
dengyihao's avatar
dengyihao 已提交
577
      SCliConn* c = QUEUE_DATA(h, SCliConn, q);
U
ubuntu 已提交
578
      cliDestroyConn(c, true);
dengyihao's avatar
dengyihao 已提交
579
    }
dengyihao's avatar
dengyihao 已提交
580 581 582 583 584 585 586 587 588 589 590 591 592 593 594

    SMsgList* msglist = connList->list;
    while (!QUEUE_IS_EMPTY(&msglist->msgQ)) {
      queue* h = QUEUE_HEAD(&msglist->msgQ);
      QUEUE_REMOVE(h);

      SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);

      transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
      pMsg->ctx->task = NULL;

      doNotifyApp(pMsg, pThrd);
    }
    taosMemoryFree(msglist);

dengyihao's avatar
dengyihao 已提交
595
    connList = taosHashIterate((SHashObj*)pool, connList);
dengyihao's avatar
dengyihao 已提交
596
  }
dengyihao's avatar
dengyihao 已提交
597
  taosHashCleanup(pool);
598
  return NULL;
dengyihao's avatar
dengyihao 已提交
599 600
}

dengyihao's avatar
dengyihao 已提交
601
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
dengyihao's avatar
dengyihao 已提交
602
  void*      pool = pThrd->pool;
dengyihao's avatar
dengyihao 已提交
603
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
604
  STrans*    pTranInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
605
  if (plist == NULL) {
dengyihao's avatar
dengyihao 已提交
606 607 608 609
    SConnList list = {0};
    taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
    plist = taosHashGet(pool, key, strlen(key));

dengyihao's avatar
dengyihao 已提交
610 611 612 613
    SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
    QUEUE_INIT(&nList->msgQ);
    nList->numOfConn++;

dengyihao's avatar
dengyihao 已提交
614 615
    QUEUE_INIT(&plist->conns);
    plist->list = nList;
dengyihao's avatar
dengyihao 已提交
616 617
  }

dengyihao's avatar
dengyihao 已提交
618 619 620 621
  if (QUEUE_IS_EMPTY(&plist->conns)) {
    if (plist->list->numOfConn >= pTranInst->connLimitNum) {
      *exceed = true;
    }
dengyihao's avatar
dengyihao 已提交
622 623 624
    return NULL;
  }

dengyihao's avatar
dengyihao 已提交
625
  queue* h = QUEUE_TAIL(&plist->conns);
dengyihao's avatar
dengyihao 已提交
626
  QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
627
  plist->size -= 1;
dengyihao's avatar
dengyihao 已提交
628

dengyihao's avatar
dengyihao 已提交
629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644
  SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
  conn->status = ConnNormal;
  QUEUE_INIT(&conn->q);

  if (conn->task != NULL) {
    transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
    conn->task = NULL;
  }
  return conn;
}

static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
  void*      pool = pThrd->pool;
  STrans*    pTransInst = pThrd->pTransInst;
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
  if (plist == NULL) {
dengyihao's avatar
dengyihao 已提交
645 646 647 648
    SConnList list = {0};
    taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
    plist = taosHashGet(pool, key, strlen(key));

dengyihao's avatar
dengyihao 已提交
649 650
    SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
    QUEUE_INIT(&nList->msgQ);
dengyihao's avatar
dengyihao 已提交
651 652
    nList->numOfConn++;

dengyihao's avatar
dengyihao 已提交
653 654
    QUEUE_INIT(&plist->conns);
    plist->list = nList;
dengyihao's avatar
dengyihao 已提交
655 656
  }

dengyihao's avatar
dengyihao 已提交
657
  STraceId* trace = &(*pMsg)->msg.info.traceId;
dengyihao's avatar
dengyihao 已提交
658
  // no avaliable conn in pool
dengyihao's avatar
dengyihao 已提交
659
  if (QUEUE_IS_EMPTY(&plist->conns)) {
dengyihao's avatar
dengyihao 已提交
660
    SMsgList* list = plist->list;
dengyihao's avatar
dengyihao 已提交
661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678
    if ((list)->numOfConn >= pTransInst->connLimitNum) {
      STraceId* trace = &(*pMsg)->msg.info.traceId;
      STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
      arg->param1 = *pMsg;
      arg->param2 = pThrd;
      (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);

      tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));

      QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
      *pMsg = NULL;
    } else {
      // send msg in delay queue
      if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) {
        STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
        arg->param1 = *pMsg;
        arg->param2 = pThrd;
        (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
dengyihao's avatar
dengyihao 已提交
679 680
        tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label,
                TMSG_INFO((*pMsg)->msg.msgType));
dengyihao's avatar
dengyihao 已提交
681 682 683 684 685 686 687

        QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
        queue* h = QUEUE_HEAD(&(list)->msgQ);
        QUEUE_REMOVE(h);
        SCliMsg* ans = QUEUE_DATA(h, SCliMsg, q);

        *pMsg = ans;
dengyihao's avatar
dengyihao 已提交
688 689 690

        trace = &(*pMsg)->msg.info.traceId;
        tGTrace("%s msg %s pop from delay queue, start to send", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));
dengyihao's avatar
dengyihao 已提交
691 692 693 694
        transDQCancel(pThrd->waitConnQueue, ans->ctx->task);
      }
      list->numOfConn++;
    }
dengyihao's avatar
dengyihao 已提交
695
    tTrace("%s numOfConn: %d, limit: %d", pTransInst->label, list->numOfConn, pTransInst->connLimitNum);
dengyihao's avatar
dengyihao 已提交
696 697
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
698

dengyihao's avatar
dengyihao 已提交
699
  queue* h = QUEUE_TAIL(&plist->conns);
dengyihao's avatar
dengyihao 已提交
700
  plist->size -= 1;
dengyihao's avatar
dengyihao 已提交
701 702
  QUEUE_REMOVE(h);

dengyihao's avatar
dengyihao 已提交
703
  SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
dengyihao's avatar
dengyihao 已提交
704
  conn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
705
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
706

dengyihao's avatar
dengyihao 已提交
707 708 709 710
  if (conn->task != NULL) {
    transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
    conn->task = NULL;
  }
dengyihao's avatar
dengyihao 已提交
711
  return conn;
dengyihao's avatar
dengyihao 已提交
712
}
dengyihao's avatar
dengyihao 已提交
713 714 715 716 717 718
static void addConnToPool(void* pool, SCliConn* conn) {
  if (conn->status == ConnInPool) {
    return;
  }
  allocConnRef(conn, true);

dengyihao's avatar
dengyihao 已提交
719
  SCliThrd* thrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
720 721 722 723 724 725
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
    taosArrayPush(thrd->timerList, &conn->timer);
    conn->timer->data = NULL;
    conn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
726 727 728 729 730
  if (T_REF_VAL_GET(conn) > 1) {
    transUnrefCliHandle(conn);
  }

  cliDestroyConnMsgs(conn, false);
dengyihao's avatar
dengyihao 已提交
731

dengyihao's avatar
dengyihao 已提交
732
  if (conn->list == NULL) {
dengyihao's avatar
dengyihao 已提交
733
    conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
dengyihao's avatar
dengyihao 已提交
734
  }
dengyihao's avatar
dengyihao 已提交
735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755

  SConnList* pList = conn->list;
  SMsgList*  msgList = pList->list;
  if (!QUEUE_IS_EMPTY(&msgList->msgQ)) {
    queue* h = QUEUE_HEAD(&(msgList)->msgQ);
    QUEUE_REMOVE(h);

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);

    transDQCancel(thrd->waitConnQueue, pMsg->ctx->task);
    pMsg->ctx->task = NULL;

    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
    transQueuePush(&conn->cliMsgs, pMsg);

    conn->status = ConnNormal;
    cliSend(conn);
    return;
  }

  conn->status = ConnInPool;
dengyihao's avatar
dengyihao 已提交
756
  QUEUE_PUSH(&conn->list->conns, &conn->q);
dengyihao's avatar
dengyihao 已提交
757
  conn->list->size += 1;
dengyihao's avatar
dengyihao 已提交
758

dengyihao's avatar
dengyihao 已提交
759
  if (conn->list->size >= 20) {
dengyihao's avatar
dengyihao 已提交
760 761
    STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
    arg->param1 = conn;
dengyihao's avatar
dengyihao 已提交
762
    arg->param2 = thrd;
dengyihao's avatar
dengyihao 已提交
763 764

    STrans* pTransInst = thrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
765 766
    conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
  }
dengyihao's avatar
dengyihao 已提交
767
}
dengyihao's avatar
dengyihao 已提交
768
static int32_t allocConnRef(SCliConn* conn, bool update) {
dengyihao's avatar
dengyihao 已提交
769
  if (update) {
dengyihao's avatar
dengyihao 已提交
770
    transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
771
    transRemoveExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
772
    conn->refId = -1;
dengyihao's avatar
dengyihao 已提交
773 774 775 776
  }
  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
777
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
dengyihao's avatar
dengyihao 已提交
778
  conn->refId = exh->refId;
779 780 781 782

  if (conn->refId == -1) {
    taosMemoryFree(exh);
  }
dengyihao's avatar
dengyihao 已提交
783 784 785 786 787
  return 0;
}

static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
  if (update) {
dengyihao's avatar
dengyihao 已提交
788
    transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
789 790 791 792 793 794 795 796 797 798 799 800 801
    transRemoveExHandle(transGetRefMgt(), conn->refId);
    conn->refId = -1;
  }
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
  if (exh == NULL) {
    return -1;
  }
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
  conn->refId = exh->refId;

  transReleaseExHandle(transGetRefMgt(), handle);
  return 0;
dengyihao's avatar
dengyihao 已提交
802
}
dengyihao's avatar
dengyihao 已提交
803

dengyihao's avatar
dengyihao 已提交
804
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
805
  SCliConn*    conn = handle->data;
dengyihao's avatar
dengyihao 已提交
806
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
807
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
808
}
U
ubuntu 已提交
809
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
810
  // impl later
dengyihao's avatar
dengyihao 已提交
811 812 813
  if (handle->data == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
814 815
  SCliConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
816
  if (nread > 0) {
dengyihao's avatar
dengyihao 已提交
817
    pBuf->len += nread;
dengyihao's avatar
dengyihao 已提交
818
    while (transReadComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
819
      tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
820 821 822 823 824 825
      if (pBuf->invalid) {
        cliHandleExcept(conn);
        break;
      } else {
        cliHandleResp(conn);
      }
dengyihao's avatar
dengyihao 已提交
826
    }
dengyihao's avatar
dengyihao 已提交
827 828
    return;
  }
dengyihao's avatar
dengyihao 已提交
829

830
  if (nread == 0) {
dengyihao's avatar
dengyihao 已提交
831 832 833
    // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
    // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
    // read(2).
dengyihao's avatar
dengyihao 已提交
834
    tTrace("%s conn %p read empty", CONN_GET_INST_LABEL(conn), conn);
835 836
    return;
  }
dengyihao's avatar
fix bug  
dengyihao 已提交
837
  if (nread < 0) {
dengyihao's avatar
dengyihao 已提交
838
    tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
839
    conn->broken = true;
U
ubuntu 已提交
840
    cliHandleExcept(conn);
dengyihao's avatar
dengyihao 已提交
841
  }
dengyihao's avatar
dengyihao 已提交
842
}
dengyihao's avatar
dengyihao 已提交
843

dengyihao's avatar
dengyihao 已提交
844
static SCliConn* cliCreateConn(SCliThrd* pThrd) {
wafwerar's avatar
wafwerar 已提交
845
  SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn));
dengyihao's avatar
dengyihao 已提交
846
  // read/write stream handle
G
gccgdb1234 已提交
847
  conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
dengyihao's avatar
dengyihao 已提交
848 849 850
  uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
  conn->stream->data = conn;

dengyihao's avatar
dengyihao 已提交
851 852 853 854 855 856 857 858
  uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
  if (timer == NULL) {
    timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
    tDebug("no available timer, create a timer %p", timer);
    uv_timer_init(pThrd->loop, timer);
  }
  timer->data = conn;
  conn->timer = timer;
dengyihao's avatar
dengyihao 已提交
859

dengyihao's avatar
dengyihao 已提交
860
  conn->connReq.data = conn;
dengyihao's avatar
dengyihao 已提交
861 862
  transReqQueueInit(&conn->wreqQueue);

dengyihao's avatar
dengyihao 已提交
863
  transQueueInit(&conn->cliMsgs, NULL);
dengyihao's avatar
opt rpc  
dengyihao 已提交
864 865

  transInitBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
866
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
867
  conn->hostThrd = pThrd;
dengyihao's avatar
dengyihao 已提交
868
  conn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
869
  conn->broken = false;
dengyihao's avatar
dengyihao 已提交
870
  transRefCliHandle(conn);
dengyihao's avatar
dengyihao 已提交
871

dengyihao's avatar
dengyihao 已提交
872
  atomic_add_fetch_32(&pThrd->connCount, 1);
dengyihao's avatar
dengyihao 已提交
873
  allocConnRef(conn, false);
dengyihao's avatar
dengyihao 已提交
874

dengyihao's avatar
dengyihao 已提交
875 876
  return conn;
}
U
ubuntu 已提交
877
static void cliDestroyConn(SCliConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
878
  SCliThrd* pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
879
  tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
880
  conn->broken = true;
dengyihao's avatar
dengyihao 已提交
881 882
  QUEUE_REMOVE(&conn->q);
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
883 884 885 886

  if (conn->list != NULL) {
    SConnList* connList = conn->list;
    connList->list->numOfConn--;
dengyihao's avatar
dengyihao 已提交
887
    connList->size--;
dengyihao's avatar
dengyihao 已提交
888 889 890 891 892 893
  } else {
    SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip));
    connList->list->numOfConn--;
  }
  conn->list = NULL;

dengyihao's avatar
dengyihao 已提交
894
  transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
895
  transRemoveExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
opt rpc  
dengyihao 已提交
896
  conn->refId = -1;
dengyihao's avatar
dengyihao 已提交
897

dengyihao's avatar
dengyihao 已提交
898 899 900 901 902 903 904
  if (conn->task != NULL) {
    transDQCancel(pThrd->timeoutQueue, conn->task);
    conn->task = NULL;
  }
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
    conn->timer->data = NULL;
dengyihao's avatar
dengyihao 已提交
905
    taosArrayPush(pThrd->timerList, &conn->timer);
dengyihao's avatar
dengyihao 已提交
906 907
    conn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
908

dengyihao's avatar
dengyihao 已提交
909
  if (clear) {
dengyihao's avatar
dengyihao 已提交
910
    if (!uv_is_closing((uv_handle_t*)conn->stream)) {
dengyihao's avatar
dengyihao 已提交
911
      uv_read_stop(conn->stream);
dengyihao's avatar
dengyihao 已提交
912 913
      uv_close((uv_handle_t*)conn->stream, cliDestroy);
    }
914
  }
dengyihao's avatar
dengyihao 已提交
915
}
U
ubuntu 已提交
916
static void cliDestroy(uv_handle_t* handle) {
dengyihao's avatar
dengyihao 已提交
917 918 919
  if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
920
  SCliConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
921
  SCliThrd* pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
922 923
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
dengyihao's avatar
dengyihao 已提交
924 925
    taosArrayPush(pThrd->timerList, &conn->timer);
    conn->timer->data = NULL;
dengyihao's avatar
dengyihao 已提交
926 927
    conn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
928

dengyihao's avatar
dengyihao 已提交
929 930
  atomic_sub_fetch_32(&pThrd->connCount, 1);

dengyihao's avatar
dengyihao 已提交
931
  transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
932
  transRemoveExHandle(transGetRefMgt(), conn->refId);
wafwerar's avatar
wafwerar 已提交
933 934
  taosMemoryFree(conn->ip);
  taosMemoryFree(conn->stream);
dengyihao's avatar
dengyihao 已提交
935 936 937

  cliDestroyConnMsgs(conn, true);

dengyihao's avatar
dengyihao 已提交
938
  tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
939
  transReqQueueClear(&conn->wreqQueue);
dengyihao's avatar
dengyihao 已提交
940
  transDestroyBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
941

wafwerar's avatar
wafwerar 已提交
942
  taosMemoryFree(conn);
dengyihao's avatar
dengyihao 已提交
943
}
dengyihao's avatar
dengyihao 已提交
944
static bool cliHandleNoResp(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
945 946
  bool res = false;
  if (!transQueueEmpty(&conn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
947
    SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, 0);
dengyihao's avatar
dengyihao 已提交
948
    if (REQUEST_NO_RESP(&pMsg->msg)) {
dengyihao's avatar
dengyihao 已提交
949
      transQueuePop(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
950 951 952 953 954
      destroyCmsg(pMsg);
      res = true;
    }
    if (res == true) {
      if (cliMaySendCachedMsg(conn) == false) {
dengyihao's avatar
dengyihao 已提交
955
        SCliThrd* thrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
956
        addConnToPool(thrd->pool, conn);
dengyihao's avatar
dengyihao 已提交
957 958 959
        res = false;
      } else {
        res = true;
dengyihao's avatar
dengyihao 已提交
960 961 962 963 964
      }
    }
  }
  return res;
}
U
ubuntu 已提交
965
static void cliSendCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
966 967
  SCliConn* pConn = transReqQueueRemove(req);
  if (pConn == NULL) return;
dengyihao's avatar
dengyihao 已提交
968

dengyihao's avatar
dengyihao 已提交
969 970 971
  SCliMsg* pMsg = !transQueueEmpty(&pConn->cliMsgs) ? transQueueGet(&pConn->cliMsgs, 0) : NULL;
  if (pMsg != NULL) {
    int64_t cost = taosGetTimestampUs() - pMsg->st;
dengyihao's avatar
dengyihao 已提交
972
    if (cost > 1000 * 20) {
dengyihao's avatar
dengyihao 已提交
973 974 975 976
      tWarn("%s conn %p send cost:%dus, send exception", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
    }
  }

dengyihao's avatar
dengyihao 已提交
977
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
978
    tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
979
  } else {
dengyihao's avatar
dengyihao 已提交
980 981 982 983
    if (!uv_is_closing((uv_handle_t*)&pConn->stream)) {
      tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
      cliHandleExcept(pConn);
    }
dengyihao's avatar
dengyihao 已提交
984 985
    return;
  }
dengyihao's avatar
dengyihao 已提交
986
  if (cliHandleNoResp(pConn) == true) {
dengyihao's avatar
dengyihao 已提交
987
    tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
988 989
    return;
  }
dengyihao's avatar
dengyihao 已提交
990
  uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
dengyihao's avatar
dengyihao 已提交
991
}
dengyihao's avatar
dengyihao 已提交
992 993 994 995
void cliSendBatch(SCliConn* pConn) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

dengyihao's avatar
dengyihao 已提交
996 997 998 999 1000
  SCliBatch*     pBatch = pConn->pBatch;
  SCliBatchList* pList = pBatch->pList;
  pList->connCnt += 1;

  int32_t wLen = pBatch->wLen;
dengyihao's avatar
dengyihao 已提交
1001 1002 1003 1004

  uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
  int       i = 0;

dengyihao's avatar
dengyihao 已提交
1005 1006
  queue* h = NULL;
  QUEUE_FOREACH(h, &pBatch->wq) {
dengyihao's avatar
dengyihao 已提交
1007 1008 1009 1010 1011 1012 1013 1014 1015
    SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q);

    STransConnCtx* pCtx = pCliMsg->ctx;

    STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
    if (pMsg->pCont == 0) {
      pMsg->pCont = (void*)rpcMallocCont(0);
      pMsg->contLen = 0;
    }
dengyihao's avatar
dengyihao 已提交
1016

dengyihao's avatar
dengyihao 已提交
1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032
    int            msgLen = transMsgLenFromCont(pMsg->contLen);
    STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);

    if (pHead->comp == 0) {
      pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
      pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
      pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
      pHead->msgType = pMsg->msgType;
      pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
      pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
      memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
      pHead->traceId = pMsg->info.traceId;
      pHead->magicNum = htonl(TRANS_MAGIC_NUM);
    }
    pHead->timestamp = taosHton64(taosGetTimestampUs());

dengyihao's avatar
dengyihao 已提交
1033 1034 1035 1036 1037 1038 1039 1040
    if (pHead->comp == 0) {
      if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
        msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
        pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
      }
    } else {
      msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
    }
dengyihao's avatar
dengyihao 已提交
1041 1042 1043 1044 1045
    wb[i++] = uv_buf_init((char*)pHead, msgLen);
  }

  uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
  req->data = pConn;
dengyihao's avatar
dengyihao 已提交
1046
  tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn,
dengyihao's avatar
dengyihao 已提交
1047
         pBatch->wLen, pBatch->batchSize);
dengyihao's avatar
dengyihao 已提交
1048 1049 1050
  uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb);
  taosMemoryFree(wb);
}
U
ubuntu 已提交
1051
void cliSend(SCliConn* pConn) {
dengyihao's avatar
dengyihao 已提交
1052 1053 1054 1055 1056
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  if (transQueueEmpty(&pConn->cliMsgs)) {
    tError("%s conn %p not msg to send", pTransInst->label, pConn);
dengyihao's avatar
dengyihao 已提交
1057
    cliHandleExcept(pConn);
dengyihao's avatar
dengyihao 已提交
1058 1059
    return;
  }
dengyihao's avatar
dengyihao 已提交
1060 1061

  SCliMsg* pCliMsg = NULL;
U
ubuntu 已提交
1062
  CONN_GET_NEXT_SENDMSG(pConn);
dengyihao's avatar
dengyihao 已提交
1063 1064
  pCliMsg->sent = 1;

dengyihao's avatar
dengyihao 已提交
1065
  STransConnCtx* pCtx = pCliMsg->ctx;
dengyihao's avatar
dengyihao 已提交
1066

U
ubuntu 已提交
1067
  STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
dengyihao's avatar
dengyihao 已提交
1068 1069 1070 1071
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
dengyihao's avatar
dengyihao 已提交
1072

dengyihao's avatar
dengyihao 已提交
1073
  int            msgLen = transMsgLenFromCont(pMsg->contLen);
dengyihao's avatar
dengyihao 已提交
1074
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
dengyihao's avatar
dengyihao 已提交
1075

dengyihao's avatar
dengyihao 已提交
1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086
  if (pHead->comp == 0) {
    pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
    pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
    pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
    pHead->msgType = pMsg->msgType;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
    memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
    pHead->traceId = pMsg->info.traceId;
    pHead->magicNum = htonl(TRANS_MAGIC_NUM);
  }
dengyihao's avatar
dengyihao 已提交
1087
  pHead->timestamp = taosHton64(taosGetTimestampUs());
dengyihao's avatar
dengyihao 已提交
1088

dengyihao's avatar
dengyihao 已提交
1089 1090 1091
  if (pHead->persist == 1) {
    CONN_SET_PERSIST_BY_APP(pConn);
  }
dengyihao's avatar
dengyihao 已提交
1092

dengyihao's avatar
dengyihao 已提交
1093
  STraceId* trace = &pMsg->info.traceId;
dengyihao's avatar
dengyihao 已提交
1094

dengyihao's avatar
dengyihao 已提交
1095
  if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
dengyihao's avatar
dengyihao 已提交
1096
    uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
dengyihao's avatar
dengyihao 已提交
1097 1098
    if (timer == NULL) {
      timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
dengyihao's avatar
dengyihao 已提交
1099
      tDebug("no available timer, create a timer %p", timer);
dengyihao's avatar
dengyihao 已提交
1100 1101 1102 1103 1104
      uv_timer_init(pThrd->loop, timer);
    }
    timer->data = pConn;
    pConn->timer = timer;

dengyihao's avatar
dengyihao 已提交
1105 1106 1107
    tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
    uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
  }
dengyihao's avatar
dengyihao 已提交
1108

dengyihao's avatar
dengyihao 已提交
1109 1110 1111 1112 1113 1114 1115
  if (pHead->comp == 0) {
    if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
      msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
      pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    }
  } else {
    msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
dengyihao's avatar
dengyihao 已提交
1116
  }
dengyihao's avatar
dengyihao 已提交
1117

dengyihao's avatar
dengyihao 已提交
1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129
  // if (tmsgIsValid(pHead->msgType)) {
  //   char buf[128] = {0};
  //   sprintf(buf, "%s", TMSG_INFO(pHead->msgType));
  //   int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
  //   if (NULL == 0) {
  //     int localCount = 1;
  //     taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
  //   } else {
  //     int localCount = *count + 1;
  //     taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
  //   }
  // }
dengyihao's avatar
dengyihao 已提交
1130

dengyihao's avatar
dengyihao 已提交
1131 1132
  tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
          TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);
dengyihao's avatar
dengyihao 已提交
1133

dengyihao's avatar
dengyihao 已提交
1134
  uv_buf_t    wb = uv_buf_init((char*)pHead, msgLen);
dengyihao's avatar
dengyihao 已提交
1135
  uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
dengyihao's avatar
dengyihao 已提交
1136 1137 1138

  int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
1139
    tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType),
dengyihao's avatar
dengyihao 已提交
1140 1141 1142
            uv_err_name(status));
    cliHandleExcept(pConn);
  }
U
ubuntu 已提交
1143 1144
  return;
_RETURN:
dengyihao's avatar
dengyihao 已提交
1145
  return;
dengyihao's avatar
dengyihao 已提交
1146
}
dengyihao's avatar
dengyihao 已提交
1147 1148

static void cliDestroyBatch(SCliBatch* pBatch) {
dengyihao's avatar
dengyihao 已提交
1149
  if (pBatch == NULL) return;
dengyihao's avatar
dengyihao 已提交
1150
  while (!QUEUE_IS_EMPTY(&pBatch->wq)) {
dengyihao's avatar
dengyihao 已提交
1151 1152
    queue* h = QUEUE_HEAD(&pBatch->wq);
    QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
1153

dengyihao's avatar
dengyihao 已提交
1154
    SCliMsg* p = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
1155 1156
    destroyCmsg(p);
  }
dengyihao's avatar
dengyihao 已提交
1157 1158
  SCliBatchList* p = pBatch->pList;
  p->sending -= 1;
dengyihao's avatar
dengyihao 已提交
1159 1160
  taosMemoryFree(pBatch);
}
dengyihao's avatar
dengyihao 已提交
1161
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1162 1163 1164 1165 1166
  if (pThrd->quit == true) {
    cliDestroyBatch(pBatch);
    return;
  }

dengyihao's avatar
dengyihao 已提交
1167
  if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
dengyihao's avatar
dengyihao 已提交
1168 1169
    return;
  }
dengyihao's avatar
dengyihao 已提交
1170 1171
  STrans*        pTransInst = pThrd->pTransInst;
  SCliBatchList* pList = pBatch->pList;
dengyihao's avatar
dengyihao 已提交
1172

dengyihao's avatar
dengyihao 已提交
1173 1174 1175
  char key[TSDB_FQDN_LEN + 64] = {0};
  CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port);

dengyihao's avatar
dengyihao 已提交
1176 1177
  bool      exceed = false;
  SCliConn* conn = getConnFromPool(pThrd, key, &exceed);
dengyihao's avatar
dengyihao 已提交
1178

dengyihao's avatar
dengyihao 已提交
1179 1180 1181
  if (conn == NULL && exceed) {
    tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pTransInst->label, pBatch->wLen,
           pBatch->batchSize, pTransInst->connLimitNum);
dengyihao's avatar
dengyihao 已提交
1182
    cliDestroyBatch(pBatch);
dengyihao's avatar
dengyihao 已提交
1183 1184
    return;
  }
dengyihao's avatar
dengyihao 已提交
1185 1186
  if (conn == NULL) {
    conn = cliCreateConn(pThrd);
dengyihao's avatar
dengyihao 已提交
1187
    conn->pBatch = pBatch;
1188
    conn->ip = taosStrdup(pList->dst);
dengyihao's avatar
dengyihao 已提交
1189

dengyihao's avatar
dengyihao 已提交
1190
    uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip);
dengyihao's avatar
dengyihao 已提交
1191 1192 1193 1194 1195 1196
    if (ipaddr == 0xffffffff) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

dengyihao's avatar
dengyihao 已提交
1197
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1198 1199 1200 1201 1202
      return;
    }
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = ipaddr;
dengyihao's avatar
dengyihao 已提交
1203
    addr.sin_port = (uint16_t)htons(pList->port);
dengyihao's avatar
dengyihao 已提交
1204

dengyihao's avatar
dengyihao 已提交
1205
    tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst);
dengyihao's avatar
dengyihao 已提交
1206
    pThrd->newConnCount++;
dengyihao's avatar
dengyihao 已提交
1207
    int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
dengyihao's avatar
dengyihao 已提交
1208
    if (fd == -1) {
dengyihao's avatar
dengyihao 已提交
1209 1210 1211
      tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
             tstrerror(TAOS_SYSTEM_ERROR(errno)));
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1212 1213 1214 1215
      return;
    }
    int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
    if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
1216 1217
      tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1218 1219 1220 1221
      return;
    }
    ret = transSetConnOption((uv_tcp_t*)conn->stream);
    if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
1222 1223
      tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1224 1225 1226 1227 1228 1229 1230 1231 1232
      return;
    }

    ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
    if (ret != 0) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;
dengyihao's avatar
dengyihao 已提交
1233
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1234 1235 1236 1237 1238 1239
      return;
    }
    uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
    return;
  }

dengyihao's avatar
dengyihao 已提交
1240
  conn->pBatch = pBatch;
dengyihao's avatar
dengyihao 已提交
1241
  cliSendBatch(conn);
dengyihao's avatar
dengyihao 已提交
1242 1243
}
static void cliSendBatchCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
1244 1245 1246
  SCliConn*  conn = req->data;
  SCliThrd*  thrd = conn->hostThrd;
  SCliBatch* p = conn->pBatch;
dengyihao's avatar
dengyihao 已提交
1247

dengyihao's avatar
dengyihao 已提交
1248
  SCliBatchList* pBatchList = p->pList;
dengyihao's avatar
dengyihao 已提交
1249
  SCliBatch*     nxtBatch = cliGetHeadFromList(pBatchList);
dengyihao's avatar
dengyihao 已提交
1250 1251
  pBatchList->connCnt -= 1;

dengyihao's avatar
dengyihao 已提交
1252 1253 1254
  conn->pBatch = NULL;

  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
1255
    tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
dengyihao's avatar
dengyihao 已提交
1256
           p->wLen, p->batchSize, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
1257 1258 1259

    if (!uv_is_closing((uv_handle_t*)&conn->stream)) cliHandleExcept(conn);

dengyihao's avatar
dengyihao 已提交
1260
    cliHandleBatchReq(nxtBatch, thrd);
dengyihao's avatar
dengyihao 已提交
1261
  } else {
dengyihao's avatar
dengyihao 已提交
1262
    tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
dengyihao's avatar
dengyihao 已提交
1263
           p->batchSize);
dengyihao's avatar
dengyihao 已提交
1264
    if (!uv_is_closing((uv_handle_t*)&conn->stream) && conn->broken == false) {
dengyihao's avatar
dengyihao 已提交
1265 1266 1267 1268 1269 1270
      if (nxtBatch != NULL) {
        conn->pBatch = nxtBatch;
        cliSendBatch(conn);
      } else {
        addConnToPool(thrd->pool, conn);
      }
dengyihao's avatar
dengyihao 已提交
1271
    } else {
dengyihao's avatar
dengyihao 已提交
1272 1273
      cliDestroyBatch(nxtBatch);
      // conn release by other callback
dengyihao's avatar
dengyihao 已提交
1274
    }
dengyihao's avatar
dengyihao 已提交
1275
  }
dengyihao's avatar
dengyihao 已提交
1276 1277 1278

  cliDestroyBatch(p);
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
1279
}
dengyihao's avatar
dengyihao 已提交
1280
static void cliHandleFastFail(SCliConn* pConn, int status) {
dengyihao's avatar
dengyihao 已提交
1281
  SCliThrd* pThrd = pConn->hostThrd;
dengyihao's avatar
dengyihao 已提交
1282
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1283 1284 1285

  if (status == -1) status = ENETUNREACH;

dengyihao's avatar
dengyihao 已提交
1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304
  if (pConn->pBatch == NULL) {
    SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);

    STraceId* trace = &pMsg->msg.info.traceId;
    tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
            TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status));

    if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
        (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
      SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip));
      int64_t        cTimestamp = taosGetTimestampMs();
      if (item != NULL) {
        int32_t elapse = cTimestamp - item->timestamp;
        if (elapse >= 0 && elapse <= pTransInst->failFastInterval) {
          item->count++;
        } else {
          item->count = 1;
          item->timestamp = cTimestamp;
        }
dengyihao's avatar
dengyihao 已提交
1305
      } else {
dengyihao's avatar
dengyihao 已提交
1306 1307
        SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
        taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem));
dengyihao's avatar
dengyihao 已提交
1308 1309
      }
    }
dengyihao's avatar
dengyihao 已提交
1310
  } else {
dengyihao's avatar
dengyihao 已提交
1311 1312
    tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
           pConn, pConn->ip, uv_strerror(status));
dengyihao's avatar
dengyihao 已提交
1313 1314
    cliDestroyBatch(pConn->pBatch);
    pConn->pBatch = NULL;
dengyihao's avatar
dengyihao 已提交
1315 1316 1317
  }
  cliHandleExcept(pConn);
}
dengyihao's avatar
dengyihao 已提交
1318

dengyihao's avatar
dengyihao 已提交
1319 1320 1321
void cliConnCb(uv_connect_t* req, int status) {
  SCliConn* pConn = req->data;
  SCliThrd* pThrd = pConn->hostThrd;
dengyihao's avatar
dengyihao 已提交
1322 1323 1324 1325 1326 1327 1328 1329 1330 1331
  bool      timeout = false;

  if (pConn->timer == NULL) {
    timeout = true;
  } else {
    uv_timer_stop(pConn->timer);
    pConn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &pConn->timer);
    pConn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
1332 1333

  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
1334 1335 1336 1337 1338
    if (timeout == false) {
      cliHandleFastFail(pConn, status);
    } else if (timeout == true) {
      // already deal by timeout
    }
1339
    return;
dengyihao's avatar
dengyihao 已提交
1340
  }
dengyihao's avatar
dengyihao 已提交
1341

dengyihao's avatar
dengyihao 已提交
1342 1343
  struct sockaddr peername, sockname;
  int             addrlen = sizeof(peername);
dengyihao's avatar
dengyihao 已提交
1344
  uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
dengyihao's avatar
dengyihao 已提交
1345
  transSockInfo2Str(&peername, pConn->dst);
dengyihao's avatar
dengyihao 已提交
1346

dengyihao's avatar
dengyihao 已提交
1347 1348
  addrlen = sizeof(sockname);
  uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
dengyihao's avatar
dengyihao 已提交
1349
  transSockInfo2Str(&sockname, pConn->src);
dengyihao's avatar
dengyihao 已提交
1350

dengyihao's avatar
dengyihao 已提交
1351
  tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
1352 1353 1354 1355 1356
  if (pConn->pBatch != NULL) {
    cliSendBatch(pConn);
  } else {
    cliSend(pConn);
  }
dengyihao's avatar
dengyihao 已提交
1357 1358
}

dengyihao's avatar
dengyihao 已提交
1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
  STransConnCtx* pCtx = pMsg->ctx;
  STrans*        pTransInst = pThrd->pTransInst;

  STransMsg transMsg = {0};
  transMsg.contLen = 0;
  transMsg.pCont = NULL;
  transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS;
  transMsg.msgType = pMsg->msg.msgType + 1;
  transMsg.info.ahandle = pMsg->ctx->ahandle;
  transMsg.info.traceId = pMsg->msg.info.traceId;
  transMsg.info.hasEpSet = false;
  if (pCtx->pSem != NULL) {
    if (pCtx->pRsp == NULL) {
    } else {
      memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg));
    }
  } else {
    pTransInst->cfp(pTransInst->parent, &transMsg, NULL);
  }

  destroyCmsg(pMsg);
}
dengyihao's avatar
dengyihao 已提交
1382
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1383 1384 1385 1386 1387
  if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) {
    pThrd->stopMsg = pMsg;
    return;
  }
  pThrd->stopMsg = NULL;
dengyihao's avatar
dengyihao 已提交
1388
  pThrd->quit = true;
U
ubuntu 已提交
1389
  tDebug("cli work thread %p start to quit", pThrd);
dengyihao's avatar
dengyihao 已提交
1390
  destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1391

dengyihao's avatar
dengyihao 已提交
1392
  destroyConnPool(pThrd);
dengyihao's avatar
dengyihao 已提交
1393
  uv_walk(pThrd->loop, cliWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
1394
}
dengyihao's avatar
dengyihao 已提交
1395
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1396
  int64_t    refId = (int64_t)(pMsg->msg.info.handle);
dengyihao's avatar
dengyihao 已提交
1397
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1398
  if (exh == NULL) {
dengyihao's avatar
dengyihao 已提交
1399
    tDebug("%" PRId64 " already released", refId);
dengyihao's avatar
dengyihao 已提交
1400 1401
    destroyCmsg(pMsg);
    return;
dengyihao's avatar
dengyihao 已提交
1402 1403 1404
  }

  SCliConn* conn = exh->handle;
dengyihao's avatar
dengyihao 已提交
1405
  transReleaseExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1406
  tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
1407

dengyihao's avatar
dengyihao 已提交
1408 1409
  if (T_REF_VAL_GET(conn) == 2) {
    transUnrefCliHandle(conn);
dengyihao's avatar
dengyihao 已提交
1410 1411
    if (!transQueuePush(&conn->cliMsgs, pMsg)) {
      return;
dengyihao's avatar
dengyihao 已提交
1412 1413
    }
    cliSend(conn);
dengyihao's avatar
dengyihao 已提交
1414 1415 1416
  } else {
    tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1417 1418
  }
}
dengyihao's avatar
dengyihao 已提交
1419
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1420
  STransConnCtx* pCtx = pMsg->ctx;
dengyihao's avatar
dengyihao 已提交
1421
  pThrd->cvtAddr = pCtx->cvtAddr;
dengyihao's avatar
dengyihao 已提交
1422 1423
  destroyCmsg(pMsg);
}
U
ubuntu 已提交
1424

dengyihao's avatar
dengyihao 已提交
1425 1426
SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
  STransConnCtx* pCtx = (*pMsg)->ctx;
dengyihao's avatar
dengyihao 已提交
1427 1428
  SCliConn*      conn = NULL;

dengyihao's avatar
dengyihao 已提交
1429
  int64_t refId = (int64_t)((*pMsg)->msg.info.handle);
dengyihao's avatar
dengyihao 已提交
1430
  if (refId != 0) {
dengyihao's avatar
dengyihao 已提交
1431
    SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1432
    if (exh == NULL) {
dengyihao's avatar
dengyihao 已提交
1433
      tError("failed to get conn, refId: %" PRId64 "", refId);
dengyihao's avatar
dengyihao 已提交
1434 1435
      *ignore = true;
      return NULL;
dengyihao's avatar
dengyihao 已提交
1436 1437
    } else {
      conn = exh->handle;
dengyihao's avatar
dengyihao 已提交
1438
      if (conn == NULL) {
dengyihao's avatar
dengyihao 已提交
1439
        conn = getConnFromPool2(pThrd, addr, pMsg);
dengyihao's avatar
dengyihao 已提交
1440
        if (conn != NULL) specifyConnRef(conn, true, refId);
dengyihao's avatar
dengyihao 已提交
1441
      }
dengyihao's avatar
dengyihao 已提交
1442
      transReleaseExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1443 1444 1445
    }
    return conn;
  };
dengyihao's avatar
dengyihao 已提交
1446

dengyihao's avatar
dengyihao 已提交
1447
  conn = getConnFromPool2(pThrd, addr, pMsg);
dengyihao's avatar
dengyihao 已提交
1448
  if (conn != NULL) {
dengyihao's avatar
dengyihao 已提交
1449
    tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
dengyihao's avatar
dengyihao 已提交
1450
  } else {
dengyihao's avatar
dengyihao 已提交
1451
    tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
dengyihao's avatar
dengyihao 已提交
1452
  }
dengyihao's avatar
dengyihao 已提交
1453 1454
  return conn;
}
dengyihao's avatar
dengyihao 已提交
1455
FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
dengyihao's avatar
dengyihao 已提交
1456 1457 1458
  if (pCvtAddr->cvt == false) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1459 1460 1461
  if (pEpSet->numOfEps == 1 && strncmp(pEpSet->eps[0].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) {
    memset(pEpSet->eps[0].fqdn, 0, TSDB_FQDN_LEN);
    memcpy(pEpSet->eps[0].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN);
dengyihao's avatar
dengyihao 已提交
1462 1463
  }
}
dengyihao's avatar
dengyihao 已提交
1464

dengyihao's avatar
dengyihao 已提交
1465
FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
dengyihao's avatar
dengyihao 已提交
1466
  if (code != 0) return false;
dengyihao's avatar
dengyihao 已提交
1467
  // if (pCtx->retryCnt == 0) return false;
dengyihao's avatar
dengyihao 已提交
1468 1469 1470
  if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
  return true;
}
dengyihao's avatar
dengyihao 已提交
1471
FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
dengyihao's avatar
dengyihao 已提交
1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482
  if (pMsg == NULL) return -1;

  memset(pResp, 0, sizeof(STransMsg));

  pResp->code = TSDB_CODE_RPC_BROKEN_LINK;
  pResp->msgType = pMsg->msg.msgType + 1;
  pResp->info.ahandle = pMsg->ctx ? pMsg->ctx->ahandle : NULL;
  pResp->info.traceId = pMsg->msg.info.traceId;

  return 0;
}
dengyihao's avatar
dengyihao 已提交
1483 1484 1485 1486 1487
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
  uint32_t  addr = 0;
  uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn));
  if (v == NULL) {
    addr = taosGetIpv4FromFqdn(fqdn);
1488 1489 1490 1491
    if (addr == 0xffffffff) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr());
      return addr;
dengyihao's avatar
dengyihao 已提交
1492 1493
    }

dengyihao's avatar
dengyihao 已提交
1494 1495 1496 1497 1498 1499 1500 1501 1502 1503
    taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
  } else {
    addr = *v;
  }
  return addr;
}
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
  // impl later
  return;
}
dengyihao's avatar
dengyihao 已提交
1504

dengyihao's avatar
dengyihao 已提交
1505 1506 1507 1508 1509
static void doFreeTimeoutMsg(void* param) {
  STaskArg* arg = param;
  SCliMsg*  pMsg = arg->param1;
  SCliThrd* pThrd = arg->param2;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1510

dengyihao's avatar
dengyihao 已提交
1511 1512 1513 1514 1515 1516
  QUEUE_REMOVE(&pMsg->q);
  STraceId* trace = &pMsg->msg.info.traceId;
  tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
  doNotifyApp(pMsg, pThrd);
  taosMemoryFree(arg);
}
dengyihao's avatar
dengyihao 已提交
1517
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1518
  STrans* pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1519

dengyihao's avatar
dengyihao 已提交
1520 1521
  cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
  if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) {
dengyihao's avatar
dengyihao 已提交
1522
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1523 1524
    return;
  }
dengyihao's avatar
dengyihao 已提交
1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536
  if (tmsgIsValid(pMsg->msg.msgType)) {
    char buf[128] = {0};
    sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType));
    int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
    if (NULL == 0) {
      int localCount = 1;
      taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
    } else {
      int localCount = *count + 1;
      taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
    }
  }
dengyihao's avatar
dengyihao 已提交
1537

dengyihao's avatar
dengyihao 已提交
1538 1539
  char*    fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
  uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
dengyihao's avatar
dengyihao 已提交
1540 1541
  char     addr[TSDB_FQDN_LEN + 64] = {0};
  CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port);
dengyihao's avatar
dengyihao 已提交
1542

dengyihao's avatar
dengyihao 已提交
1543
  bool      ignore = false;
dengyihao's avatar
dengyihao 已提交
1544
  SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr);
dengyihao's avatar
dengyihao 已提交
1545
  if (ignore == true) {
dengyihao's avatar
dengyihao 已提交
1546
    // persist conn already release by server
dengyihao's avatar
dengyihao 已提交
1547 1548
    STransMsg resp;
    cliBuildExceptResp(pMsg, &resp);
dengyihao's avatar
dengyihao 已提交
1549 1550 1551
    if (pMsg->type != Release) {
      pTransInst->cfp(pTransInst->parent, &resp, NULL);
    }
dengyihao's avatar
dengyihao 已提交
1552
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1553 1554
    return;
  }
dengyihao's avatar
dengyihao 已提交
1555
  if (conn == NULL && pMsg == NULL) {
dengyihao's avatar
dengyihao 已提交
1556 1557
    return;
  }
dengyihao's avatar
dengyihao 已提交
1558
  STraceId* trace = &pMsg->msg.info.traceId;
dengyihao's avatar
dengyihao 已提交
1559

dengyihao's avatar
dengyihao 已提交
1560
  if (conn != NULL) {
dengyihao's avatar
dengyihao 已提交
1561
    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
dengyihao's avatar
dengyihao 已提交
1562
    transQueuePush(&conn->cliMsgs, pMsg);
U
ubuntu 已提交
1563
    cliSend(conn);
dengyihao's avatar
dengyihao 已提交
1564
  } else {
U
ubuntu 已提交
1565
    conn = cliCreateConn(pThrd);
dengyihao's avatar
dengyihao 已提交
1566 1567 1568 1569

    int64_t refId = (int64_t)pMsg->msg.info.handle;
    if (refId != 0) specifyConnRef(conn, true, refId);

dengyihao's avatar
dengyihao 已提交
1570
    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
dengyihao's avatar
dengyihao 已提交
1571
    transQueuePush(&conn->cliMsgs, pMsg);
dengyihao's avatar
dengyihao 已提交
1572

dengyihao's avatar
dengyihao 已提交
1573
    conn->ip = taosStrdup(addr);
dengyihao's avatar
dengyihao 已提交
1574

dengyihao's avatar
dengyihao 已提交
1575
    uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
1576 1577 1578 1579 1580 1581 1582 1583 1584
    if (ipaddr == 0xffffffff) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

      cliHandleExcept(conn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
1585

dengyihao's avatar
dengyihao 已提交
1586
    struct sockaddr_in addr;
1587
    addr.sin_family = AF_INET;
1588
    addr.sin_addr.s_addr = ipaddr;
dengyihao's avatar
dengyihao 已提交
1589
    addr.sin_port = (uint16_t)htons(port);
dengyihao's avatar
dengyihao 已提交
1590

dengyihao's avatar
dengyihao 已提交
1591
    tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip);
dengyihao's avatar
dengyihao 已提交
1592
    pThrd->newConnCount++;
1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612
    int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
    if (fd == -1) {
      tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
              tstrerror(TAOS_SYSTEM_ERROR(errno)));
      cliHandleExcept(conn);
      errno = 0;
      return;
    }
    int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
    if (ret != 0) {
      tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }
    ret = transSetConnOption((uv_tcp_t*)conn->stream);
    if (ret != 0) {
      tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
1613

1614
    ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
dengyihao's avatar
dengyihao 已提交
1615
    if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
1616 1617 1618 1619 1620
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

dengyihao's avatar
dengyihao 已提交
1621
      cliHandleFastFail(conn, ret);
dengyihao's avatar
dengyihao 已提交
1622 1623
      return;
    }
dengyihao's avatar
dengyihao 已提交
1624
    uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
dengyihao's avatar
dengyihao 已提交
1625
  }
dengyihao's avatar
dengyihao 已提交
1626
  tGTrace("%s conn %p ready", pTransInst->label, conn);
dengyihao's avatar
dengyihao 已提交
1627
}
dengyihao's avatar
dengyihao 已提交
1628

dengyihao's avatar
dengyihao 已提交
1629
static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1630
  int count = 0;
dengyihao's avatar
dengyihao 已提交
1631

dengyihao's avatar
dengyihao 已提交
1632 1633
  while (!QUEUE_IS_EMPTY(wq)) {
    queue* h = QUEUE_HEAD(wq);
dengyihao's avatar
dengyihao 已提交
1634 1635 1636
    QUEUE_REMOVE(h);

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
1637 1638 1639 1640 1641

    if (pMsg->type == Quit) {
      pThrd->stopMsg = pMsg;
      continue;
    }
dengyihao's avatar
dengyihao 已提交
1642 1643 1644 1645 1646 1647 1648 1649
    (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);

    count++;
  }
  if (count >= 2) {
    tTrace("cli process batch size:%d", count);
  }
}
dengyihao's avatar
dengyihao 已提交
1650
SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
dengyihao's avatar
dengyihao 已提交
1651
  if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) {
dengyihao's avatar
dengyihao 已提交
1652 1653 1654 1655
    return NULL;
  }
  queue* hr = QUEUE_HEAD(&pList->wq);
  QUEUE_REMOVE(hr);
dengyihao's avatar
dengyihao 已提交
1656
  pList->sending += 1;
dengyihao's avatar
dengyihao 已提交
1657 1658 1659 1660 1661 1662

  pList->len -= 1;

  SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq);
  return batch;
}
dengyihao's avatar
dengyihao 已提交
1663

dengyihao's avatar
dengyihao 已提交
1664
static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1665 1666
  STrans* pInst = pThrd->pTransInst;

dengyihao's avatar
dengyihao 已提交
1667
  int count = 0;
dengyihao's avatar
dengyihao 已提交
1668 1669
  while (!QUEUE_IS_EMPTY(wq)) {
    queue* h = QUEUE_HEAD(wq);
dengyihao's avatar
dengyihao 已提交
1670
    QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
1671 1672

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
1673 1674 1675 1676 1677 1678

    if (pMsg->type == Quit) {
      pThrd->stopMsg = pMsg;
      continue;
    }

dengyihao's avatar
dengyihao 已提交
1679
    if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
dengyihao's avatar
dengyihao 已提交
1680 1681 1682 1683 1684 1685 1686
      STransConnCtx* pCtx = pMsg->ctx;

      char*    ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
      uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
      char     key[TSDB_FQDN_LEN + 64] = {0};
      CONN_CONSTRUCT_HASH_KEY(key, ip, port);

dengyihao's avatar
dengyihao 已提交
1687 1688 1689 1690 1691
      // SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key));
      SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key));
      if (ppBatchList == NULL || *ppBatchList == NULL) {
        SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
        QUEUE_INIT(&pBatchList->wq);
dengyihao's avatar
dengyihao 已提交
1692
        pBatchList->connMax = pInst->connLimitNum;
dengyihao's avatar
dengyihao 已提交
1693
        pBatchList->connCnt = 0;
dengyihao's avatar
dengyihao 已提交
1694
        pBatchList->batchLenLimit = pInst->batchSize;
dengyihao's avatar
dengyihao 已提交
1695
        pBatchList->len += 1;
dengyihao's avatar
dengyihao 已提交
1696

1697 1698
        pBatchList->ip = taosStrdup(ip);
        pBatchList->dst = taosStrdup(key);
dengyihao's avatar
dengyihao 已提交
1699 1700
        pBatchList->port = port;

dengyihao's avatar
dengyihao 已提交
1701 1702
        SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
        QUEUE_INIT(&pBatch->wq);
dengyihao's avatar
dengyihao 已提交
1703 1704
        QUEUE_INIT(&pBatch->listq);

dengyihao's avatar
dengyihao 已提交
1705 1706 1707
        QUEUE_PUSH(&pBatch->wq, h);
        pBatch->wLen += 1;
        pBatch->batchSize += pMsg->msg.contLen;
dengyihao's avatar
dengyihao 已提交
1708
        pBatch->pList = pBatchList;
dengyihao's avatar
dengyihao 已提交
1709

dengyihao's avatar
dengyihao 已提交
1710
        QUEUE_PUSH(&pBatchList->wq, &pBatch->listq);
dengyihao's avatar
dengyihao 已提交
1711

dengyihao's avatar
dengyihao 已提交
1712
        taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatchList, sizeof(void*));
dengyihao's avatar
dengyihao 已提交
1713
      } else {
dengyihao's avatar
dengyihao 已提交
1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734
        if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
          SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
          QUEUE_INIT(&pBatch->wq);
          QUEUE_INIT(&pBatch->listq);

          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->wLen += 1;
          pBatch->batchSize = pMsg->msg.contLen;
          pBatch->pList = *ppBatchList;

          QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
          (*ppBatchList)->len += 1;

          continue;
        }

        queue*     hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
        SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
        if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) {
          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->batchSize += pMsg->msg.contLen;
dengyihao's avatar
dengyihao 已提交
1735
          pBatch->wLen += 1;
dengyihao's avatar
dengyihao 已提交
1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748
        } else {
          SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
          QUEUE_INIT(&pBatch->wq);
          QUEUE_INIT(&pBatch->listq);

          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->wLen += 1;
          pBatch->batchSize += pMsg->msg.contLen;
          pBatch->pList = *ppBatchList;

          QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
          (*ppBatchList)->len += 1;
        }
dengyihao's avatar
dengyihao 已提交
1749
      }
dengyihao's avatar
dengyihao 已提交
1750
      continue;
dengyihao's avatar
dengyihao 已提交
1751
    }
dengyihao's avatar
dengyihao 已提交
1752
    (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
1753
    count++;
dengyihao's avatar
dengyihao 已提交
1754
  }
dengyihao's avatar
dengyihao 已提交
1755

dengyihao's avatar
dengyihao 已提交
1756
  void** pIter = taosHashIterate(pThrd->batchCache, NULL);
dengyihao's avatar
dengyihao 已提交
1757
  while (pIter != NULL) {
dengyihao's avatar
dengyihao 已提交
1758
    SCliBatchList* batchList = (SCliBatchList*)(*pIter);
dengyihao's avatar
dengyihao 已提交
1759 1760 1761
    SCliBatch*     batch = cliGetHeadFromList(batchList);
    if (batch != NULL) {
      cliHandleBatchReq(batch, pThrd);
dengyihao's avatar
dengyihao 已提交
1762
    }
dengyihao's avatar
dengyihao 已提交
1763
    pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
dengyihao's avatar
dengyihao 已提交
1764 1765
  }

dengyihao's avatar
dengyihao 已提交
1766
  if (count >= 2) {
S
Shengliang Guan 已提交
1767
    tTrace("cli process batch size:%d", count);
dengyihao's avatar
dengyihao 已提交
1768
  }
dengyihao's avatar
dengyihao 已提交
1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781
}

static void cliAsyncCb(uv_async_t* handle) {
  SAsyncItem* item = handle->data;
  SCliThrd*   pThrd = item->pThrd;
  STrans*     pTransInst = pThrd->pTransInst;

  // batch process to avoid to lock/unlock frequently
  queue wq;
  taosThreadMutexLock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  taosThreadMutexUnlock(&item->mtx);

dengyihao's avatar
dengyihao 已提交
1782 1783 1784 1785 1786 1787 1788 1789
  void* pIter = taosHashIterate(pThrd->msgCount, NULL);
  while (pIter != NULL) {
    int*   count = pIter;
    size_t len = 0;
    char*  key = taosHashGetKey(pIter, &len);
    if (*count != 0) {
      tDebug("key: %s count: %d", key, *count);
    }
dengyihao's avatar
dengyihao 已提交
1790

dengyihao's avatar
dengyihao 已提交
1791 1792 1793
    pIter = taosHashIterate(pThrd->msgCount, pIter);
  }
  tDebug("all conn count: %d", pThrd->newConnCount);
dengyihao's avatar
dengyihao 已提交
1794

dengyihao's avatar
dengyihao 已提交
1795
  int8_t supportBatch = pTransInst->supportBatch;
dengyihao's avatar
dengyihao 已提交
1796
  if (supportBatch == 0) {
dengyihao's avatar
dengyihao 已提交
1797
    cliNoBatchDealReq(&wq, pThrd);
dengyihao's avatar
dengyihao 已提交
1798
  } else if (supportBatch == 1) {
dengyihao's avatar
dengyihao 已提交
1799
    cliBatchDealReq(&wq, pThrd);
dengyihao's avatar
dengyihao 已提交
1800
  }
dengyihao's avatar
dengyihao 已提交
1801

dengyihao's avatar
dengyihao 已提交
1802
  if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
1803
}
dengyihao's avatar
dengyihao 已提交
1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829
static void cliPrepareCb(uv_prepare_t* handle) {
  SCliThrd* thrd = handle->data;
  tTrace("prepare work start");

  SAsyncPool* pool = thrd->asyncPool;
  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
    SAsyncItem* item = async->data;

    queue wq;
    taosThreadMutexLock(&item->mtx);
    QUEUE_MOVE(&item->qmsg, &wq);
    taosThreadMutexUnlock(&item->mtx);

    int count = 0;
    while (!QUEUE_IS_EMPTY(&wq)) {
      queue* h = QUEUE_HEAD(&wq);
      QUEUE_REMOVE(h);

      SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
      (*cliAsyncHandle[pMsg->type])(pMsg, thrd);
      count++;
    }
  }
  tTrace("prepare work end");
  if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
dengyihao's avatar
dengyihao 已提交
1830
}
dengyihao's avatar
dengyihao 已提交
1831

dengyihao's avatar
dengyihao 已提交
1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860
void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
  transCtxCleanup(&conn->ctx);
  cliReleaseUnfinishedMsg(conn);
  if (destroy == 1) {
    transQueueDestroy(&conn->cliMsgs);
  } else {
    transQueueClear(&conn->cliMsgs);
  }
}

void cliIteraConnMsgs(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    SCliMsg* cmsg = transQueueGet(&conn->cliMsgs, i);
    if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) {
      continue;
    }

    STransMsg resp = {0};
    if (-1 == cliBuildExceptResp(cmsg, &resp)) {
      continue;
    }
    pTransInst->cfp(pTransInst->parent, &resp, NULL);

    cmsg->ctx->ahandle = NULL;
  }
}
dengyihao's avatar
dengyihao 已提交
1861 1862 1863
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
  if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
    uint64_t ahandle = pHead->ahandle;
dengyihao's avatar
dengyihao 已提交
1864
    tDebug("ahandle = %" PRIu64 "", ahandle);
dengyihao's avatar
dengyihao 已提交
1865 1866
    SCliMsg* pMsg = NULL;
    CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
dengyihao's avatar
dengyihao 已提交
1867

dengyihao's avatar
dengyihao 已提交
1868 1869
    transClearBuffer(&conn->readBuf);
    transFreeMsg(transContFromHead((char*)pHead));
dengyihao's avatar
dengyihao 已提交
1870 1871 1872 1873

    for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->cliMsgs); i++) {
      SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i);
      if (cliMsg->type == Release) {
dengyihao's avatar
dengyihao 已提交
1874
        ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req");
dengyihao's avatar
dengyihao 已提交
1875 1876
        return true;
      }
dengyihao's avatar
dengyihao 已提交
1877
    }
dengyihao's avatar
dengyihao 已提交
1878 1879 1880

    cliIteraConnMsgs(conn);

dengyihao's avatar
dengyihao 已提交
1881 1882
    tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1883

dengyihao's avatar
dengyihao 已提交
1884 1885 1886 1887 1888 1889
    addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
    return true;
  }
  return false;
}

U
ubuntu 已提交
1890
static void* cliWorkThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
1891
  SCliThrd* pThrd = (SCliThrd*)arg;
dengyihao's avatar
dengyihao 已提交
1892
  pThrd->pid = taosGetSelfPthreadId();
G
gccgdb1234 已提交
1893
  setThreadName("trans-cli-work");
dengyihao's avatar
dengyihao 已提交
1894
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
dengyihao's avatar
dengyihao 已提交
1895 1896

  tDebug("thread quit-thread:%08" PRId64, pThrd->pid);
1897
  return NULL;
dengyihao's avatar
dengyihao 已提交
1898 1899
}

U
ubuntu 已提交
1900
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
1901
  SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj));
dengyihao's avatar
dengyihao 已提交
1902

dengyihao's avatar
dengyihao 已提交
1903
  STrans* pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
1904
  memcpy(cli->label, label, TSDB_LABEL_LEN);
dengyihao's avatar
dengyihao 已提交
1905
  cli->numOfThreads = numOfThreads;
dengyihao's avatar
dengyihao 已提交
1906
  cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*));
dengyihao's avatar
dengyihao 已提交
1907 1908

  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
1909
    SCliThrd* pThrd = createThrdObj(shandle);
dengyihao's avatar
dengyihao 已提交
1910
    if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
1911
      goto _err;
dengyihao's avatar
dengyihao 已提交
1912 1913 1914
    }

    int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
dengyihao's avatar
dengyihao 已提交
1915 1916 1917
    if (err != 0) {
      goto _err;
    } else {
S
Shengliang Guan 已提交
1918
      tDebug("success to create tranport-cli thread:%d", i);
dengyihao's avatar
dengyihao 已提交
1919
    }
dengyihao's avatar
dengyihao 已提交
1920
    cli->pThreadObj[i] = pThrd;
dengyihao's avatar
dengyihao 已提交
1921 1922
  }
  return cli;
dengyihao's avatar
dengyihao 已提交
1923 1924 1925 1926 1927

_err:
  taosMemoryFree(cli->pThreadObj);
  taosMemoryFree(cli);
  return NULL;
dengyihao's avatar
dengyihao 已提交
1928
}
dengyihao's avatar
dengyihao 已提交
1929

dengyihao's avatar
dengyihao 已提交
1930
static FORCE_INLINE void destroyUserdata(STransMsg* userdata) {
dengyihao's avatar
dengyihao 已提交
1931 1932 1933 1934 1935 1936
  if (userdata->pCont == NULL) {
    return;
  }
  transFreeMsg(userdata->pCont);
  userdata->pCont = NULL;
}
dengyihao's avatar
dengyihao 已提交
1937

dengyihao's avatar
dengyihao 已提交
1938
static FORCE_INLINE void destroyCmsg(void* arg) {
dengyihao's avatar
dengyihao 已提交
1939
  SCliMsg* pMsg = arg;
dengyihao's avatar
dengyihao 已提交
1940 1941 1942
  if (pMsg == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1943

dengyihao's avatar
dengyihao 已提交
1944 1945
  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
wafwerar's avatar
wafwerar 已提交
1946
  taosMemoryFree(pMsg);
dengyihao's avatar
dengyihao 已提交
1947
}
dengyihao's avatar
dengyihao 已提交
1948

dengyihao's avatar
dengyihao 已提交
1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
  if (param == NULL) return;

  STaskArg* arg = param;
  SCliMsg*  pMsg = arg->param1;
  SCliThrd* pThrd = arg->param2;

  tDebug("destroy Ahandle A");
  if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
    tDebug("destroy Ahandle B");
    pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
  }
  tDebug("destroy Ahandle C");

  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
  taosMemoryFree(pMsg);
}

dengyihao's avatar
dengyihao 已提交
1968 1969 1970
static SCliThrd* createThrdObj(void* trans) {
  STrans* pTransInst = trans;

dengyihao's avatar
dengyihao 已提交
1971
  SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd));
U
ubuntu 已提交
1972

dengyihao's avatar
dengyihao 已提交
1973
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
1974
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
1975

wafwerar's avatar
wafwerar 已提交
1976
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
1977 1978 1979 1980 1981 1982 1983 1984
  int err = uv_loop_init(pThrd->loop);
  if (err != 0) {
    tError("failed to init uv_loop, reason:%s", uv_err_name(err));
    taosMemoryFree(pThrd->loop);
    taosThreadMutexDestroy(&pThrd->msgMtx);
    taosMemoryFree(pThrd);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
1985 1986 1987 1988 1989
  if (pTransInst->supportBatch) {
    pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 4, pThrd, cliAsyncCb);
  } else {
    pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb);
  }
dengyihao's avatar
dengyihao 已提交
1990
  if (pThrd->asyncPool == NULL) {
dengyihao's avatar
ref log  
dengyihao 已提交
1991
    tError("failed to init async pool");
dengyihao's avatar
dengyihao 已提交
1992 1993 1994 1995 1996 1997
    uv_loop_close(pThrd->loop);
    taosMemoryFree(pThrd->loop);
    taosThreadMutexDestroy(&pThrd->msgMtx);
    taosMemoryFree(pThrd);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
1998 1999 2000 2001

  pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
  uv_prepare_init(pThrd->loop, pThrd->prepare);
  pThrd->prepare->data = pThrd;
dengyihao's avatar
dengyihao 已提交
2002
  // uv_prepare_start(pThrd->prepare, cliPrepareCb);
dengyihao's avatar
dengyihao 已提交
2003

dengyihao's avatar
dengyihao 已提交
2004
  int32_t timerSize = 64;
dengyihao's avatar
dengyihao 已提交
2005 2006 2007 2008 2009 2010 2011
  pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
  for (int i = 0; i < timerSize; i++) {
    uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
    uv_timer_init(pThrd->loop, timer);
    taosArrayPush(pThrd->timerList, &timer);
  }

dengyihao's avatar
dengyihao 已提交
2012
  pThrd->pool = createConnPool(4);
dengyihao's avatar
dengyihao 已提交
2013
  transDQCreate(pThrd->loop, &pThrd->delayQueue);
dengyihao's avatar
dengyihao 已提交
2014

dengyihao's avatar
dengyihao 已提交
2015 2016
  transDQCreate(pThrd->loop, &pThrd->timeoutQueue);

dengyihao's avatar
dengyihao 已提交
2017 2018
  transDQCreate(pThrd->loop, &pThrd->waitConnQueue);

dengyihao's avatar
dengyihao 已提交
2019 2020 2021
  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
  pThrd->pTransInst = trans;

dengyihao's avatar
dengyihao 已提交
2022
  pThrd->destroyAhandleFp = pTransInst->destroyFp;
dengyihao's avatar
dengyihao 已提交
2023
  pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
2024 2025
  pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);

dengyihao's avatar
dengyihao 已提交
2026
  pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
2027

dengyihao's avatar
dengyihao 已提交
2028
  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
2029 2030

  pThrd->newConnCount = 0;
dengyihao's avatar
dengyihao 已提交
2031
  pThrd->msgCount = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
2032 2033
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
2034
static void destroyThrdObj(SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
2035 2036 2037
  if (pThrd == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
2038

wafwerar's avatar
wafwerar 已提交
2039
  taosThreadJoin(pThrd->thread, NULL);
dengyihao's avatar
dengyihao 已提交
2040
  CLI_RELEASE_UV(pThrd->loop);
wafwerar's avatar
wafwerar 已提交
2041
  taosThreadMutexDestroy(&pThrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
2042
  TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
dengyihao's avatar
dengyihao 已提交
2043
  transAsyncPoolDestroy(pThrd->asyncPool);
dengyihao's avatar
dengyihao 已提交
2044

dengyihao's avatar
dengyihao 已提交
2045
  transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
dengyihao's avatar
dengyihao 已提交
2046
  transDQDestroy(pThrd->timeoutQueue, NULL);
dengyihao's avatar
dengyihao 已提交
2047
  transDQDestroy(pThrd->waitConnQueue, NULL);
dengyihao's avatar
dengyihao 已提交
2048

dengyihao's avatar
dengyihao 已提交
2049
  tDebug("thread destroy %" PRId64, pThrd->pid);
dengyihao's avatar
dengyihao 已提交
2050 2051 2052 2053 2054
  for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
    uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i);
    taosMemoryFree(timer);
  }
  taosArrayDestroy(pThrd->timerList);
dengyihao's avatar
dengyihao 已提交
2055
  taosMemoryFree(pThrd->prepare);
wafwerar's avatar
wafwerar 已提交
2056
  taosMemoryFree(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
2057
  taosHashCleanup(pThrd->fqdn2ipCache);
dengyihao's avatar
dengyihao 已提交
2058
  taosHashCleanup(pThrd->failFastCache);
dengyihao's avatar
dengyihao 已提交
2059 2060 2061

  void** pIter = taosHashIterate(pThrd->batchCache, NULL);
  while (pIter != NULL) {
dengyihao's avatar
dengyihao 已提交
2062 2063 2064 2065 2066 2067 2068 2069
    SCliBatchList* pBatchList = (SCliBatchList*)(*pIter);
    while (!QUEUE_IS_EMPTY(&pBatchList->wq)) {
      queue* h = QUEUE_HEAD(&pBatchList->wq);
      QUEUE_REMOVE(h);

      SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq);
      cliDestroyBatch(pBatch);
    }
dengyihao's avatar
dengyihao 已提交
2070 2071
    taosMemoryFree(pBatchList->ip);
    taosMemoryFree(pBatchList->dst);
dengyihao's avatar
dengyihao 已提交
2072
    taosMemoryFree(pBatchList);
dengyihao's avatar
dengyihao 已提交
2073

dengyihao's avatar
dengyihao 已提交
2074 2075
    pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
  }
dengyihao's avatar
dengyihao 已提交
2076
  taosHashCleanup(pThrd->batchCache);
wafwerar's avatar
wafwerar 已提交
2077
  taosMemoryFree(pThrd);
dengyihao's avatar
dengyihao 已提交
2078
}
dengyihao's avatar
dengyihao 已提交
2079

dengyihao's avatar
dengyihao 已提交
2080
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
2081
  //
wafwerar's avatar
wafwerar 已提交
2082
  taosMemoryFree(ctx);
dengyihao's avatar
dengyihao 已提交
2083
}
dengyihao's avatar
dengyihao 已提交
2084

dengyihao's avatar
dengyihao 已提交
2085
void cliSendQuit(SCliThrd* thrd) {
dengyihao's avatar
dengyihao 已提交
2086
  // cli can stop gracefully
wafwerar's avatar
wafwerar 已提交
2087
  SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2088
  msg->type = Quit;
dengyihao's avatar
dengyihao 已提交
2089
  transAsyncSend(thrd->asyncPool, &msg->q);
dengyihao's avatar
dengyihao 已提交
2090
  atomic_store_8(&thrd->asyncPool->stop, 1);
dengyihao's avatar
dengyihao 已提交
2091
}
dengyihao's avatar
dengyihao 已提交
2092 2093
void cliWalkCb(uv_handle_t* handle, void* arg) {
  if (!uv_is_closing(handle)) {
dengyihao's avatar
dengyihao 已提交
2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105
    if (uv_handle_get_type(handle) == UV_TIMER) {
      // SCliConn* pConn = handle->data;
      //  if (pConn != NULL && pConn->timer != NULL) {
      //    SCliThrd* pThrd = pConn->hostThrd;
      //    uv_timer_stop((uv_timer_t*)handle);
      //    handle->data = NULL;
      //    taosArrayPush(pThrd->timerList, &pConn->timer);
      //    pConn->timer = NULL;
      //  }
    } else {
      uv_read_stop((uv_stream_t*)handle);
    }
dengyihao's avatar
dengyihao 已提交
2106
    uv_close(handle, cliDestroy);
dengyihao's avatar
dengyihao 已提交
2107 2108
  }
}
dengyihao's avatar
dengyihao 已提交
2109

dengyihao's avatar
dengyihao 已提交
2110
FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) {
dengyihao's avatar
dengyihao 已提交
2111
  int32_t index = pTransInst->index;
dengyihao's avatar
dengyihao 已提交
2112 2113 2114
  if (pTransInst->numOfThreads == 0) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2115 2116 2117 2118
  /*
   * no lock, and to avoid CPU load imbalance, set limit pTransInst->numOfThreads * 2000;
   */
  if (pTransInst->index++ >= pTransInst->numOfThreads * 2000) {
U
ubuntu 已提交
2119 2120 2121 2122
    pTransInst->index = 0;
  }
  return index % pTransInst->numOfThreads;
}
dengyihao's avatar
dengyihao 已提交
2123
static FORCE_INLINE void doDelayTask(void* param) {
dengyihao's avatar
dengyihao 已提交
2124
  STaskArg* arg = param;
dengyihao's avatar
dengyihao 已提交
2125
  cliHandleReq((SCliMsg*)arg->param1, (SCliThrd*)arg->param2);
dengyihao's avatar
dengyihao 已提交
2126 2127
  taosMemoryFree(arg);
}
dengyihao's avatar
dengyihao 已提交
2128

dengyihao's avatar
dengyihao 已提交
2129 2130 2131
static void doCloseIdleConn(void* param) {
  STaskArg* arg = param;
  SCliConn* conn = arg->param1;
dengyihao's avatar
dengyihao 已提交
2132
  tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
2133
  conn->task = NULL;
dengyihao's avatar
dengyihao 已提交
2134 2135
  cliDestroyConn(conn, true);
  taosMemoryFree(arg);
dengyihao's avatar
dengyihao 已提交
2136 2137
}

dengyihao's avatar
dengyihao 已提交
2138
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
2139
  STrans*        pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
2140 2141
  STransConnCtx* pCtx = pMsg->ctx;

dengyihao's avatar
dengyihao 已提交
2142 2143 2144 2145 2146 2147 2148
  if (rpcDebugFlag & DEBUG_DEBUG) {
    STraceId* trace = &pMsg->msg.info.traceId;
    char      tbuf[256] = {0};
    EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
    tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
            pCtx->retryStep, pCtx->retryNextInterval);
  }
dengyihao's avatar
dengyihao 已提交
2149 2150 2151 2152

  STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
  arg->param1 = pMsg;
  arg->param2 = pThrd;
dengyihao's avatar
dengyihao 已提交
2153 2154

  transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval);
dengyihao's avatar
dengyihao 已提交
2155
}
dengyihao's avatar
dengyihao 已提交
2156

dengyihao's avatar
dengyihao 已提交
2157
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
dengyihao's avatar
dengyihao 已提交
2158 2159 2160 2161
  if (*val != exp) {
    *val = newVal;
  }
}
dengyihao's avatar
dengyihao 已提交
2162

dengyihao's avatar
dengyihao 已提交
2163
FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
dengyihao's avatar
dengyihao 已提交
2164 2165 2166 2167 2168 2169
  if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
    return false;
  }
  // rebuild resp msg
  SEpSet epset;
  if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epset) < 0) {
dengyihao's avatar
dengyihao 已提交
2170 2171 2172 2173
    return false;
  }
  int32_t tlen = tSerializeSEpSet(NULL, 0, dst);

dengyihao's avatar
dengyihao 已提交
2174 2175 2176 2177 2178 2179 2180
  char*   buf = NULL;
  int32_t len = pResp->contLen - tlen;
  if (len != 0) {
    buf = rpcMallocCont(len);
    memcpy(buf, (char*)pResp->pCont + tlen, len);
  }
  rpcFreeCont(pResp->pCont);
dengyihao's avatar
dengyihao 已提交
2181 2182

  pResp->pCont = buf;
dengyihao's avatar
dengyihao 已提交
2183 2184
  pResp->contLen = len;

dengyihao's avatar
dengyihao 已提交
2185
  epsetAssign(dst, &epset);
dengyihao's avatar
dengyihao 已提交
2186 2187
  return true;
}
dengyihao's avatar
dengyihao 已提交
2188
bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
dengyihao's avatar
dengyihao 已提交
2189
  bool noDelay = true;
dengyihao's avatar
dengyihao 已提交
2190 2191 2192 2193 2194 2195 2196
  if (hasEpSet == false) {
    if (pResp->contLen == 0) {
      if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
        noDelay = false;
      } else {
        EPSET_FORWARD_INUSE(&pCtx->epSet);
      }
dengyihao's avatar
dengyihao 已提交
2197 2198 2199 2200 2201 2202 2203 2204 2205 2206
    } else if (pResp->contLen != 0) {
      SEpSet  epSet;
      int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
      if (valid < 0) {
        tDebug("get invalid epset, epset equal, continue");
        if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
          noDelay = false;
        } else {
          EPSET_FORWARD_INUSE(&pCtx->epSet);
        }
dengyihao's avatar
dengyihao 已提交
2207
      } else {
dengyihao's avatar
dengyihao 已提交
2208 2209
        if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
          tDebug("epset not equal, retry new epset");
dengyihao's avatar
dengyihao 已提交
2210
          epsetAssign(&pCtx->epSet, &epSet);
dengyihao's avatar
dengyihao 已提交
2211 2212 2213 2214 2215 2216 2217 2218 2219
          noDelay = false;
        } else {
          if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
            noDelay = false;
          } else {
            tDebug("epset equal, continue");
            EPSET_FORWARD_INUSE(&pCtx->epSet);
          }
        }
dengyihao's avatar
dengyihao 已提交
2220
      }
dengyihao's avatar
dengyihao 已提交
2221 2222
    }
  } else {
dengyihao's avatar
dengyihao 已提交
2223
    SEpSet  epSet;
dengyihao's avatar
dengyihao 已提交
2224 2225
    int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
    if (valid < 0) {
dengyihao's avatar
dengyihao 已提交
2226 2227 2228 2229 2230 2231
      tDebug("get invalid epset, epset equal, continue");
      if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
        noDelay = false;
      } else {
        EPSET_FORWARD_INUSE(&pCtx->epSet);
      }
dengyihao's avatar
dengyihao 已提交
2232
    } else {
dengyihao's avatar
dengyihao 已提交
2233 2234
      if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
        tDebug("epset not equal, retry new epset");
dengyihao's avatar
dengyihao 已提交
2235
        epsetAssign(&pCtx->epSet, &epSet);
dengyihao's avatar
dengyihao 已提交
2236
        noDelay = false;
dengyihao's avatar
dengyihao 已提交
2237
      } else {
dengyihao's avatar
dengyihao 已提交
2238 2239 2240 2241 2242 2243
        if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
          noDelay = false;
        } else {
          tDebug("epset equal, continue");
          EPSET_FORWARD_INUSE(&pCtx->epSet);
        }
dengyihao's avatar
dengyihao 已提交
2244
      }
dengyihao's avatar
dengyihao 已提交
2245 2246 2247 2248 2249
    }
  }
  return noDelay;
}
bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
2250 2251
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
2252

dengyihao's avatar
dengyihao 已提交
2253 2254
  STransConnCtx* pCtx = pMsg->ctx;
  int32_t        code = pResp->code;
dengyihao's avatar
dengyihao 已提交
2255

dengyihao's avatar
dengyihao 已提交
2256
  bool retry = pTransInst->retry != NULL ? pTransInst->retry(code, pResp->msgType - 1) : false;
dengyihao's avatar
dengyihao 已提交
2257 2258 2259
  if (retry == false) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
2260 2261 2262 2263 2264 2265 2266 2267

  if (!pCtx->retryInit) {
    pCtx->retryMinInterval = pTransInst->retryMinInterval;
    pCtx->retryMaxInterval = pTransInst->retryMaxInterval;
    pCtx->retryStepFactor = pTransInst->retryStepFactor;
    pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet;
    pCtx->retryInitTimestamp = taosGetTimestampMs();
    pCtx->retryNextInterval = pCtx->retryMinInterval;
dengyihao's avatar
dengyihao 已提交
2268
    pCtx->retryStep = 0;
dengyihao's avatar
dengyihao 已提交
2269
    pCtx->retryInit = true;
dengyihao's avatar
dengyihao 已提交
2270
    pCtx->retryCode = TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
2271 2272 2273

    // already retry, not use handle specified by app;
    pMsg->msg.info.handle = 0;
dengyihao's avatar
dengyihao 已提交
2274
  }
dengyihao's avatar
dengyihao 已提交
2275

dengyihao's avatar
dengyihao 已提交
2276 2277
  if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
    return false;
dengyihao's avatar
dengyihao 已提交
2278
  }
dengyihao's avatar
dengyihao 已提交
2279

dengyihao's avatar
dengyihao 已提交
2280 2281 2282 2283 2284 2285
  // code, msgType

  // A:  epset,   leader, not self
  // B:  epset,   not know leader
  // C:  no epset, leader but not serivce

dengyihao's avatar
dengyihao 已提交
2286 2287
  bool noDelay = false;
  if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
dengyihao's avatar
dengyihao 已提交
2288
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2289
    noDelay = cliResetEpset(pCtx, pResp, false);
dengyihao's avatar
dengyihao 已提交
2290 2291
    transFreeMsg(pResp->pCont);
    transUnrefCliHandle(pConn);
dengyihao's avatar
dengyihao 已提交
2292
  } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
2293 2294
             code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED ||
             code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
dengyihao's avatar
dengyihao 已提交
2295
             code == TSDB_CODE_APP_IS_STOPPING || code == TSDB_CODE_VND_STOPPED) {
dengyihao's avatar
dengyihao 已提交
2296
    tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2297
    noDelay = cliResetEpset(pCtx, pResp, true);
dengyihao's avatar
dengyihao 已提交
2298 2299
    transFreeMsg(pResp->pCont);
    addConnToPool(pThrd->pool, pConn);
dengyihao's avatar
dengyihao 已提交
2300
  } else if (code == TSDB_CODE_SYN_RESTORING) {
dengyihao's avatar
dengyihao 已提交
2301
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2302
    noDelay = cliResetEpset(pCtx, pResp, true);
dengyihao's avatar
dengyihao 已提交
2303 2304
    addConnToPool(pThrd->pool, pConn);
    transFreeMsg(pResp->pCont);
dengyihao's avatar
dengyihao 已提交
2305
  } else {
dengyihao's avatar
dengyihao 已提交
2306
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2307 2308 2309
    noDelay = cliResetEpset(pCtx, pResp, false);
    addConnToPool(pThrd->pool, pConn);
    transFreeMsg(pResp->pCont);
dengyihao's avatar
dengyihao 已提交
2310
  }
dengyihao's avatar
dengyihao 已提交
2311 2312 2313 2314
  if (code != TSDB_CODE_RPC_BROKEN_LINK && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_SUCCESS) {
    // save one internal code
    pCtx->retryCode = code;
  }
dengyihao's avatar
dengyihao 已提交
2315

dengyihao's avatar
dengyihao 已提交
2316 2317 2318
  if (noDelay == false) {
    pCtx->epsetRetryCnt = 1;
    pCtx->retryStep++;
dengyihao's avatar
dengyihao 已提交
2319

dengyihao's avatar
dengyihao 已提交
2320 2321 2322 2323 2324 2325 2326 2327
    int64_t factor = pow(pCtx->retryStepFactor, pCtx->retryStep - 1);
    pCtx->retryNextInterval = factor * pCtx->retryMinInterval;
    if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) {
      pCtx->retryNextInterval = pCtx->retryMaxInterval;
    }
  } else {
    pCtx->retryNextInterval = 0;
    pCtx->epsetRetryCnt++;
dengyihao's avatar
dengyihao 已提交
2328
  }
dengyihao's avatar
dengyihao 已提交
2329

dengyihao's avatar
dengyihao 已提交
2330
  pMsg->sent = 0;
dengyihao's avatar
dengyihao 已提交
2331
  cliSchedMsgToNextNode(pMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
2332
  return true;
dengyihao's avatar
dengyihao 已提交
2333
}
dengyihao's avatar
dengyihao 已提交
2334
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
2335 2336
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
2337

dengyihao's avatar
dengyihao 已提交
2338
  if (pMsg == NULL || pMsg->ctx == NULL) {
dengyihao's avatar
dengyihao 已提交
2339
    tTrace("%s conn %p handle resp", pTransInst->label, pConn);
dengyihao's avatar
dengyihao 已提交
2340 2341 2342
    pTransInst->cfp(pTransInst->parent, pResp, NULL);
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
2343

dengyihao's avatar
dengyihao 已提交
2344
  STransConnCtx* pCtx = pMsg->ctx;
dengyihao's avatar
dengyihao 已提交
2345

dengyihao's avatar
dengyihao 已提交
2346 2347 2348
  bool retry = cliGenRetryRule(pConn, pResp, pMsg);
  if (retry == true) {
    return -1;
dengyihao's avatar
dengyihao 已提交
2349
  }
dengyihao's avatar
dengyihao 已提交
2350

dengyihao's avatar
dengyihao 已提交
2351 2352 2353
  if (pCtx->retryCode != TSDB_CODE_SUCCESS) {
    int32_t code = pResp->code;
    // return internal code app
dengyihao's avatar
dengyihao 已提交
2354 2355
    if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
        code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
dengyihao's avatar
dengyihao 已提交
2356 2357 2358 2359
      pResp->code = pCtx->retryCode;
    }
  }

2360
  // check whole vnodes is offline on this vgroup
2361 2362
  if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps || pCtx->retryStep > 0) {
    if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
A
Alex Duan 已提交
2363
      pResp->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
2364
    } else if (pResp->code == TSDB_CODE_RPC_BROKEN_LINK) {
A
Alex Duan 已提交
2365
      pResp->code = TSDB_CODE_RPC_SOMENODE_BROKEN_LINK;
2366 2367 2368
    }
  }

dengyihao's avatar
dengyihao 已提交
2369 2370
  STraceId* trace = &pResp->info.traceId;
  bool      hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
dengyihao's avatar
dengyihao 已提交
2371
  if (hasEpSet) {
dengyihao's avatar
dengyihao 已提交
2372 2373 2374 2375 2376
    if (rpcDebugFlag & DEBUG_TRACE) {
      char tbuf[256] = {0};
      EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
      tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
    }
dengyihao's avatar
dengyihao 已提交
2377
  }
dengyihao's avatar
dengyihao 已提交
2378

dengyihao's avatar
dengyihao 已提交
2379
  if (tmsgIsValid(pResp->msgType - 1)) {
dengyihao's avatar
dengyihao 已提交
2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390
    char buf[128] = {0};
    sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1));
    int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
    if (NULL == 0) {
      int localCount = 0;
      taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
    } else {
      int localCount = *count - 1;
      taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
    }
  }
dengyihao's avatar
dengyihao 已提交
2391
  if (pCtx->pSem != NULL) {
dengyihao's avatar
dengyihao 已提交
2392
    tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
2393
    if (pCtx->pRsp == NULL) {
dengyihao's avatar
dengyihao 已提交
2394
      tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
2395 2396 2397
    } else {
      memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
    }
dengyihao's avatar
dengyihao 已提交
2398
    tsem_post(pCtx->pSem);
2399
    pCtx->pRsp = NULL;
dengyihao's avatar
dengyihao 已提交
2400
  } else {
dengyihao's avatar
dengyihao 已提交
2401
    tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
2402
    if (retry == false && hasEpSet == true) {
dengyihao's avatar
dengyihao 已提交
2403
      pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
dengyihao's avatar
dengyihao 已提交
2404
    } else {
dengyihao's avatar
dengyihao 已提交
2405
      if (!cliIsEpsetUpdated(pResp->code, pCtx)) {
dengyihao's avatar
dengyihao 已提交
2406 2407 2408 2409
        pTransInst->cfp(pTransInst->parent, pResp, NULL);
      } else {
        pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
      }
dengyihao's avatar
dengyihao 已提交
2410
    }
dengyihao's avatar
dengyihao 已提交
2411
  }
dengyihao's avatar
dengyihao 已提交
2412
  return 0;
dengyihao's avatar
dengyihao 已提交
2413
}
U
ubuntu 已提交
2414 2415

void transCloseClient(void* arg) {
U
ubuntu 已提交
2416
  SCliObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
2417
  for (int i = 0; i < cli->numOfThreads; i++) {
U
ubuntu 已提交
2418
    cliSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
2419
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
2420
  }
wafwerar's avatar
wafwerar 已提交
2421 2422
  taosMemoryFree(cli->pThreadObj);
  taosMemoryFree(cli);
dengyihao's avatar
dengyihao 已提交
2423
}
dengyihao's avatar
dengyihao 已提交
2424 2425 2426 2427 2428
void transRefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_INC((SCliConn*)handle);
dengyihao's avatar
dengyihao 已提交
2429
  tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
dengyihao's avatar
dengyihao 已提交
2430 2431 2432 2433 2434 2435 2436
  UNUSED(ref);
}
void transUnrefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SCliConn*)handle);
dengyihao's avatar
dengyihao 已提交
2437
  tTrace("%s conn %p ref:%d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
dengyihao's avatar
dengyihao 已提交
2438
  if (ref == 0) {
U
ubuntu 已提交
2439
    cliDestroyConn((SCliConn*)handle, true);
dengyihao's avatar
dengyihao 已提交
2440 2441
  }
}
dengyihao's avatar
dengyihao 已提交
2442
static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t handle) {
dengyihao's avatar
dengyihao 已提交
2443
  SCliThrd*  pThrd = NULL;
dengyihao's avatar
dengyihao 已提交
2444
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
dengyihao's avatar
dengyihao 已提交
2445 2446 2447
  if (exh == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
2448

dengyihao's avatar
dengyihao 已提交
2449 2450 2451 2452 2453 2454
  if (exh->pThrd == NULL && trans != NULL) {
    int idx = cliRBChoseIdx(trans);
    if (idx < 0) return NULL;
    exh->pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
  }

dengyihao's avatar
dengyihao 已提交
2455
  pThrd = exh->pThrd;
dengyihao's avatar
dengyihao 已提交
2456
  transReleaseExHandle(transGetRefMgt(), handle);
dengyihao's avatar
dengyihao 已提交
2457 2458
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
2459
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
dengyihao's avatar
dengyihao 已提交
2460
  if (handle == 0) {
dengyihao's avatar
dengyihao 已提交
2461
    int idx = cliRBChoseIdx(trans);
dengyihao's avatar
dengyihao 已提交
2462
    if (idx < 0) return NULL;
dengyihao's avatar
dengyihao 已提交
2463 2464
    return ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
  }
dengyihao's avatar
dengyihao 已提交
2465
  SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle);
D
dapan1121 已提交
2466
  return pThrd;
dengyihao's avatar
dengyihao 已提交
2467
}
dengyihao's avatar
dengyihao 已提交
2468
int transReleaseCliHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
2469 2470 2471
  int  idx = -1;
  bool valid = false;

dengyihao's avatar
dengyihao 已提交
2472
  SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
dengyihao's avatar
dengyihao 已提交
2473
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2474
    return -1;
dengyihao's avatar
dengyihao 已提交
2475
  }
dengyihao's avatar
dengyihao 已提交
2476

dengyihao's avatar
dengyihao 已提交
2477
  STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527};
dengyihao's avatar
rm code  
dengyihao 已提交
2478
  TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
dengyihao's avatar
dengyihao 已提交
2479

dengyihao's avatar
dengyihao 已提交
2480 2481 2482
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  pCtx->ahandle = tmsg.info.ahandle;

dengyihao's avatar
dengyihao 已提交
2483
  SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2484
  cmsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
2485
  cmsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2486
  cmsg->type = Release;
dengyihao's avatar
dengyihao 已提交
2487
  cmsg->ctx = pCtx;
dengyihao's avatar
dengyihao 已提交
2488

dengyihao's avatar
dengyihao 已提交
2489 2490 2491
  STraceId* trace = &tmsg.info.traceId;
  tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);

dengyihao's avatar
dengyihao 已提交
2492
  if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
dengyihao's avatar
dengyihao 已提交
2493
    destroyCmsg(cmsg);
dengyihao's avatar
dengyihao 已提交
2494 2495
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2496
  return 0;
dengyihao's avatar
dengyihao 已提交
2497
}
dengyihao's avatar
dengyihao 已提交
2498

dengyihao's avatar
dengyihao 已提交
2499
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
2500 2501 2502 2503 2504
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2505

dengyihao's avatar
dengyihao 已提交
2506
  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
dengyihao's avatar
dengyihao 已提交
2507
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2508
    transFreeMsg(pReq->pCont);
dengyihao's avatar
dengyihao 已提交
2509
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2510
    return TSDB_CODE_RPC_BROKEN_LINK;
dengyihao's avatar
dengyihao 已提交
2511 2512
  }

dengyihao's avatar
dengyihao 已提交
2513
  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
wafwerar's avatar
wafwerar 已提交
2514
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2515 2516 2517
  epsetAssign(&pCtx->epSet, pEpSet);
  epsetAssign(&pCtx->origEpSet, pEpSet);

S
Shengliang Guan 已提交
2518
  pCtx->ahandle = pReq->info.ahandle;
dengyihao's avatar
dengyihao 已提交
2519
  pCtx->msgType = pReq->msgType;
dengyihao's avatar
dengyihao 已提交
2520

H
Haojun Liao 已提交
2521
  if (ctx != NULL) pCtx->appCtx = *ctx;
dengyihao's avatar
dengyihao 已提交
2522

wafwerar's avatar
wafwerar 已提交
2523
  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2524
  cliMsg->ctx = pCtx;
dengyihao's avatar
dengyihao 已提交
2525
  cliMsg->msg = *pReq;
dengyihao's avatar
dengyihao 已提交
2526
  cliMsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2527
  cliMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
2528
  cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2529

dengyihao's avatar
dengyihao 已提交
2530
  STraceId* trace = &pReq->info.traceId;
dengyihao's avatar
dengyihao 已提交
2531 2532
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
dengyihao's avatar
dengyihao 已提交
2533 2534
  if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) {
    destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2535
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2536 2537
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2538
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2539
  return 0;
dengyihao's avatar
dengyihao 已提交
2540
}
dengyihao's avatar
dengyihao 已提交
2541

dengyihao's avatar
dengyihao 已提交
2542
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
dengyihao's avatar
dengyihao 已提交
2543 2544 2545 2546 2547
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2548

dengyihao's avatar
dengyihao 已提交
2549 2550
  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2551
    transFreeMsg(pReq->pCont);
dengyihao's avatar
dengyihao 已提交
2552
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2553
    return TSDB_CODE_RPC_BROKEN_LINK;
dengyihao's avatar
dengyihao 已提交
2554
  }
dengyihao's avatar
dengyihao 已提交
2555

dengyihao's avatar
dengyihao 已提交
2556 2557
  tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
  tsem_init(sem, 0, 0);
dengyihao's avatar
dengyihao 已提交
2558

dengyihao's avatar
dengyihao 已提交
2559 2560
  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());

wafwerar's avatar
wafwerar 已提交
2561
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2562 2563
  epsetAssign(&pCtx->epSet, pEpSet);
  epsetAssign(&pCtx->origEpSet, pEpSet);
S
Shengliang Guan 已提交
2564
  pCtx->ahandle = pReq->info.ahandle;
dengyihao's avatar
dengyihao 已提交
2565
  pCtx->msgType = pReq->msgType;
dengyihao's avatar
dengyihao 已提交
2566
  pCtx->pSem = sem;
dengyihao's avatar
dengyihao 已提交
2567 2568
  pCtx->pRsp = pRsp;

wafwerar's avatar
wafwerar 已提交
2569
  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2570 2571 2572
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2573
  cliMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
2574
  cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2575

dengyihao's avatar
dengyihao 已提交
2576
  STraceId* trace = &pReq->info.traceId;
dengyihao's avatar
dengyihao 已提交
2577 2578
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
dengyihao's avatar
dengyihao 已提交
2579

dengyihao's avatar
dengyihao 已提交
2580 2581
  int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
  if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
2582
    destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2583
    goto _RETURN;
dengyihao's avatar
dengyihao 已提交
2584
  }
dengyihao's avatar
dengyihao 已提交
2585
  tsem_wait(sem);
dengyihao's avatar
dengyihao 已提交
2586 2587

_RETURN:
dengyihao's avatar
dengyihao 已提交
2588 2589
  tsem_destroy(sem);
  taosMemoryFree(sem);
dengyihao's avatar
dengyihao 已提交
2590
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2591
  return ret;
dengyihao's avatar
dengyihao 已提交
2592
}
dengyihao's avatar
dengyihao 已提交
2593 2594 2595
/*
 *
 **/
dengyihao's avatar
dengyihao 已提交
2596
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
dengyihao's avatar
dengyihao 已提交
2597
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2598 2599 2600
  if (pTransInst == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2601 2602 2603

  SCvtAddr cvtAddr = {0};
  if (ip != NULL && fqdn != NULL) {
dengyihao's avatar
dengyihao 已提交
2604 2605
    tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip));
    tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn));
dengyihao's avatar
dengyihao 已提交
2606 2607
    cvtAddr.cvt = true;
  }
dengyihao's avatar
dengyihao 已提交
2608 2609
  for (int i = 0; i < pTransInst->numOfThreads; i++) {
    STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2610
    pCtx->cvtAddr = cvtAddr;
dengyihao's avatar
dengyihao 已提交
2611 2612 2613 2614

    SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
    cliMsg->ctx = pCtx;
    cliMsg->type = Update;
dengyihao's avatar
dengyihao 已提交
2615
    cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2616

dengyihao's avatar
dengyihao 已提交
2617
    SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
S
Shengliang Guan 已提交
2618
    tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid);
dengyihao's avatar
dengyihao 已提交
2619

dengyihao's avatar
dengyihao 已提交
2620 2621
    if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) {
      destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2622
      transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2623 2624
      return -1;
    }
dengyihao's avatar
dengyihao 已提交
2625
  }
dengyihao's avatar
dengyihao 已提交
2626
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2627
  return 0;
dengyihao's avatar
dengyihao 已提交
2628
}
dengyihao's avatar
dengyihao 已提交
2629 2630 2631 2632 2633

int64_t transAllocHandle() {
  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
  tDebug("pre alloc refId %" PRId64 "", exh->refId);
dengyihao's avatar
dengyihao 已提交
2634

dengyihao's avatar
dengyihao 已提交
2635 2636
  return exh->refId;
}