/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "transComm.h"

dengyihao's avatar
dengyihao 已提交
17 18 19 20 21
/*
 * Message list attached to an SConnList (see SConnList::list).
 * destroyConnPool() drains SCliMsg entries from msgQ before freeing the list.
 */
typedef struct {
  int32_t numOfConn;  // connection counter; NOTE(review): exact meaning not visible in this chunk
  queue   msgQ;       // queue head; entries are SCliMsg linked through SCliMsg::q
} SMsgList;

dengyihao's avatar
dengyihao 已提交
22
typedef struct SConnList {
dengyihao's avatar
dengyihao 已提交
23 24 25 26
  queue     conns;
  int32_t   size;
  SMsgList* list;
  void*     pThrd;
dengyihao's avatar
dengyihao 已提交
27 28
} SConnList;

dengyihao's avatar
dengyihao 已提交
29
typedef struct {
dengyihao's avatar
dengyihao 已提交
30 31 32 33 34 35
  queue   wq;
  int32_t len;

  int connMax;
  int connCnt;
  int batchLenLimit;
dengyihao's avatar
dengyihao 已提交
36
  int sending;
dengyihao's avatar
dengyihao 已提交
37

dengyihao's avatar
dengyihao 已提交
38 39 40
  char*    dst;
  char*    ip;
  uint16_t port;
dengyihao's avatar
dengyihao 已提交
41 42 43 44 45 46 47 48 49 50

} SCliBatchList;

/*
 * One batch of requests; pList points back to the SCliBatchList it belongs to.
 * NOTE(review): field semantics are inferred from names - confirm against users.
 */
typedef struct {
  queue          wq;         // queue head; presumably the messages of this batch
  queue          listq;      // presumably the link into SCliBatchList::wq
  int32_t        wLen;       // presumably the number of entries on wq
  int32_t        batchSize;  // presumably the accumulated payload size
  int32_t        batch;
  SCliBatchList* pList;      // owning batch list
} SCliBatch;
dengyihao's avatar
dengyihao 已提交
52

dengyihao's avatar
dengyihao 已提交
53
typedef struct SCliConn {
dengyihao's avatar
dengyihao 已提交
54
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
55 56
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
57
  queue        wreqQueue;
dengyihao's avatar
dengyihao 已提交
58 59

  uv_timer_t* timer;  // read timer, forbidden
dengyihao's avatar
dengyihao 已提交
60

dengyihao's avatar
dengyihao 已提交
61 62 63 64
  void* hostThrd;

  SConnBuffer readBuf;
  STransQueue cliMsgs;
dengyihao's avatar
dengyihao 已提交
65 66 67

  queue      q;
  SConnList* list;
dengyihao's avatar
dengyihao 已提交
68 69

  STransCtx  ctx;
dengyihao's avatar
dengyihao 已提交
70 71
  bool       broken;  // link broken or not
  ConnStatus status;  //
dengyihao's avatar
dengyihao 已提交
72

dengyihao's avatar
dengyihao 已提交
73 74
  SCliBatch* pBatch;

dengyihao's avatar
dengyihao 已提交
75 76
  int64_t refId;
  char*   ip;
dengyihao's avatar
dengyihao 已提交
77

dengyihao's avatar
dengyihao 已提交
78
  SDelayTask* task;
dengyihao's avatar
dengyihao 已提交
79

dengyihao's avatar
dengyihao 已提交
80
  // debug and log info
dengyihao's avatar
dengyihao 已提交
81 82 83
  char src[32];
  char dst[32];

dengyihao's avatar
dengyihao 已提交
84
} SCliConn;
dengyihao's avatar
dengyihao 已提交
85

dengyihao's avatar
dengyihao 已提交
86
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
87
  STransConnCtx* ctx;
dengyihao's avatar
formate  
dengyihao 已提交
88
  STransMsg      msg;
dengyihao's avatar
dengyihao 已提交
89
  queue          q;
dengyihao's avatar
dengyihao 已提交
90
  STransMsgType  type;
dengyihao's avatar
dengyihao 已提交
91

dengyihao's avatar
dengyihao 已提交
92
  int64_t  refId;
dengyihao's avatar
dengyihao 已提交
93 94
  uint64_t st;
  int      sent;  //(0: no send, 1: alread sent)
dengyihao's avatar
dengyihao 已提交
95 96
} SCliMsg;

dengyihao's avatar
dengyihao 已提交
97
typedef struct SCliThrd {
dengyihao's avatar
dengyihao 已提交
98 99 100 101 102 103
  TdThread      thread;  // tid
  int64_t       pid;     // pid
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  uv_prepare_t* prepare;
  void*         pool;  // conn pool
dengyihao's avatar
dengyihao 已提交
104
  // timer handles
dengyihao's avatar
dengyihao 已提交
105
  SArray* timerList;
dengyihao's avatar
dengyihao 已提交
106
  // msg queue
dengyihao's avatar
dengyihao 已提交
107
  queue         msg;
wafwerar's avatar
wafwerar 已提交
108
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
109
  SDelayQueue*  delayQueue;
dengyihao's avatar
dengyihao 已提交
110
  SDelayQueue*  timeoutQueue;
dengyihao's avatar
dengyihao 已提交
111
  SDelayQueue*  waitConnQueue;
dengyihao's avatar
dengyihao 已提交
112 113
  uint64_t      nextTimeout;  // next timeout
  void*         pTransInst;   //
dengyihao's avatar
dengyihao 已提交
114

dengyihao's avatar
dengyihao 已提交
115
  int connCount;
dengyihao's avatar
dengyihao 已提交
116
  void (*destroyAhandleFp)(void* ahandle);
dengyihao's avatar
dengyihao 已提交
117 118
  SHashObj* fqdn2ipCache;
  SCvtAddr  cvtAddr;
dengyihao's avatar
dengyihao 已提交
119

dengyihao's avatar
dengyihao 已提交
120
  SHashObj* failFastCache;
dengyihao's avatar
dengyihao 已提交
121
  SHashObj* batchCache;
dengyihao's avatar
dengyihao 已提交
122

dengyihao's avatar
dengyihao 已提交
123 124
  SCliMsg* stopMsg;

dengyihao's avatar
dengyihao 已提交
125
  bool quit;
dengyihao's avatar
dengyihao 已提交
126
} SCliThrd;
dengyihao's avatar
dengyihao 已提交
127

U
ubuntu 已提交
128
typedef struct SCliObj {
dengyihao's avatar
dengyihao 已提交
129 130 131 132
  char       label[TSDB_LABEL_LEN];
  int32_t    index;
  int        numOfThreads;
  SCliThrd** pThreadObj;
U
ubuntu 已提交
133
} SCliObj;
dengyihao's avatar
dengyihao 已提交
134

dengyihao's avatar
dengyihao 已提交
135 136 137 138 139 140 141
/*
 * Fail-fast bookkeeping record.
 * NOTE(review): the code using this struct is outside this chunk; field
 * meanings are inferred from names and should be confirmed.
 */
typedef struct {
  int32_t reinit;     // presumably set when the record has to be reset
  int64_t timestamp;  // presumably the time of the first counted failure
  int32_t count;      // presumably the failures counted so far
  int32_t threshold;  // presumably the count that triggers fail-fast
  int64_t interval;   // presumably the length of the counting window
} SFailFastItem;
dengyihao's avatar
dengyihao 已提交
142
// conn pool
dengyihao's avatar
dengyihao 已提交
143
// add expire timeout and capacity limit
dengyihao's avatar
dengyihao 已提交
144
static void*     createConnPool(int size);
145
static void*     destroyConnPool(void* pool);
dengyihao's avatar
dengyihao 已提交
146
static SCliConn* getConnFromPool(SCliThrd* thread, char* key);
dengyihao's avatar
dengyihao 已提交
147
static void      addConnToPool(void* pool, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
148
static void      doCloseIdleConn(void* param);
dengyihao's avatar
dengyihao 已提交
149

dengyihao's avatar
dengyihao 已提交
150 151
// register conn timer
static void cliConnTimeout(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
152 153
// register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
154
// register timer in each thread to clear expire conn
dengyihao's avatar
dengyihao 已提交
155
// static void cliTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
156
// alloc buffer for recv
dengyihao's avatar
dengyihao 已提交
157
static FORCE_INLINE void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
158
// callback after recv nbytes from socket
U
ubuntu 已提交
159
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
160
// callback after send data to socket
U
ubuntu 已提交
161
static void cliSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
162
// callback after conn to server
U
ubuntu 已提交
163 164
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
165
static void cliIdleCb(uv_idle_t* handle);
dengyihao's avatar
dengyihao 已提交
166
static void cliPrepareCb(uv_prepare_t* handle);
dengyihao's avatar
dengyihao 已提交
167

dengyihao's avatar
dengyihao 已提交
168
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd);
dengyihao's avatar
dengyihao 已提交
169
static void cliSendBatchCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
170 171

SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
dengyihao's avatar
dengyihao 已提交
172

dengyihao's avatar
dengyihao 已提交
173 174
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);

dengyihao's avatar
dengyihao 已提交
175
static int32_t allocConnRef(SCliConn* conn, bool update);
dengyihao's avatar
dengyihao 已提交
176

dengyihao's avatar
dengyihao 已提交
177
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
dengyihao's avatar
dengyihao 已提交
178

dengyihao's avatar
dengyihao 已提交
179
static SCliConn* cliCreateConn(SCliThrd* thrd);
U
ubuntu 已提交
180 181
static void      cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void      cliDestroy(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
182
static void      cliSend(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
183
static void      cliSendBatch(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
184
static void      cliDestroyConnMsgs(SCliConn* conn, bool destroy);
dengyihao's avatar
dengyihao 已提交
185

dengyihao's avatar
dengyihao 已提交
186 187
static void    doFreeTimeoutMsg(void* param);
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg);
dengyihao's avatar
dengyihao 已提交
188

dengyihao's avatar
dengyihao 已提交
189
// cli util func
dengyihao's avatar
dengyihao 已提交
190 191
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
dengyihao's avatar
dengyihao 已提交
192

dengyihao's avatar
dengyihao 已提交
193
static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
dengyihao's avatar
dengyihao 已提交
194

dengyihao's avatar
dengyihao 已提交
195 196 197
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn);
static FORCE_INLINE void     cliUpdateFqdnCache(SHashObj* cache, char* fqdn);

dengyihao's avatar
dengyihao 已提交
198
// process data read from server, add decompress etc later
U
ubuntu 已提交
199
static void cliHandleResp(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
200
// handle except about conn
U
ubuntu 已提交
201
static void cliHandleExcept(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
202
static void cliReleaseUnfinishedMsg(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
203
static void cliHandleFastFail(SCliConn* pConn, int status);
dengyihao's avatar
dengyihao 已提交
204

dengyihao's avatar
dengyihao 已提交
205
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd);

/* Handlers for requests coming from the application. */
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);

/*
 * Handler dispatch table. Slot 3 is deliberately NULL.
 * NOTE(review): presumably indexed by SCliMsg::type - confirm against the dispatch site.
 */
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {
    cliHandleReq,      // 0
    cliHandleQuit,     // 1
    cliHandleRelease,  // 2
    NULL,              // 3
    cliHandleUpdate,   // 4
};
dengyihao's avatar
dengyihao 已提交
215

dengyihao's avatar
dengyihao 已提交
216 217
static FORCE_INLINE void destroyUserdata(STransMsg* userdata);
static FORCE_INLINE void destroyCmsg(void* cmsg);
dengyihao's avatar
dengyihao 已提交
218
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
dengyihao's avatar
dengyihao 已提交
219
static FORCE_INLINE int  cliRBChoseIdx(STrans* pTransInst);
dengyihao's avatar
dengyihao 已提交
220
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
221

dengyihao's avatar
dengyihao 已提交
222
// thread obj
dengyihao's avatar
dengyihao 已提交
223
static SCliThrd* createThrdObj(void* trans);
dengyihao's avatar
dengyihao 已提交
224
static void      destroyThrdObj(SCliThrd* pThrd);
dengyihao's avatar
dengyihao 已提交
225

dengyihao's avatar
dengyihao 已提交
226 227 228 229 230 231 232 233 234
static void cliWalkCb(uv_handle_t* handle, void* arg);

/*
 * Release a uv loop: visit every handle of `loop` with cliWalkCb, run the loop
 * with UV_RUN_DEFAULT, then close it.
 *
 * `loop` is evaluated three times, so pass a side-effect free expression.
 * The macro expands to one statement WITHOUT a trailing ';' - the caller
 * writes it - so the macro can be used as the body of an if/else.
 */
#define CLI_RELEASE_UV(loop)          \
  do {                                \
    uv_walk((loop), cliWalkCb, NULL); \
    uv_run((loop), UV_RUN_DEFAULT);   \
    uv_loop_close((loop));            \
  } while (0)

dengyihao's avatar
dengyihao 已提交
235 236 237 238 239 240
/*
 * Format "<ip>:<port>" into `key`.
 *
 * `key` must be a char ARRAY visible at the call site: its capacity is taken
 * with sizeof(key), so a plain pointer would yield the pointer size instead.
 * Output is truncated (and still NUL-terminated) when it does not fit.
 * The whole `port` expression is converted to int, so a compound argument
 * such as `base + off` is formatted correctly.
 *
 * snprintf may cause performance problem
 */
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port)                \
  do {                                                        \
    snprintf((key), sizeof(key), "%s:%d", (ip), (int)(port)); \
  } while (0)

dengyihao's avatar
dengyihao 已提交
241 242
// Clamp `para` to a lower bound of 90000 (`para` is evaluated twice).
#define CONN_PERSIST_TIME(para) ((para) > 90000 ? (para) : 90000)
// Label of the transport instance that owns the thread serving `conn` (conn->hostThrd->pTransInst->label).
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
dengyihao's avatar
dengyihao 已提交
243

dengyihao's avatar
dengyihao 已提交
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
/*
 * Find the message on conn->cliMsgs whose ctx->ahandle equals `ah`, remove it
 * from the queue and leave it in the caller's variable `pMsg`; `pMsg` is set
 * to NULL when no message matches.
 *
 * Requirements at the call site:
 *   - a variable `SCliMsg* pMsg` must be in scope (it is written here);
 *   - `ah` is converted to uint64_t and evaluated once.
 *
 * The second parameter is deliberately NOT called `ahandle`: a parameter with
 * that name would also rewrite the member access `pMsg->ctx->ahandle`, which
 * restricted the macro to callers whose argument was literally `ahandle`.
 */
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ah)                                       \
  do {                                                                             \
    uint64_t msgKey_ = (uint64_t)(ah);                                             \
    int      msgIdx_ = 0;                                                          \
    int      msgCnt_ = transQueueSize(&(conn)->cliMsgs);                           \
    for (; msgIdx_ < msgCnt_; msgIdx_++) {                                         \
      pMsg = transQueueGet(&(conn)->cliMsgs, msgIdx_);                             \
      if (pMsg != NULL && pMsg->ctx != NULL &&                                     \
          (uint64_t)pMsg->ctx->ahandle == msgKey_) {                               \
        break;                                                                     \
      }                                                                            \
    }                                                                              \
    if (msgIdx_ == msgCnt_) {                                                      \
      pMsg = NULL;                                                                 \
      tDebug("msg not found, %" PRIu64 "", msgKey_);                               \
    } else {                                                                       \
      pMsg = transQueueRm(&(conn)->cliMsgs, msgIdx_);                              \
      tDebug("msg found, %" PRIu64 "", msgKey_);                                   \
    }                                                                              \
  } while (0)
dengyihao's avatar
dengyihao 已提交
261

U
ubuntu 已提交
262 263 264 265 266 267 268 269 270 271 272 273 274
/*
 * Walk conn->cliMsgs from index 0 and leave the first message whose `sent`
 * flag is 0 in the caller's variable `pCliMsg`. When the walk reaches a NULL
 * entry without finding one, control jumps to the caller's `_RETURN` label.
 *
 * Requirements at the call site: a variable `SCliMsg* pCliMsg` and a label
 * `_RETURN` in the enclosing function.
 */
#define CONN_GET_NEXT_SENDMSG(conn)                                                \
  do {                                                                             \
    for (int i = 0; (pCliMsg = transQueueGet(&(conn)->cliMsgs, i)) != NULL; i++) { \
      if (pCliMsg->sent == 0) {                                                    \
        break;                                                                     \
      }                                                                            \
    }                                                                              \
    if (pCliMsg == NULL) {                                                         \
      goto _RETURN;                                                                \
    }                                                                              \
  } while (0)
dengyihao's avatar
dengyihao 已提交
275

dengyihao's avatar
formate  
dengyihao 已提交
276 277
/*
 * Move a connection from ConnNormal to ConnAcquire and take one extra
 * reference on it (transRefCliHandle). Connections in any other state are
 * left untouched.
 */
#define CONN_SET_PERSIST_BY_APP(conn)    \
  do {                                   \
    if ((conn)->status != ConnNormal) {  \
      break;                             \
    }                                    \
    (conn)->status = ConnAcquire;        \
    transRefCliHandle(conn);             \
  } while (0)
283

dengyihao's avatar
dengyihao 已提交
284 285 286 287
// True when `conn` is in state ConnNormal or ConnInPool and T_REF_VAL_GET(conn) is exactly 1.
// `conn` is evaluated more than once.
#define CONN_NO_PERSIST_BY_APP(conn) \
  (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
// True when `conn` is in state ConnRelease or ConnInPool and T_REF_VAL_GET(conn) is exactly 1.
// `conn` is evaluated more than once.
#define CONN_RELEASE_BY_SERVER(conn) \
  (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
288

S
Shengliang Guan 已提交
289 290
// `msg` points to a message whose info.noResp flag is 1.
#define REQUEST_NO_RESP(msg) (1 == (msg)->info.noResp)
// `msg` points to a message whose info.persistHandle flag is 1.
#define REQUEST_PERSIS_HANDLE(msg) (1 == (msg)->info.persistHandle)
// `cmsg` points to a client message of type Release.
#define REQUEST_RELEASE_HANDLE(cmsg) (Release == (cmsg)->type)
dengyihao's avatar
dengyihao 已提交
292

dengyihao's avatar
dengyihao 已提交
293
// Non-NULL endpoint set whose numOfEps and inUse are both non-negative.
#define EPSET_IS_VALID(epSet) (NULL != (epSet) && 0 <= (epSet)->numOfEps && 0 <= (epSet)->inUse)
// Number of endpoints in the set.
#define EPSET_GET_SIZE(epSet) ((epSet)->numOfEps)
// fqdn of the endpoint selected by inUse.
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
// port of the endpoint selected by inUse.
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
dengyihao's avatar
dengyihao 已提交
297 298 299 300 301 302
/*
 * Advance inUse to the next endpoint, wrapping around at numOfEps.
 * An empty set (numOfEps == 0) is left unchanged, which also avoids a
 * modulo by zero.
 */
#define EPSET_FORWARD_INUSE(epSet)            \
  do {                                        \
    if (0 == (epSet)->numOfEps) {             \
      break;                                  \
    }                                         \
    (epSet)->inUse += 1;                      \
    (epSet)->inUse %= (epSet)->numOfEps;      \
  } while (0)
dengyihao's avatar
dengyihao 已提交
304

dengyihao's avatar
dengyihao 已提交
305 306 307 308 309 310 311 312 313 314 315
/*
 * Render an endpoint set into `tbuf` as
 *   epset:{0. <fqdn>:<port>, 1. <fqdn>:<port>}, inUse:<n>
 *
 * `tbuf` must be a char ARRAY visible at the call site; its capacity is taken
 * with sizeof(tbuf). Output that does not fit is truncated and remains
 * NUL-terminated.
 *
 * snprintf returns the length it WOULD have written, so the running length can
 * exceed the capacity after a truncation. Formatting therefore stops as soon
 * as the buffer is full; otherwise "capacity - length" would wrap around to a
 * huge size and the next snprintf would write past the end of `tbuf`.
 *
 * Expands to one statement without a trailing ';' so it is safe as an
 * if/else body.
 */
#define EPSET_DEBUG_STR(epSet, tbuf)                                                              \
  do {                                                                                            \
    const int epsCap_ = (int)sizeof(tbuf);                                                        \
    int       epsLen_ = snprintf((tbuf), sizeof(tbuf), "epset:{");                                \
    for (int epsIdx_ = 0; epsIdx_ < (epSet)->numOfEps; epsIdx_++) {                               \
      if (epsLen_ < 0 || epsLen_ >= epsCap_) {                                                    \
        break;                                                                                    \
      }                                                                                           \
      int epsN_ = snprintf((tbuf) + epsLen_, (size_t)(epsCap_ - epsLen_), "%d. %s:%d%s", epsIdx_, \
                           (epSet)->eps[epsIdx_].fqdn, (epSet)->eps[epsIdx_].port,                \
                           (epsIdx_ == (epSet)->numOfEps - 1) ? "" : ", ");                       \
      if (epsN_ < 0) {                                                                            \
        break;                                                                                    \
      }                                                                                           \
      epsLen_ += epsN_;                                                                           \
    }                                                                                             \
    if (epsLen_ >= 0 && epsLen_ < epsCap_) {                                                      \
      snprintf((tbuf) + epsLen_, (size_t)(epsCap_ - epsLen_), "}, inUse:%d", (epSet)->inUse);     \
    }                                                                                             \
  } while (0)
dengyihao's avatar
dengyihao 已提交
317

U
ubuntu 已提交
318
// Forward declaration; the definition is outside this part of the file.
// NOTE(review): presumably the entry routine of each client worker thread - confirm at the definition.
static void* cliWorkThread(void* arg);
dengyihao's avatar
dengyihao 已提交
319

dengyihao's avatar
dengyihao 已提交
320 321 322 323 324 325 326 327
/*
 * Drop every message still queued on `conn`.
 *
 * For each queued message the application handle (ctx->ahandle) is released
 * first - through conn->ctx.freeFunc when that is set, otherwise through the
 * thread's destroyAhandleFp - unless the handle is NULL or the sentinel value
 * (void*)0x9527. The message is then destroyed. Finally the queue is cleared
 * and the connection context is zeroed.
 */
static void cliReleaseUnfinishedMsg(SCliConn* conn) {
  SCliThrd* pHost = conn->hostThrd;

  int idx = 0;
  while (idx < transQueueSize(&conn->cliMsgs)) {
    SCliMsg* pPending = transQueueGet(&conn->cliMsgs, idx);
    idx++;

    void* ah = (pPending != NULL && pPending->ctx != NULL) ? pPending->ctx->ahandle : NULL;
    if (ah != NULL && ah != (void*)0x9527) {
      if (conn->ctx.freeFunc != NULL) {
        conn->ctx.freeFunc(ah);
      } else if (pHost->destroyAhandleFp != NULL) {
        tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, ah);
        pHost->destroyAhandleFp(ah);
      }
    }
    destroyCmsg(pPending);
  }

  transQueueClear(&conn->cliMsgs);
  memset(&conn->ctx, 0, sizeof(conn->ctx));
}
dengyihao's avatar
dengyihao 已提交
338
/*
 * Send the next queued message of `conn`, if there is one.
 *
 * Returns true when cliSend() was called, false when the queue is empty or
 * holds no message with sent == 0 (CONN_GET_NEXT_SENDMSG jumps to _RETURN in
 * that case).
 */
bool cliMaySendCachedMsg(SCliConn* conn) {
  if (transQueueEmpty(&conn->cliMsgs)) {
    return false;
  }

  SCliMsg* pCliMsg = NULL;  // written by CONN_GET_NEXT_SENDMSG
  CONN_GET_NEXT_SENDMSG(conn);
  cliSend(conn);
  return true;

_RETURN:
  return false;
}
dengyihao's avatar
formate  
dengyihao 已提交
349
/*
 * Process one packet taken from conn->readBuf.
 *
 * Steps, in order:
 *   1. stop the read timer (if any) and hand it back to the thread's timerList;
 *   2. extract the packet, decompress it and convert code / msgLen byte order;
 *   3. let cliRecvReleaseReq() consume the packet if it is a release request;
 *   4. find the request this packet answers: the queue head when the
 *      connection is not persisted by the app, otherwise the entry whose
 *      ahandle matches the one in the packet header;
 *   5. deliver the response through cliAppCb() (skipped for Release messages);
 *   6. send the next cached message, or return the connection to the pool, or
 *      restart reading.
 */
void cliHandleResp(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  if (conn->timer != NULL) {
    if (uv_is_active((uv_handle_t*)conn->timer)) {
      tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
      uv_timer_stop(conn->timer);
    }
    // the handle is parked on the thread's timer list and detached from the conn
    taosArrayPush(pThrd->timerList, &conn->timer);
    conn->timer->data = NULL;
    conn->timer = NULL;
  }

  STransMsgHead* pHead = NULL;
  int32_t        msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
  if (msgLen <= 0) {
    tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
    return;
  }

  // a decompress failure is only logged; processing continues as in the original flow
  if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
    tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
  }
  pHead->msgLen = htonl(pHead->msgLen);
  pHead->code = htonl(pHead->code);

  if (cliRecvReleaseReq(conn, pHead)) {
    return;
  }

  STransMsg transMsg = {0};
  transMsg.msgType = pHead->msgType;
  transMsg.code = pHead->code;
  transMsg.pCont = transContFromHead((char*)pHead);
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.info.hasEpSet = pHead->hasEpSet;
  transMsg.info.traceId = pHead->traceId;
  transMsg.info.ahandle = NULL;

  SCliMsg*       pMsg = NULL;
  STransConnCtx* pCtx = NULL;
  if (!CONN_NO_PERSIST_BY_APP(conn)) {
    // persisted connection: look the request up by the ahandle carried in the header
    uint64_t ahandle = (uint64_t)pHead->ahandle;
    CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
    if (pMsg != NULL) {
      pCtx = pMsg->ctx;
      transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
      tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
    } else {
      transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
      tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
             transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
      if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
        transMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
        tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
               transMsg.info.ahandle);
      }
    }
  } else {
    // plain connection: the response answers the oldest queued request
    pMsg = transQueuePop(&conn->cliMsgs);

    pCtx = pMsg ? pMsg->ctx : NULL;
    transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
    tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
  }

  // buf's mem already translated to transMsg.pCont
  if (!CONN_NO_PERSIST_BY_APP(conn)) {
    transMsg.info.handle = (void*)conn->refId;
    tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
  }

  STraceId* trace = &transMsg.info.traceId;
  tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
          TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));

  // nobody is waiting for this packet: free the payload and ignore it
  if ((pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) ||
      (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL)) {
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
    transFreeMsg(transMsg.pCont);
    return;
  }

  if (pMsg == NULL || pMsg->type != Release) {
    if (cliAppCb(conn, &transMsg, pMsg) != 0) {
      return;
    }
  }
  destroyCmsg(pMsg);

  if (cliMaySendCachedMsg(conn) == true) {
    return;
  }

  if (CONN_NO_PERSIST_BY_APP(conn)) {
    addConnToPool(pThrd->pool, conn);
  } else {
    uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
  }
}
U
ubuntu 已提交
454

dengyihao's avatar
dengyihao 已提交
455
/*
 * Fail every request queued on `pConn` and drop one reference on the
 * connection.
 *
 * @param pConn  connection that hit the exception
 * @param code   error code reported to the application; -1 selects
 *               TSDB_CODE_RPC_BROKEN_LINK when pConn->broken is set and
 *               TSDB_CODE_RPC_NETWORK_UNAVAIL otherwise
 *
 * The loop body runs at least once even when the queue is empty, so that a
 * connection persisted by the app can still be notified through the handle
 * stored in pConn->ctx. If cliAppCb() returns non-zero the function returns
 * immediately without dropping the reference.
 */
void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
  if (transQueueEmpty(&pConn->cliMsgs) && pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
    tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
    transUnrefCliHandle(pConn);
    return;
  }

  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
  bool      once = false;
  do {
    SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);

    if (pMsg == NULL && once) {
      break;
    }

    if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg)) {
      destroyCmsg(pMsg);
      break;
    }

    STransConnCtx* pCtx = (pMsg != NULL) ? pMsg->ctx : NULL;

    STransMsg transMsg = {0};
    if (code != -1) {
      transMsg.code = code;
    } else if (pConn->broken) {
      transMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
    } else {
      transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    }
    transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
    transMsg.info.ahandle = NULL;

    if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
      // no queued request: fall back to the handles stored in the connection context
      transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
      tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
             TMSG_INFO(transMsg.msgType));
      if (transMsg.info.ahandle == NULL) {
        int32_t msgType = 0;
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, &msgType);
        transMsg.msgType = msgType;
        tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
               transMsg.info.ahandle);
      }
    } else if (pMsg != NULL && pMsg->type != Release && pCtx) {
      transMsg.info.ahandle = pCtx->ahandle;
    } else {
      transMsg.info.ahandle = NULL;
    }

    // nothing to report for this entry: destroy it and look at the next one
    if ((pCtx == NULL || pCtx->pSem == NULL) && transMsg.info.ahandle == NULL &&
        (pMsg == NULL || REQUEST_NO_RESP(&pMsg->msg) || pMsg->type == Release)) {
      destroyCmsg(pMsg);
      once = true;
      continue;
    }

    if (pMsg == NULL || pMsg->type != Release) {
      if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
        return;
      }
    }
    destroyCmsg(pMsg);
    tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));
  } while (!transQueueEmpty(&pConn->cliMsgs));

  transUnrefCliHandle(pConn);
}
dengyihao's avatar
dengyihao 已提交
520
/*
 * Exception entry point without an explicit error code: logs the reference
 * count and forwards to cliHandleExceptImpl() with code -1, which lets the
 * implementation pick the code from conn->broken.
 */
void cliHandleExcept(SCliConn* conn) {
  const int32_t pickCodeFromConn = -1;

  tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
  cliHandleExceptImpl(conn, pickCodeFromConn);
}
dengyihao's avatar
dengyihao 已提交
524

dengyihao's avatar
dengyihao 已提交
525 526
/*
 * Timer callback for a connect attempt that took too long.
 *
 * The timer is stopped, detached from the connection and parked on the
 * thread's timerList; the connection is then failed with UV_ECANCELED through
 * cliHandleFastFail().
 */
void cliConnTimeout(uv_timer_t* handle) {
  SCliConn* pConn = handle->data;
  SCliThrd* pHost = pConn->hostThrd;

  tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));

  uv_timer_stop(handle);
  handle->data = NULL;

  // park the handle on the thread's timer list before clearing the conn's pointer
  taosArrayPush(pHost->timerList, &pConn->timer);
  pConn->timer = NULL;

  cliHandleFastFail(pConn, UV_ECANCELED);
}
dengyihao's avatar
dengyihao 已提交
538 539 540 541
// libuv timer callback armed by cliSend: no response arrived in time.
// Stops reading on the connection's stream and reports TSDB_CODE_RPC_TIMEOUT
// through cliHandleExceptImpl.
void cliReadTimeoutCb(uv_timer_t* handle) {
  SCliConn* pConn = handle->data;

  tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));

  uv_read_stop(pConn->stream);
  cliHandleExceptImpl(pConn, TSDB_CODE_RPC_TIMEOUT);
}
U
ubuntu 已提交
545 546

void* createConnPool(int size) {
547 548
  // thread local, no lock
  return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
549
}
U
ubuntu 已提交
550
void* destroyConnPool(void* pool) {
dengyihao's avatar
dengyihao 已提交
551
  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
dengyihao's avatar
dengyihao 已提交
552
  SCliThrd*  pThrd = connList->pThrd;
dengyihao's avatar
dengyihao 已提交
553
  while (connList != NULL) {
dengyihao's avatar
dengyihao 已提交
554 555
    while (!QUEUE_IS_EMPTY(&connList->conns)) {
      queue*    h = QUEUE_HEAD(&connList->conns);
dengyihao's avatar
dengyihao 已提交
556
      SCliConn* c = QUEUE_DATA(h, SCliConn, q);
U
ubuntu 已提交
557
      cliDestroyConn(c, true);
dengyihao's avatar
dengyihao 已提交
558
    }
dengyihao's avatar
dengyihao 已提交
559 560 561 562 563 564 565 566 567 568 569 570 571 572 573

    SMsgList* msglist = connList->list;
    while (!QUEUE_IS_EMPTY(&msglist->msgQ)) {
      queue* h = QUEUE_HEAD(&msglist->msgQ);
      QUEUE_REMOVE(h);

      SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);

      transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
      pMsg->ctx->task = NULL;

      doNotifyApp(pMsg, pThrd);
    }
    taosMemoryFree(msglist);

dengyihao's avatar
dengyihao 已提交
574
    connList = taosHashIterate((SHashObj*)pool, connList);
dengyihao's avatar
dengyihao 已提交
575
  }
dengyihao's avatar
dengyihao 已提交
576
  taosHashCleanup(pool);
577
  return NULL;
dengyihao's avatar
dengyihao 已提交
578 579
}

dengyihao's avatar
dengyihao 已提交
580 581
// Takes an idle connection for `key` out of the thread's pool.
//
// pThrd  owning client thread (provides the pool hash).
// key    NUL-terminated pool key.
// Returns a connection removed from the pool with status ConnNormal and its
// idle-close task cancelled, or NULL when no pooled connection is available
// (or the pool entry could not be created).
//
// Fixes:
//  - A newly inserted entry used to carry a NULL SMsgList, which was then
//    dereferenced right away (and is dereferenced unconditionally by
//    getConnFromPool2, addConnToPool and cliDestroyConn). The entry is now
//    created with an initialised SMsgList, like getConnFromPool2 does.
//  - With an empty `conns` queue and the connection limit not reached, the
//    code fell through and treated the queue sentinel as an SCliConn,
//    corrupting memory. An empty queue now always yields NULL, which also
//    covers the former "limit reached" case.
//  - The entry records its owning thread, which destroyConnPool reads.
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key) {
  SHashObj* pool = (SHashObj*)pThrd->pool;
  size_t    keyLen = strlen(key);

  SConnList* plist = taosHashGet(pool, key, keyLen);
  if (plist == NULL) {
    SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
    if (nList == NULL) return NULL;
    QUEUE_INIT(&nList->msgQ);

    SConnList list = {0};
    list.list = nList;
    list.pThrd = pThrd;

    taosHashPut(pool, key, keyLen, (void*)&list, sizeof(list));
    plist = taosHashGet(pool, key, keyLen);
    if (plist == NULL) {
      // Insert failed: the hash holds no copy, so nList is still ours.
      taosMemoryFree(nList);
      return NULL;
    }
    // The queue head is self-referential, so it must be initialised on the
    // copy that lives inside the hash, not on the stack template.
    QUEUE_INIT(&plist->conns);
  }

  if (QUEUE_IS_EMPTY(&plist->conns)) {
    return NULL;
  }

  plist->size -= 1;
  queue*    h = QUEUE_HEAD(&plist->conns);
  SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
  conn->status = ConnNormal;
  QUEUE_REMOVE(&conn->q);
  QUEUE_INIT(&conn->q);

  if (conn->task != NULL) {
    transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
    conn->task = NULL;
  }
  return conn;
}

// Takes an idle connection for `key` out of the pool, applying the
// per-destination connection limit (pTransInst->connLimitNum).
//
// pThrd  owning client thread.
// key    NUL-terminated pool key.
// pMsg   in/out: the message the caller wants to send.
//
// Returns a pooled connection when one is idle. Otherwise returns NULL and:
//   - *pMsg == NULL  : the limit is reached; the message was parked in the
//                      wait queue with a timeout task (doFreeTimeoutMsg);
//   - *pMsg != NULL  : the connection counter was incremented. If older
//                      messages were waiting, the caller's message was parked
//                      and *pMsg now points at the oldest waiting message.
//
// Fixes over the previous version:
//  - allocation results (SMsgList, STaskArg) and the hash re-lookup are
//    checked before use;
//  - the new pool entry records its owning thread (read by destroyConnPool);
//  - the task pointer of a message taken off the wait queue is cleared after
//    the task is cancelled, matching addConnToPool/destroyConnPool.
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
  SHashObj* pool = (SHashObj*)pThrd->pool;
  STrans*   pTransInst = pThrd->pTransInst;
  size_t    keyLen = strlen(key);

  SConnList* plist = taosHashGet(pool, key, keyLen);
  if (plist == NULL) {
    SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
    if (nList == NULL) return NULL;
    // Counted up front, before this function returns NULL for a new entry.
    nList->numOfConn++;
    QUEUE_INIT(&nList->msgQ);

    SConnList list = {0};
    list.list = nList;
    list.pThrd = pThrd;

    taosHashPut(pool, key, keyLen, (void*)&list, sizeof(list));

    plist = taosHashGet(pool, key, keyLen);
    if (plist == NULL) {
      taosMemoryFree(nList);
      return NULL;
    }
    QUEUE_INIT(&plist->conns);
    return NULL;
  }

  SMsgList* list = plist->list;

  // no available conn in pool
  if (QUEUE_IS_EMPTY(&plist->conns)) {
    if (list->numOfConn >= pTransInst->connLimitNum) {
      STraceId* trace = &(*pMsg)->msg.info.traceId;

      STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
      if (arg != NULL) {
        arg->param1 = *pMsg;
        arg->param2 = pThrd;
        (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);

        tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));

        QUEUE_PUSH(&list->msgQ, &(*pMsg)->q);
        *pMsg = NULL;
        return NULL;
      }

      // Cannot park the message without a timeout task. Hand it back to the
      // caller and account for one more connection rather than crash.
      tError("%s failed to delay msg, out of memory", pTransInst->label);
      list->numOfConn++;
      return NULL;
    }

    // Below the limit: serve the oldest waiting message first, if any.
    if (!QUEUE_IS_EMPTY(&list->msgQ)) {
      STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
      if (arg != NULL) {
        arg->param1 = *pMsg;
        arg->param2 = pThrd;
        (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);

        QUEUE_PUSH(&list->msgQ, &(*pMsg)->q);
        queue* h = QUEUE_HEAD(&list->msgQ);
        QUEUE_REMOVE(h);
        SCliMsg* ans = QUEUE_DATA(h, SCliMsg, q);

        *pMsg = ans;
        transDQCancel(pThrd->waitConnQueue, ans->ctx->task);
        ans->ctx->task = NULL;
      }
      // On allocation failure the caller's own message is sent directly and
      // the waiting messages stay queued.
    }
    list->numOfConn++;
    return NULL;
  }

  plist->size -= 1;
  queue*    h = QUEUE_HEAD(&plist->conns);
  SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
  conn->status = ConnNormal;
  QUEUE_REMOVE(&conn->q);
  QUEUE_INIT(&conn->q);

  if (conn->task != NULL) {
    transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
    conn->task = NULL;
  }
  return conn;
}
dengyihao's avatar
dengyihao 已提交
679 680 681 682 683 684
// Returns a connection to the pool after it finished its work.
//
// pool  the thread's pool hash (used to look up the connection's SConnList
//       by conn->ip when conn->list is not cached yet).
// conn  connection to release; a connection already in the pool is ignored.
//
// Steps: refresh the connection's ref id, give its timer back to the thread's
// timerList, drop one extra reference, discard its messages. Then:
//   - if a message is waiting for a connection to this destination, attach it
//     to this connection and send it right away (the connection stays
//     ConnNormal and is not pooled);
//   - otherwise mark the connection ConnInPool and queue it. Once the list
//     holds IDLE_CLOSE_POOL_SIZE or more connections, an idle-close task
//     (doCloseIdleConn) is scheduled for it.
//
// Fix: the idle-close task argument allocation is now checked; on failure the
// connection simply stays pooled without an idle-close task.
//
// NOTE(review): the hash lookup result and pList->list are dereferenced
// without a NULL check; confirm the entry always exists at this point.
static void addConnToPool(void* pool, SCliConn* conn) {
  enum { IDLE_CLOSE_POOL_SIZE = 250 };

  if (conn->status == ConnInPool) {
    return;
  }
  allocConnRef(conn, true);

  SCliThrd* thrd = conn->hostThrd;
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
    taosArrayPush(thrd->timerList, &conn->timer);
    conn->timer->data = NULL;
    conn->timer = NULL;
  }
  if (T_REF_VAL_GET(conn) > 1) {
    transUnrefCliHandle(conn);
  }

  cliDestroyConnMsgs(conn, false);

  if (conn->list == NULL) {
    conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
  }

  SConnList* pList = conn->list;
  SMsgList*  msgList = pList->list;
  if (!QUEUE_IS_EMPTY(&msgList->msgQ)) {
    // Somebody is waiting for a connection: reuse this one immediately.
    queue* h = QUEUE_HEAD(&msgList->msgQ);
    QUEUE_REMOVE(h);

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);

    transDQCancel(thrd->waitConnQueue, pMsg->ctx->task);
    pMsg->ctx->task = NULL;

    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
    transQueuePush(&conn->cliMsgs, pMsg);

    conn->status = ConnNormal;
    cliSend(conn);
    return;
  }

  conn->status = ConnInPool;
  QUEUE_PUSH(&pList->conns, &conn->q);
  pList->size += 1;

  if (pList->size >= IDLE_CLOSE_POOL_SIZE) {
    STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
    if (arg == NULL) {
      tWarn("%s conn %p failed to schedule idle close, out of memory", CONN_GET_INST_LABEL(conn), conn);
      return;
    }
    arg->param1 = conn;
    arg->param2 = thrd;

    STrans* pTransInst = thrd->pTransInst;
    conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
  }
}
dengyihao's avatar
dengyihao 已提交
734
// Registers a fresh SExHandle for `conn` with the ref manager and stores the
// resulting id in conn->refId.
//
// conn    connection to register.
// update  when true, the connection's current ref id is released and removed
//         first.
// Returns 0, or -1 when the handle could not be allocated. conn->refId is -1
// whenever no handle is registered (allocation failed or transAddExHandle
// returned -1).
//
// Fix: the SExHandle allocation is checked before it is written to.
// The return value for a failed transAddExHandle is left at 0, as before.
static int32_t allocConnRef(SCliConn* conn, bool update) {
  if (update) {
    transReleaseExHandle(transGetRefMgt(), conn->refId);
    transRemoveExHandle(transGetRefMgt(), conn->refId);
    conn->refId = -1;
  }

  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  if (exh == NULL) {
    conn->refId = -1;
    return -1;
  }
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
  conn->refId = exh->refId;

  if (conn->refId == -1) {
    // Not registered, so the ref manager does not own exh.
    taosMemoryFree(exh);
  }
  return 0;
}

// Binds `conn` to an already existing ex-handle identified by `handle`.
//
// conn    connection to bind.
// update  when true, the connection's current ref id is released and removed
//         first.
// handle  id of the ex-handle to take over.
// Returns 0 on success, -1 when the handle cannot be acquired (conn->refId is
// then left at -1 if `update` was set, untouched otherwise).
static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
  if (update) {
    transReleaseExHandle(transGetRefMgt(), conn->refId);
    transRemoveExHandle(transGetRefMgt(), conn->refId);
    conn->refId = -1;
  }

  SExHandle* pHandle = transAcquireExHandle(transGetRefMgt(), handle);
  if (pHandle != NULL) {
    pHandle->handle = conn;
    pHandle->pThrd = conn->hostThrd;
    conn->refId = pHandle->refId;

    transReleaseExHandle(transGetRefMgt(), handle);
    return 0;
  }
  return -1;
}
dengyihao's avatar
dengyihao 已提交
769

dengyihao's avatar
dengyihao 已提交
770
// libuv alloc callback passed to uv_read_start: fills `buf` from the
// connection's read buffer via transAllocBuffer. `suggested_size` is not
// used.
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SCliConn* pConn = handle->data;

  tDebug("%s conn %p alloc read buf", CONN_GET_INST_LABEL(pConn), pConn);
  transAllocBuffer(&pConn->readBuf, buf);
}
U
ubuntu 已提交
776
// libuv read callback for a client connection.
//
//   nread > 0  : account for the new bytes and handle every complete packet
//                now in the read buffer; an invalid buffer aborts the
//                connection.
//   nread == 0 : nothing to do. Per the libuv docs this is not an error or
//                EOF; it is equivalent to EAGAIN/EWOULDBLOCK under read(2).
//                ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
//   nread < 0  : read error; mark the connection broken and fail it.
//
// A handle without an attached connection is ignored.
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
  SCliConn* conn = handle->data;
  if (conn == NULL) {
    return;
  }
  SConnBuffer* pBuf = &conn->readBuf;

  if (nread == 0) {
    tTrace("%s conn %p read empty", CONN_GET_INST_LABEL(conn), conn);
    return;
  }

  if (nread < 0) {
    tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), T_REF_VAL_GET(conn));
    conn->broken = true;
    cliHandleExcept(conn);
    return;
  }

  pBuf->len += nread;
  while (transReadComplete(pBuf)) {
    tDebug("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
    if (pBuf->invalid) {
      cliHandleExcept(conn);
      break;
    }
    cliHandleResp(conn);
  }
}
dengyihao's avatar
dengyihao 已提交
810

dengyihao's avatar
dengyihao 已提交
811
// Allocates and initialises a client connection owned by `pThrd`.
//
// The connection gets a TCP stream handle and a timer (taken from the
// thread's timerList when one is available, otherwise newly created), empty
// request/message queues, an initialised read buffer, status ConnNormal, one
// reference, and a registered ref id. The thread's connCount is incremented.
//
// Returns the connection, or NULL when memory cannot be allocated.
//
// Fix: the three allocations are now checked. All memory is obtained before
// the stream handle is registered with the loop, so a failure can be undone
// with plain frees. The timer is therefore acquired before uv_tcp_init.
// NOTE(review): callers must be prepared for a NULL result.
static SCliConn* cliCreateConn(SCliThrd* pThrd) {
  SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn));
  if (conn == NULL) {
    tError("failed to create conn, out of memory");
    return NULL;
  }

  // read/write stream handle
  conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
  if (conn->stream == NULL) {
    tError("failed to create conn stream, out of memory");
    taosMemoryFree(conn);
    return NULL;
  }

  uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
  if (timer == NULL) {
    timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
    if (timer == NULL) {
      tError("failed to create conn timer, out of memory");
      taosMemoryFree(conn->stream);
      taosMemoryFree(conn);
      return NULL;
    }
    tDebug("no available timer, create a timer %p", timer);
    uv_timer_init(pThrd->loop, timer);
  }

  uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
  conn->stream->data = conn;

  timer->data = conn;
  conn->timer = timer;

  conn->connReq.data = conn;
  transReqQueueInit(&conn->wreqQueue);

  transQueueInit(&conn->cliMsgs, NULL);

  transInitBuffer(&conn->readBuf);
  QUEUE_INIT(&conn->q);
  conn->hostThrd = pThrd;
  conn->status = ConnNormal;
  conn->broken = false;
  transRefCliHandle(conn);

  atomic_add_fetch_32(&pThrd->connCount, 1);
  allocConnRef(conn, false);

  return conn;
}
U
ubuntu 已提交
844
// Detaches a connection from the pool bookkeeping and optionally closes it.
//
// conn   connection to destroy.
// clear  when true, reading is stopped and the stream handle is closed
//        (unless it is already closing); cliDestroy then frees the
//        connection from the close callback.
//
// Always: unlinks the connection from its queue, decrements the connection
// counter of its destination, drops its ref id, cancels its idle-close task
// and stores its timer in the thread's timerList.
//
// Fix: the destination's SConnList used to be dereferenced unconditionally.
// The lookup is now skipped when conn->ip is not set, and the counter is only
// touched when both the entry and its SMsgList exist.
static void cliDestroyConn(SCliConn* conn, bool clear) {
  SCliThrd* pThrd = conn->hostThrd;
  tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);

  QUEUE_REMOVE(&conn->q);
  QUEUE_INIT(&conn->q);

  SConnList* connList = conn->list;
  if (connList == NULL && conn->ip != NULL) {
    connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip));
  }
  if (connList != NULL && connList->list != NULL) {
    connList->list->numOfConn--;
  }
  conn->list = NULL;

  transReleaseExHandle(transGetRefMgt(), conn->refId);
  transRemoveExHandle(transGetRefMgt(), conn->refId);
  conn->refId = -1;

  if (conn->task != NULL) {
    transDQCancel(pThrd->timeoutQueue, conn->task);
    conn->task = NULL;
  }
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
    conn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &conn->timer);
    conn->timer = NULL;
  }

  if (clear) {
    if (!uv_is_closing((uv_handle_t*)conn->stream)) {
      uv_read_stop(conn->stream);
      uv_close((uv_handle_t*)conn->stream, cliDestroy);
    }
  }
}
U
ubuntu 已提交
882
// libuv close callback for a connection's TCP handle: releases everything the
// connection still owns and frees it.
//
// Handles that are not TCP handles, or that have no connection attached, are
// ignored.
static void cliDestroy(uv_handle_t* handle) {
  if (uv_handle_get_type(handle) != UV_TCP) {
    return;
  }
  if (handle->data == NULL) {
    return;
  }

  SCliConn* pConn = handle->data;
  SCliThrd* pOwner = pConn->hostThrd;

  // Give a still-attached timer back to the thread's timerList.
  if (pConn->timer != NULL) {
    uv_timer_stop(pConn->timer);
    taosArrayPush(pOwner->timerList, &pConn->timer);
    pConn->timer->data = NULL;
    pConn->timer = NULL;
  }

  atomic_sub_fetch_32(&pOwner->connCount, 1);

  transReleaseExHandle(transGetRefMgt(), pConn->refId);
  transRemoveExHandle(transGetRefMgt(), pConn->refId);
  taosMemoryFree(pConn->ip);
  taosMemoryFree(pConn->stream);

  cliDestroyConnMsgs(pConn, true);

  tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(pConn), pConn);
  transReqQueueClear(&pConn->wreqQueue);
  transDestroyBuffer(&pConn->readBuf);

  taosMemoryFree(pConn);
}
dengyihao's avatar
dengyihao 已提交
910
// Called after a successful write: completes the head message when it does
// not expect a response.
//
// If the head of conn->cliMsgs is a no-response request, it is popped and
// destroyed. Then, if cliMaySendCachedMsg reports that another message was
// sent, true is returned; otherwise the connection goes back to the pool and
// false is returned.
//
// Returns false as well when the queue is empty or the head message expects a
// response.
static bool cliHandleNoResp(SCliConn* conn) {
  if (transQueueEmpty(&conn->cliMsgs)) {
    return false;
  }

  SCliMsg* pHeadMsg = transQueueGet(&conn->cliMsgs, 0);
  if (!REQUEST_NO_RESP(&pHeadMsg->msg)) {
    return false;
  }

  transQueuePop(&conn->cliMsgs);
  destroyCmsg(pHeadMsg);

  if (cliMaySendCachedMsg(conn)) {
    return true;
  }

  SCliThrd* pOwner = conn->hostThrd;
  addConnToPool(pOwner->pool, conn);
  return false;
}
U
ubuntu 已提交
931
// libuv write callback for cliSend.
//
// req     the write request; transReqQueueRemove maps it back to its
//         connection (NULL means there is nothing to do).
// status  0 on success, a libuv error code otherwise.
//
// Warns when the head message took more than 20 ms from pMsg->st to now. On a
// write error the connection is failed through cliHandleExcept unless its
// stream is already closing. On success, a no-response request is completed
// via cliHandleNoResp; otherwise reading is started to wait for the response.
//
// Fix: uv_is_closing was given the address of the `stream` pointer field
// instead of the stream handle itself, so it inspected unrelated memory.
static void cliSendCb(uv_write_t* req, int status) {
  SCliConn* pConn = transReqQueueRemove(req);
  if (pConn == NULL) return;

  SCliMsg* pMsg = !transQueueEmpty(&pConn->cliMsgs) ? transQueueGet(&pConn->cliMsgs, 0) : NULL;
  if (pMsg != NULL) {
    int64_t cost = taosGetTimestampUs() - pMsg->st;
    if (cost > 1000 * 20) {
      tWarn("%s conn %p send cost:%dus, send exception", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
    }
  }

  if (status != 0) {
    if (!uv_is_closing((uv_handle_t*)pConn->stream)) {
      tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
      cliHandleExcept(pConn);
    }
    return;
  }

  tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);

  if (cliHandleNoResp(pConn) == true) {
    tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
    return;
  }
  uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
dengyihao's avatar
dengyihao 已提交
958 959 960 961
// Sends every message of pConn->pBatch with a single uv_write.
//
// For each queued message the transport header is filled in (unless the
// message is already compressed), the timestamp is refreshed, the payload is
// compressed when it exceeds pTransInst->compressSize, and one uv_buf_t is
// appended. Completion is reported to cliSendBatchCb.
//
// Fixes:
//  - the write request and buffer array allocations are checked;
//  - the buffer index is bounded by pBatch->wLen so a queue longer than the
//    recorded length cannot write past the array;
//  - a synchronous uv_write failure is no longer ignored. libuv does not
//    invoke the callback in that case, so it is called here with the error,
//    which releases the request and completes the batch.
void cliSendBatch(SCliConn* pConn) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  SCliBatch*     pBatch = pConn->pBatch;
  SCliBatchList* pList = pBatch->pList;

  uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
  if (req == NULL) {
    // Without a request object the completion path cannot be driven.
    // NOTE(review): the batch stays attached to the connection here.
    tError("%s conn %p failed to send batch msg, out of memory", CONN_GET_INST_LABEL(pConn), pConn);
    return;
  }
  req->data = pConn;

  // cliSendBatchCb decrements this again.
  pList->connCnt += 1;

  int32_t   wLen = pBatch->wLen;
  uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
  if (wb == NULL) {
    tError("%s conn %p failed to send batch msg, out of memory", CONN_GET_INST_LABEL(pConn), pConn);
    cliSendBatchCb(req, UV_ENOMEM);
    return;
  }

  int    i = 0;
  queue* h = NULL;
  QUEUE_FOREACH(h, &pBatch->wq) {
    if (i >= wLen) {
      tError("%s conn %p batch holds more msgs than expected:%d", CONN_GET_INST_LABEL(pConn), pConn, wLen);
      break;
    }
    SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q);

    STransConnCtx* pCtx = pCliMsg->ctx;

    STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
    if (pMsg->pCont == 0) {
      pMsg->pCont = (void*)rpcMallocCont(0);
      pMsg->contLen = 0;
    }

    int            msgLen = transMsgLenFromCont(pMsg->contLen);
    STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);

    if (pHead->comp == 0) {
      pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
      pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
      pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
      pHead->msgType = pMsg->msgType;
      pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
      pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
      memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
      pHead->traceId = pMsg->info.traceId;
      pHead->magicNum = htonl(TRANS_MAGIC_NUM);
    }
    pHead->timestamp = taosHton64(taosGetTimestampUs());

    if (pHead->comp == 0) {
      if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
        msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
        pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
      }
    } else {
      // Already compressed: the header carries the on-wire length.
      msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
    }
    wb[i++] = uv_buf_init((char*)pHead, msgLen);
  }

  tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn,
         pBatch->wLen, pBatch->batchSize);

  int ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb);
  taosMemoryFree(wb);

  if (ret != 0) {
    tError("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
    cliSendBatchCb(req, ret);
  }
}
U
ubuntu 已提交
1017
// Sends the next unsent message queued on `pConn`.
//
// The message is chosen by CONN_GET_NEXT_SENDMSG and marked as sent. Its
// transport header is filled in (unless the message is already compressed),
// the timestamp is refreshed, and the payload is compressed when it exceeds
// pTransInst->compressSize. When the instance's startTimer hook asks for it,
// a read-timeout timer (cliReadTimeoutCb, TRANS_READ_TIMEOUT) is armed.
// Completion is reported to cliSendCb; a synchronous write error fails the
// connection through cliHandleExcept. An empty message queue is treated as an
// error as well.
//
// Fixes:
//  - a timer the connection already holds is reused instead of being
//    overwritten, so it is not orphaned while still armed;
//  - the timer allocation is checked; when it fails the message is sent
//    without a read timeout.
void cliSend(SCliConn* pConn) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  if (transQueueEmpty(&pConn->cliMsgs)) {
    tError("%s conn %p not msg to send", pTransInst->label, pConn);
    cliHandleExcept(pConn);
    return;
  }

  // The macro assigns pCliMsg; the _RETURN label below is kept for it.
  SCliMsg* pCliMsg = NULL;
  CONN_GET_NEXT_SENDMSG(pConn);
  pCliMsg->sent = 1;

  STransConnCtx* pCtx = pCliMsg->ctx;

  STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }

  int            msgLen = transMsgLenFromCont(pMsg->contLen);
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);

  if (pHead->comp == 0) {
    pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
    pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
    pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
    pHead->msgType = pMsg->msgType;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
    memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
    pHead->traceId = pMsg->info.traceId;
    pHead->magicNum = htonl(TRANS_MAGIC_NUM);
  }
  pHead->timestamp = taosHton64(taosGetTimestampUs());

  if (pHead->persist == 1) {
    CONN_SET_PERSIST_BY_APP(pConn);
  }

  // Kept as a named local for the tG* logging macros below.
  STraceId* trace = &pMsg->info.traceId;

  if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
    uv_timer_t* timer = pConn->timer;
    if (timer == NULL && taosArrayGetSize(pThrd->timerList) > 0) {
      timer = *(uv_timer_t**)taosArrayPop(pThrd->timerList);
    }
    if (timer == NULL) {
      timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
      if (timer != NULL) {
        tDebug("no available timer, create a timer %p", timer);
        uv_timer_init(pThrd->loop, timer);
      }
    }

    if (timer != NULL) {
      timer->data = pConn;
      pConn->timer = timer;

      tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
      uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
    } else {
      tGError("%s conn %p failed to create timer for msg:%s, out of memory", CONN_GET_INST_LABEL(pConn), pConn,
              TMSG_INFO(pMsg->msgType));
    }
  }

  if (pHead->comp == 0) {
    if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
      msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
      pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    }
  } else {
    // Already compressed: the header carries the on-wire length.
    msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
  }

  tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
          TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);

  uv_buf_t    wb = uv_buf_init((char*)pHead, msgLen);
  uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);

  int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
  if (status != 0) {
    tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType),
            uv_err_name(status));
    cliHandleExcept(pConn);
  }
  return;
_RETURN:
  return;
}
dengyihao's avatar
dengyihao 已提交
1100 1101

// Releases a batch: destroys every message still queued on it, decrements the
// `sending` counter of the batch list it belongs to, and frees the batch.
// A NULL batch is ignored.
static void cliDestroyBatch(SCliBatch* pBatch) {
  if (pBatch == NULL) {
    return;
  }

  for (;;) {
    if (QUEUE_IS_EMPTY(&pBatch->wq)) {
      break;
    }
    queue* node = QUEUE_HEAD(&pBatch->wq);
    QUEUE_REMOVE(node);

    SCliMsg* pQueued = QUEUE_DATA(node, SCliMsg, q);
    destroyCmsg(pQueued);
  }

  SCliBatchList* pOwnerList = pBatch->pList;
  pOwnerList->sending -= 1;

  taosMemoryFree(pBatch);
}
dengyihao's avatar
dengyihao 已提交
1114
// Dispatches one batch of messages on the given client thread.
//
// A batch is destroyed without being sent when the thread is quitting or when
// no pooled connection can be obtained for the destination. A NULL or empty
// batch is ignored. Otherwise the batch is attached to the pooled connection
// and handed to cliSendBatch.
//
// NOTE(review): the "create a new connection and connect" branch below can
// never run, because the preceding check already returns when
// getConnFromPool yields NULL. It is kept as written; confirm which of the
// two NULL-handling branches is intended.
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
  if (pThrd->quit == true) {
    cliDestroyBatch(pBatch);
    return;
  }

  if (pBatch == NULL) {
    return;
  }
  if (pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
    return;
  }

  STrans*        pTransInst = pThrd->pTransInst;
  SCliBatchList* pList = pBatch->pList;

  char key[TSDB_FQDN_LEN + 64] = {0};
  CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port);

  SCliConn* conn = getConnFromPool(pThrd, key);

  if (conn == NULL) {
    tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen,
           pBatch->batchSize);
    cliDestroyBatch(pBatch);
    return;
  }

  if (conn != NULL) {
    // Pooled connection available: send right away.
    conn->pBatch = pBatch;
    cliSendBatch(conn);
    return;
  }

  // ---- unreachable, see NOTE(review) above ----
  conn = cliCreateConn(pThrd);
  conn->pBatch = pBatch;
  conn->ip = strdup(pList->dst);

  uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip);
  if (ipaddr == 0xffffffff) {
    // Address could not be resolved: release the timer, then fail fast.
    uv_timer_stop(conn->timer);
    conn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &conn->timer);
    conn->timer = NULL;

    cliHandleFastFail(conn, -1);
    return;
  }

  struct sockaddr_in addr;
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = ipaddr;
  addr.sin_port = (uint16_t)htons(pList->port);

  tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst);

  int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
  if (fd == -1) {
    tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
           tstrerror(TAOS_SYSTEM_ERROR(errno)));
    cliHandleFastFail(conn, -1);
    return;
  }

  int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
  if (ret != 0) {
    tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
    cliHandleFastFail(conn, -1);
    return;
  }

  ret = transSetConnOption((uv_tcp_t*)conn->stream);
  if (ret != 0) {
    tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
    cliHandleFastFail(conn, -1);
    return;
  }

  ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
  if (ret != 0) {
    uv_timer_stop(conn->timer);
    conn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &conn->timer);
    conn->timer = NULL;

    cliHandleFastFail(conn, -1);
    return;
  }

  uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
}
/**
 * libuv write-completion callback for a batch written on a connection.
 *
 * Detaches the finished batch from the connection, fetches the next queued
 * batch (if any) from the same batch list, and then either keeps sending on
 * this connection, hands the connection to addConnToPool, or reports the
 * failure through cliHandleExcept. The finished batch and the write request
 * are always released before returning.
 *
 * @param req    write request; req->data is the owning SCliConn.
 * @param status 0 on success, otherwise a libuv error code.
 */
static void cliSendBatchCb(uv_write_t* req, int status) {
  SCliConn*  conn = req->data;
  SCliThrd*  thrd = conn->hostThrd;
  SCliBatch* p = conn->pBatch;

  SCliBatchList* pBatchList = p->pList;
  SCliBatch*     nxtBatch = cliGetHeadFromList(pBatchList);
  pBatchList->connCnt -= 1;

  conn->pBatch = NULL;

  if (status != 0) {
    tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
           p->wLen, p->batchSize, uv_err_name(status));

    // conn->stream is itself the handle pointer (it is cast to uv_tcp_t* elsewhere in this file);
    // passing &conn->stream would make libuv inspect the pointer field instead of the handle.
    if (!uv_is_closing((uv_handle_t*)conn->stream)) cliHandleExcept(conn);

    cliHandleBatchReq(nxtBatch, thrd);
  } else {
    tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
           p->batchSize);
    if (!uv_is_closing((uv_handle_t*)conn->stream)) {
      if (nxtBatch != NULL) {
        // Reuse the connection for the next pending batch.
        conn->pBatch = nxtBatch;
        cliSendBatch(conn);
      } else {
        addConnToPool(thrd->pool, conn);
      }
    } else {
      cliDestroyBatch(nxtBatch);
      // conn is released by another callback
    }
  }

  cliDestroyBatch(p);
  taosMemoryFree(req);
}
dengyihao's avatar
dengyihao 已提交
1231
/**
 * Handles a connection that failed before any data could be sent.
 *
 * For a non-batch connection the head message is logged and, when the
 * instance's failFastFp accepts its message type and the request expects no
 * response, a per-address failure counter in pThrd->failFastCache is updated.
 * For a batch connection the pending batch is destroyed. In both cases the
 * connection is finally passed to cliHandleExcept.
 *
 * @param pConn  failed connection.
 * @param status libuv error code, or -1 when the caller has no specific code.
 */
static void cliHandleFastFail(SCliConn* pConn, int status) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  // uv_strerror expects libuv (UV_*) codes; a raw positive errno is reported as an unknown error.
  if (status == -1) status = UV_ENETUNREACH;

  if (pConn->pBatch == NULL) {
    SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);

    if (pMsg == NULL) {
      // Nothing queued on this connection: log without touching the message.
      tError("%s conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), pConn, pConn->ip,
             uv_strerror(status));
    } else {
      STraceId* trace = &pMsg->msg.info.traceId;
      tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
              TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status));

      if (REQUEST_NO_RESP(&pMsg->msg) &&
          (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
        SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip));
        int64_t        cTimestamp = taosGetTimestampMs();
        if (item != NULL) {
          // Keep the difference in 64 bits so a large gap cannot wrap into the valid window.
          int64_t elapse = cTimestamp - item->timestamp;
          if (elapse >= 0 && elapse <= pTransInst->failFastInterval) {
            item->count++;
          } else {
            // Outside the window: start a new counting period.
            item->count = 1;
            item->timestamp = cTimestamp;
          }
        } else {
          SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
          taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem));
        }
      }
    }
  } else {
    tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
           pConn, pConn->ip, uv_strerror(status));
    cliDestroyBatch(pConn->pBatch);
    pConn->pBatch = NULL;
  }
  cliHandleExcept(pConn);
}
dengyihao's avatar
dengyihao 已提交
1269

dengyihao's avatar
dengyihao 已提交
1270 1271 1272
/**
 * libuv connect callback.
 *
 * Stops and recycles the connection timer (if it is still attached), reports
 * a failed connect through cliHandleFastFail, and on success records the peer
 * and local addresses before sending whatever is pending on the connection.
 *
 * @param req    connect request; req->data is the SCliConn.
 * @param status 0 on success, otherwise a libuv error code.
 */
void cliConnCb(uv_connect_t* req, int status) {
  SCliConn* pConn = req->data;
  SCliThrd* pThrd = pConn->hostThrd;

  // A missing timer is treated as "the timeout path already handled this connection".
  bool timeout = (pConn->timer == NULL);
  if (!timeout) {
    uv_timer_stop(pConn->timer);
    pConn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &pConn->timer);
    pConn->timer = NULL;
  }

  if (status != 0) {
    if (!timeout) {
      cliHandleFastFail(pConn, status);
    }
    // otherwise: already dealt with by the timeout
    return;
  }

  struct sockaddr peername, sockname;
  int             addrlen = sizeof(peername);

  uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
  transSockInfo2Str(&peername, pConn->dst);

  addrlen = sizeof(sockname);
  uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
  transSockInfo2Str(&sockname, pConn->src);

  tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);

  if (pConn->pBatch == NULL) {
    cliSend(pConn);
  } else {
    cliSendBatch(pConn);
  }
}

dengyihao's avatar
dengyihao 已提交
1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332
/**
 * Builds an empty TSDB_CODE_RPC_MAX_SESSIONS response for pMsg and delivers it:
 * copied into pCtx->pRsp when the context carries a semaphore (and a response
 * buffer), otherwise passed to the instance callback. pMsg is destroyed afterwards.
 */
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
  STransConnCtx* pCtx = pMsg->ctx;
  STrans*        pTransInst = pThrd->pTransInst;

  // Zero-initialisation already leaves contLen == 0 and pCont == NULL.
  STransMsg rsp = {0};
  rsp.code = TSDB_CODE_RPC_MAX_SESSIONS;
  rsp.msgType = pMsg->msg.msgType + 1;
  rsp.info.ahandle = pCtx->ahandle;
  rsp.info.traceId = pMsg->msg.info.traceId;
  rsp.info.hasEpSet = false;

  if (pCtx->pSem == NULL) {
    pTransInst->cfp(pTransInst->parent, &rsp, NULL);
  } else if (pCtx->pRsp != NULL) {
    memcpy((char*)pCtx->pRsp, (char*)&rsp, sizeof(rsp));
  }

  destroyCmsg(pMsg);
}
dengyihao's avatar
dengyihao 已提交
1333
/**
 * Processes a Quit message. While the async pool still holds work the message
 * is parked in pThrd->stopMsg and nothing else happens; otherwise the thread
 * is flagged as quitting, the message and connection pool are destroyed, and
 * every handle of the loop is visited with cliWalkCb.
 */
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
  bool idle = transAsyncPoolIsEmpty(pThrd->asyncPool);

  // Park the quit message until the pool has drained.
  pThrd->stopMsg = idle ? NULL : pMsg;
  if (!idle) {
    return;
  }

  pThrd->quit = true;
  tDebug("cli work thread %p start to quit", pThrd);
  destroyCmsg(pMsg);

  destroyConnPool(pThrd->pool);
  uv_walk(pThrd->loop, cliWalkCb, NULL);
}
dengyihao's avatar
dengyihao 已提交
1346
/**
 * Processes a Release message for the connection bound to the message's
 * ref id (pMsg->msg.info.handle).
 *
 * If the ref id cannot be acquired, or no connection is attached to it, the
 * message is dropped. When the connection's reference value is exactly 2, one
 * reference is dropped and the release message is queued and sent on the
 * connection; any other value is treated as "already released".
 */
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
  int64_t    refId = (int64_t)(pMsg->msg.info.handle);
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
  if (exh == NULL) {
    tDebug("%" PRId64 " already released", refId);
    destroyCmsg(pMsg);
    return;
  }

  SCliConn* conn = exh->handle;
  transReleaseExHandle(transGetRefMgt(), refId);

  // An ex-handle may exist without a connection attached (cliGetConn handles that case too);
  // there is nothing to release then, and the macros below would dereference NULL.
  if (conn == NULL) {
    tDebug("%" PRId64 " has no conn to release", refId);
    destroyCmsg(pMsg);
    return;
  }

  tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);

  if (T_REF_VAL_GET(conn) == 2) {
    transUnrefCliHandle(conn);
    if (!transQueuePush(&conn->cliMsgs, pMsg)) {
      return;
    }
    cliSend(conn);
  } else {
    tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
    destroyCmsg(pMsg);
  }
}
dengyihao's avatar
dengyihao 已提交
1370
/** Copies the address-conversion settings carried by the message context into the thread, then frees the message. */
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
  pThrd->cvtAddr = pMsg->ctx->cvtAddr;
  destroyCmsg(pMsg);
}
U
ubuntu 已提交
1375

dengyihao's avatar
dengyihao 已提交
1376 1377
/**
 * Finds a connection for *pMsg.
 *
 * Without a ref id the connection comes straight from the pool. With a ref id
 * the connection attached to the ex-handle is used; if none is attached yet,
 * one is taken from the pool and bound to the ref id.
 *
 * @param pMsg   in/out message pointer, forwarded to getConnFromPool2.
 * @param pThrd  owning client thread.
 * @param ignore set to true when the ref id can no longer be acquired.
 * @param addr   destination key used for the pool lookup.
 * @return the connection, or NULL when none is available.
 */
SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
  int64_t refId = (int64_t)((*pMsg)->msg.info.handle);

  if (refId == 0) {
    SCliConn* pooled = getConnFromPool2(pThrd, addr, pMsg);
    if (pooled == NULL) {
      tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
    } else {
      tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(pooled), pooled, pThrd->pool);
    }
    return pooled;
  }

  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
  if (exh == NULL) {
    tError("failed to get conn, refId: %" PRId64 "", refId);
    *ignore = true;
    return NULL;
  }

  SCliConn* bound = exh->handle;
  if (bound == NULL) {
    // First use of this ref id: pick a pooled connection and bind it.
    bound = getConnFromPool2(pThrd, addr, pMsg);
    if (bound != NULL) specifyConnRef(bound, true, refId);
  }
  transReleaseExHandle(transGetRefMgt(), refId);
  return bound;
}
dengyihao's avatar
dengyihao 已提交
1406
/**
 * When conversion is enabled and the endpoint set holds exactly one endpoint
 * whose fqdn equals pCvtAddr->fqdn, overwrites that fqdn with pCvtAddr->ip.
 */
FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
  if (!pCvtAddr->cvt) return;
  if (pEpSet->numOfEps != 1) return;

  char* epFqdn = pEpSet->eps[0].fqdn;
  if (strncmp(epFqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) != 0) return;

  memset(epFqdn, 0, TSDB_FQDN_LEN);
  memcpy(epFqdn, pCvtAddr->ip, TSDB_FQDN_LEN);
}
dengyihao's avatar
dengyihao 已提交
1415

dengyihao's avatar
dengyihao 已提交
1416
/** Returns true only when code is 0 and the context's current epSet differs from its original epSet. */
FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
  return code == 0 && !transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet);
}
dengyihao's avatar
dengyihao 已提交
1422
/**
 * Fills pResp with a TSDB_CODE_RPC_BROKEN_LINK response for pMsg.
 *
 * @return 0 on success, -1 when pMsg is NULL (pResp is left untouched).
 */
FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
  if (pMsg == NULL) {
    return -1;
  }

  memset(pResp, 0, sizeof(STransMsg));

  pResp->code = TSDB_CODE_RPC_BROKEN_LINK;
  pResp->msgType = pMsg->msg.msgType + 1;
  pResp->info.traceId = pMsg->msg.info.traceId;

  // A message without a context carries no application handle.
  if (pMsg->ctx != NULL) {
    pResp->info.ahandle = pMsg->ctx->ahandle;
  } else {
    pResp->info.ahandle = NULL;
  }

  return 0;
}
dengyihao's avatar
dengyihao 已提交
1434 1435 1436 1437 1438
/**
 * Resolves fqdn to an IPv4 address, consulting cache first and storing any
 * newly resolved address in it.
 *
 * @return the address, or 0xffffffff when resolution fails (terrno is set and
 *         nothing is cached).
 */
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
  uint32_t* cached = taosHashGet(cache, fqdn, strlen(fqdn));
  if (cached != NULL) {
    return *cached;
  }

  uint32_t addr = taosGetIpv4FromFqdn(fqdn);
  if (addr == 0xffffffff) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr());
    return addr;
  }

  taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
  return addr;
}
/** Placeholder for refreshing a cached fqdn entry; intentionally does nothing yet. */
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
  (void)cache;
  (void)fqdn;
}
dengyihao's avatar
dengyihao 已提交
1455

dengyihao's avatar
dengyihao 已提交
1456 1457 1458 1459 1460
static void doFreeTimeoutMsg(void* param) {
  STaskArg* arg = param;
  SCliMsg*  pMsg = arg->param1;
  SCliThrd* pThrd = arg->param2;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1461

dengyihao's avatar
dengyihao 已提交
1462 1463 1464 1465 1466 1467
  QUEUE_REMOVE(&pMsg->q);
  STraceId* trace = &pMsg->msg.info.traceId;
  tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
  doNotifyApp(pMsg, pThrd);
  taosMemoryFree(arg);
}
dengyihao's avatar
dengyihao 已提交
1468
/**
 * Dispatches one request message on the client thread.
 *
 * Steps: optionally rewrite the endpoint fqdn, validate the endpoint set,
 * look up a connection (cliGetConn). An existing connection gets the message
 * queued and sent immediately. Otherwise a new connection is created, the
 * target is resolved through the fqdn cache, a socket is opened and an
 * asynchronous connect is started with a connect timer; the message is sent
 * later from cliConnCb.
 *
 * @param pMsg  request to send; ownership passes to this function.
 * @param pThrd client thread handling the request.
 */
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
  STrans*   pTransInst = pThrd->pTransInst;
  STraceId* trace = &pMsg->msg.info.traceId;  // used by the tG* logging macros

  cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
  if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) {
    tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
    destroyCmsg(pMsg);
    return;
  }

  char*    fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
  uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
  char     addr[TSDB_FQDN_LEN + 64] = {0};
  CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port);

  bool      ignore = false;
  SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr);
  if (ignore == true) {
    // persistent conn was already released by the server
    STransMsg resp;
    cliBuildExceptResp(pMsg, &resp);
    if (pMsg->type != Release) {
      pTransInst->cfp(pTransInst->parent, &resp, NULL);
    }
    destroyCmsg(pMsg);
    return;
  }
  // cliGetConn receives &pMsg and may clear it; nothing is left to do in that case.
  if (conn == NULL && pMsg == NULL) {
    return;
  }

  if (conn != NULL) {
    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
    transQueuePush(&conn->cliMsgs, pMsg);
    cliSend(conn);
  } else {
    conn = cliCreateConn(pThrd);

    int64_t refId = (int64_t)pMsg->msg.info.handle;
    if (refId != 0) specifyConnRef(conn, true, refId);

    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
    transQueuePush(&conn->cliMsgs, pMsg);

    conn->ip = strdup(addr);
    uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
    if (ipaddr == 0xffffffff) {
      // Resolution failed: give the timer back before tearing the connection down.
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

      cliHandleExcept(conn);
      return;
    }

    // Distinct name (the key buffer above is `addr`) and zero-initialised so that
    // padding / platform-specific fields are not passed to connect uninitialised.
    struct sockaddr_in dstAddr = {0};
    dstAddr.sin_family = AF_INET;
    dstAddr.sin_addr.s_addr = ipaddr;
    dstAddr.sin_port = (uint16_t)htons(port);

    tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip);
    int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
    if (fd == -1) {
      tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
              tstrerror(TAOS_SYSTEM_ERROR(errno)));
      cliHandleExcept(conn);
      errno = 0;
      return;
    }
    int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
    if (ret != 0) {
      tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }
    ret = transSetConnOption((uv_tcp_t*)conn->stream);
    if (ret != 0) {
      tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }

    ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&dstAddr, cliConnCb);
    if (ret != 0) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

      cliHandleFastFail(conn, ret);
      return;
    }
    // Connect is in flight; cliConnTimeout fires if it does not complete in time.
    uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
  }
  tGTrace("%s conn %p ready", pTransInst->label, conn);
}
dengyihao's avatar
dengyihao 已提交
1566

dengyihao's avatar
dengyihao 已提交
1567
/**
 * Drains wq without batching: every message is dispatched through
 * cliAsyncHandle by its type, except Quit messages, which are only recorded
 * in pThrd->stopMsg for the caller to act on.
 */
static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
  int handled = 0;

  while (!QUEUE_IS_EMPTY(wq)) {
    queue* node = QUEUE_HEAD(wq);
    QUEUE_REMOVE(node);

    SCliMsg* pMsg = QUEUE_DATA(node, SCliMsg, q);
    if (pMsg->type != Quit) {
      (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
      handled++;
    } else {
      pThrd->stopMsg = pMsg;
    }
  }

  if (handled > 1) {
    tTrace("cli process batch size:%d", handled);
  }
}
dengyihao's avatar
dengyihao 已提交
1588
/**
 * Pops the first batch from pList->wq and updates the list counters
 * (sending +1, len -1).
 *
 * @return the batch, or NULL when the queue is empty or connCnt / sending
 *         exceeds connMax.
 */
SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
  bool overLimit = (pList->connCnt > pList->connMax) || (pList->sending > pList->connMax);
  if (QUEUE_IS_EMPTY(&pList->wq) || overLimit) {
    return NULL;
  }

  queue* first = QUEUE_HEAD(&pList->wq);
  QUEUE_REMOVE(first);

  pList->sending += 1;
  pList->len -= 1;

  return QUEUE_DATA(first, SCliBatch, listq);
}
dengyihao's avatar
dengyihao 已提交
1601

dengyihao's avatar
dengyihao 已提交
1602
static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1603 1604
  STrans* pInst = pThrd->pTransInst;

dengyihao's avatar
dengyihao 已提交
1605
  int count = 0;
dengyihao's avatar
dengyihao 已提交
1606 1607
  while (!QUEUE_IS_EMPTY(wq)) {
    queue* h = QUEUE_HEAD(wq);
dengyihao's avatar
dengyihao 已提交
1608
    QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
1609 1610

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
1611 1612 1613 1614 1615 1616

    if (pMsg->type == Quit) {
      pThrd->stopMsg = pMsg;
      continue;
    }

dengyihao's avatar
dengyihao 已提交
1617
    if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
dengyihao's avatar
dengyihao 已提交
1618 1619 1620 1621 1622 1623 1624
      STransConnCtx* pCtx = pMsg->ctx;

      char*    ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
      uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
      char     key[TSDB_FQDN_LEN + 64] = {0};
      CONN_CONSTRUCT_HASH_KEY(key, ip, port);

dengyihao's avatar
dengyihao 已提交
1625 1626 1627 1628 1629
      // SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key));
      SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key));
      if (ppBatchList == NULL || *ppBatchList == NULL) {
        SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
        QUEUE_INIT(&pBatchList->wq);
dengyihao's avatar
dengyihao 已提交
1630
        pBatchList->connMax = pInst->connLimitNum;
dengyihao's avatar
dengyihao 已提交
1631
        pBatchList->connCnt = 0;
dengyihao's avatar
dengyihao 已提交
1632
        pBatchList->batchLenLimit = pInst->batchSize;
dengyihao's avatar
dengyihao 已提交
1633
        pBatchList->len += 1;
dengyihao's avatar
dengyihao 已提交
1634

dengyihao's avatar
dengyihao 已提交
1635 1636 1637 1638
        pBatchList->ip = strdup(ip);
        pBatchList->dst = strdup(key);
        pBatchList->port = port;

dengyihao's avatar
dengyihao 已提交
1639 1640
        SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
        QUEUE_INIT(&pBatch->wq);
dengyihao's avatar
dengyihao 已提交
1641 1642
        QUEUE_INIT(&pBatch->listq);

dengyihao's avatar
dengyihao 已提交
1643 1644 1645
        QUEUE_PUSH(&pBatch->wq, h);
        pBatch->wLen += 1;
        pBatch->batchSize += pMsg->msg.contLen;
dengyihao's avatar
dengyihao 已提交
1646
        pBatch->pList = pBatchList;
dengyihao's avatar
dengyihao 已提交
1647

dengyihao's avatar
dengyihao 已提交
1648
        QUEUE_PUSH(&pBatchList->wq, &pBatch->listq);
dengyihao's avatar
dengyihao 已提交
1649

dengyihao's avatar
dengyihao 已提交
1650
        taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatchList, sizeof(void*));
dengyihao's avatar
dengyihao 已提交
1651
      } else {
dengyihao's avatar
dengyihao 已提交
1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672
        if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
          SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
          QUEUE_INIT(&pBatch->wq);
          QUEUE_INIT(&pBatch->listq);

          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->wLen += 1;
          pBatch->batchSize = pMsg->msg.contLen;
          pBatch->pList = *ppBatchList;

          QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
          (*ppBatchList)->len += 1;

          continue;
        }

        queue*     hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
        SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
        if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) {
          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->batchSize += pMsg->msg.contLen;
dengyihao's avatar
dengyihao 已提交
1673
          pBatch->wLen += 1;
dengyihao's avatar
dengyihao 已提交
1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686
        } else {
          SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
          QUEUE_INIT(&pBatch->wq);
          QUEUE_INIT(&pBatch->listq);

          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->wLen += 1;
          pBatch->batchSize += pMsg->msg.contLen;
          pBatch->pList = *ppBatchList;

          QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
          (*ppBatchList)->len += 1;
        }
dengyihao's avatar
dengyihao 已提交
1687
      }
dengyihao's avatar
dengyihao 已提交
1688
      continue;
dengyihao's avatar
dengyihao 已提交
1689
    }
dengyihao's avatar
dengyihao 已提交
1690
    (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
1691
    count++;
dengyihao's avatar
dengyihao 已提交
1692
  }
dengyihao's avatar
dengyihao 已提交
1693

dengyihao's avatar
dengyihao 已提交
1694
  void** pIter = taosHashIterate(pThrd->batchCache, NULL);
dengyihao's avatar
dengyihao 已提交
1695
  while (pIter != NULL) {
dengyihao's avatar
dengyihao 已提交
1696
    SCliBatchList* batchList = (SCliBatchList*)(*pIter);
dengyihao's avatar
dengyihao 已提交
1697 1698 1699
    SCliBatch*     batch = cliGetHeadFromList(batchList);
    if (batch != NULL) {
      cliHandleBatchReq(batch, pThrd);
dengyihao's avatar
dengyihao 已提交
1700
    }
dengyihao's avatar
dengyihao 已提交
1701
    pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
dengyihao's avatar
dengyihao 已提交
1702 1703
  }

dengyihao's avatar
dengyihao 已提交
1704
  if (count >= 2) {
S
Shengliang Guan 已提交
1705
    tTrace("cli process batch size:%d", count);
dengyihao's avatar
dengyihao 已提交
1706
  }
dengyihao's avatar
dengyihao 已提交
1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719
}

/**
 * uv_async callback: takes over all queued messages of the async item under
 * its mutex, processes them with or without batching according to the
 * instance's supportBatch setting, and then handles a pending Quit message.
 */
static void cliAsyncCb(uv_async_t* handle) {
  SAsyncItem* item = handle->data;
  SCliThrd*   pThrd = item->pThrd;
  STrans*     pTransInst = pThrd->pTransInst;

  // Move the whole queue out in one step so the lock is held only briefly.
  queue pending;
  taosThreadMutexLock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &pending);
  taosThreadMutexUnlock(&item->mtx);

  int8_t supportBatch = pTransInst->supportBatch;
  switch (supportBatch) {
    case 0:
      cliNoBatchDealReq(&pending, pThrd);
      break;
    case 1:
      cliBatchDealReq(&pending, pThrd);
      break;
    default:
      break;
  }

  if (pThrd->stopMsg != NULL) {
    cliHandleQuit(pThrd->stopMsg, pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754
/**
 * uv_prepare callback: for every async item of the thread's pool, takes over
 * its queued messages under the item mutex and dispatches each one through
 * cliAsyncHandle. A pending Quit message is handled at the end.
 */
static void cliPrepareCb(uv_prepare_t* handle) {
  SCliThrd* thrd = handle->data;
  tTrace("prepare work start");

  SAsyncPool* pool = thrd->asyncPool;
  for (int idx = 0; idx < pool->nAsync; idx++) {
    SAsyncItem* item = pool->asyncs[idx].data;

    queue pending;
    taosThreadMutexLock(&item->mtx);
    QUEUE_MOVE(&item->qmsg, &pending);
    taosThreadMutexUnlock(&item->mtx);

    int handled = 0;
    for (; !QUEUE_IS_EMPTY(&pending); handled++) {
      queue* node = QUEUE_HEAD(&pending);
      QUEUE_REMOVE(node);

      SCliMsg* pMsg = QUEUE_DATA(node, SCliMsg, q);
      (*cliAsyncHandle[pMsg->type])(pMsg, thrd);
    }
  }

  tTrace("prepare work end");
  if (thrd->stopMsg != NULL) {
    cliHandleQuit(thrd->stopMsg, thrd);
  }
}
dengyihao's avatar
dengyihao 已提交
1756

dengyihao's avatar
dengyihao 已提交
1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785
/**
 * Cleans up the connection's context and unfinished messages, then either
 * destroys the message queue (destroy == true) or just clears it.
 */
void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
  transCtxCleanup(&conn->ctx);
  cliReleaseUnfinishedMsg(conn);

  if (!destroy) {
    transQueueClear(&conn->cliMsgs);
    return;
  }
  transQueueDestroy(&conn->cliMsgs);
}

/**
 * Walks the connection's queued messages and reports a broken-link response
 * (see cliBuildExceptResp) to the application for each one, skipping Release
 * messages, requests that expect no response, and TDMT_SCH_DROP_TASK. After
 * the callback the message's ahandle is cleared.
 */
void cliIteraConnMsgs(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    SCliMsg* cmsg = transQueueGet(&conn->cliMsgs, i);
    if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) {
      continue;
    }

    STransMsg resp = {0};
    if (-1 == cliBuildExceptResp(cmsg, &resp)) {
      continue;
    }
    pTransInst->cfp(pTransInst->parent, &resp, NULL);

    // cliBuildExceptResp tolerates a message without a context, so guard the same way here.
    if (cmsg->ctx != NULL) {
      cmsg->ctx->ahandle = NULL;
    }
  }
}
dengyihao's avatar
dengyihao 已提交
1786 1787 1788
/**
 * Checks whether pHead is a bare release request (release flag set and the
 * message consisting of the header only) and, if so, processes it.
 *
 * @return true when the header was a release request and has been consumed,
 *         false otherwise (nothing is touched).
 */
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
  if (pHead->release != 1 || (pHead->msgLen) != sizeof(*pHead)) {
    return false;
  }

  uint64_t ahandle = pHead->ahandle;
  tDebug("ahandle = %" PRIu64 "", ahandle);

  // The macro below works on the local `pMsg`.
  SCliMsg* pMsg = NULL;
  CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);

  transClearBuffer(&conn->readBuf);
  transFreeMsg(transContFromHead((char*)pHead));

  if (ahandle == 0) {
    // A queued Release message ends processing right here.
    for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
      SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i);
      if (cliMsg->type == Release) {
        ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req");
        return true;
      }
    }
  }

  cliIteraConnMsgs(conn);

  tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
  destroyCmsg(pMsg);

  addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
  return true;
}

U
ubuntu 已提交
1815
static void* cliWorkThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
1816
  SCliThrd* pThrd = (SCliThrd*)arg;
dengyihao's avatar
dengyihao 已提交
1817
  pThrd->pid = taosGetSelfPthreadId();
G
gccgdb1234 已提交
1818
  setThreadName("trans-cli-work");
dengyihao's avatar
dengyihao 已提交
1819
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
dengyihao's avatar
dengyihao 已提交
1820 1821

  tDebug("thread quit-thread:%08" PRId64, pThrd->pid);
1822
  return NULL;
dengyihao's avatar
dengyihao 已提交
1823 1824
}

U
ubuntu 已提交
1825
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
1826
  SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj));
dengyihao's avatar
dengyihao 已提交
1827

dengyihao's avatar
dengyihao 已提交
1828
  STrans* pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
1829
  memcpy(cli->label, label, TSDB_LABEL_LEN);
dengyihao's avatar
dengyihao 已提交
1830
  cli->numOfThreads = numOfThreads;
dengyihao's avatar
dengyihao 已提交
1831
  cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*));
dengyihao's avatar
dengyihao 已提交
1832 1833

  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
1834
    SCliThrd* pThrd = createThrdObj(shandle);
dengyihao's avatar
dengyihao 已提交
1835 1836 1837 1838 1839
    if (pThrd == NULL) {
      return NULL;
    }

    int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
dengyihao's avatar
dengyihao 已提交
1840
    if (err == 0) {
S
Shengliang Guan 已提交
1841
      tDebug("success to create tranport-cli thread:%d", i);
dengyihao's avatar
dengyihao 已提交
1842
    }
dengyihao's avatar
dengyihao 已提交
1843
    cli->pThreadObj[i] = pThrd;
dengyihao's avatar
dengyihao 已提交
1844
  }
dengyihao's avatar
dengyihao 已提交
1845

dengyihao's avatar
dengyihao 已提交
1846 1847
  return cli;
}
dengyihao's avatar
dengyihao 已提交
1848

dengyihao's avatar
dengyihao 已提交
1849
// Free the payload buffer of a transport message, if it has one, and clear
// the pointer afterwards. A message without payload is left untouched.
static FORCE_INLINE void destroyUserdata(STransMsg* userdata) {
  if (userdata->pCont != NULL) {
    transFreeMsg(userdata->pCont);
    userdata->pCont = NULL;
  }
}
dengyihao's avatar
dengyihao 已提交
1856

dengyihao's avatar
dengyihao 已提交
1857
static FORCE_INLINE void destroyCmsg(void* arg) {
dengyihao's avatar
dengyihao 已提交
1858
  SCliMsg* pMsg = arg;
dengyihao's avatar
dengyihao 已提交
1859 1860 1861
  if (pMsg == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1862

dengyihao's avatar
dengyihao 已提交
1863 1864
  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
wafwerar's avatar
wafwerar 已提交
1865
  taosMemoryFree(pMsg);
dengyihao's avatar
dengyihao 已提交
1866
}
dengyihao's avatar
dengyihao 已提交
1867

dengyihao's avatar
dengyihao 已提交
1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
  if (param == NULL) return;

  STaskArg* arg = param;
  SCliMsg*  pMsg = arg->param1;
  SCliThrd* pThrd = arg->param2;

  tDebug("destroy Ahandle A");
  if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
    tDebug("destroy Ahandle B");
    pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
  }
  tDebug("destroy Ahandle C");

  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
  taosMemoryFree(pMsg);
}

dengyihao's avatar
dengyihao 已提交
1887 1888 1889
static SCliThrd* createThrdObj(void* trans) {
  STrans* pTransInst = trans;

dengyihao's avatar
dengyihao 已提交
1890
  SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd));
U
ubuntu 已提交
1891

dengyihao's avatar
dengyihao 已提交
1892
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
1893
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
1894

wafwerar's avatar
wafwerar 已提交
1895
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
1896 1897 1898 1899 1900 1901 1902 1903
  int err = uv_loop_init(pThrd->loop);
  if (err != 0) {
    tError("failed to init uv_loop, reason:%s", uv_err_name(err));
    taosMemoryFree(pThrd->loop);
    taosThreadMutexDestroy(&pThrd->msgMtx);
    taosMemoryFree(pThrd);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
1904 1905 1906 1907 1908
  if (pTransInst->supportBatch) {
    pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 4, pThrd, cliAsyncCb);
  } else {
    pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb);
  }
dengyihao's avatar
dengyihao 已提交
1909
  if (pThrd->asyncPool == NULL) {
dengyihao's avatar
ref log  
dengyihao 已提交
1910
    tError("failed to init async pool");
dengyihao's avatar
dengyihao 已提交
1911 1912 1913 1914 1915 1916
    uv_loop_close(pThrd->loop);
    taosMemoryFree(pThrd->loop);
    taosThreadMutexDestroy(&pThrd->msgMtx);
    taosMemoryFree(pThrd);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
1917 1918 1919 1920

  pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
  uv_prepare_init(pThrd->loop, pThrd->prepare);
  pThrd->prepare->data = pThrd;
dengyihao's avatar
dengyihao 已提交
1921
  // uv_prepare_start(pThrd->prepare, cliPrepareCb);
dengyihao's avatar
dengyihao 已提交
1922

dengyihao's avatar
dengyihao 已提交
1923
  int32_t timerSize = 64;
dengyihao's avatar
dengyihao 已提交
1924 1925 1926 1927 1928 1929 1930
  pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
  for (int i = 0; i < timerSize; i++) {
    uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
    uv_timer_init(pThrd->loop, timer);
    taosArrayPush(pThrd->timerList, &timer);
  }

dengyihao's avatar
dengyihao 已提交
1931
  pThrd->pool = createConnPool(4);
dengyihao's avatar
dengyihao 已提交
1932
  transDQCreate(pThrd->loop, &pThrd->delayQueue);
dengyihao's avatar
dengyihao 已提交
1933

dengyihao's avatar
dengyihao 已提交
1934 1935
  transDQCreate(pThrd->loop, &pThrd->timeoutQueue);

dengyihao's avatar
dengyihao 已提交
1936 1937
  transDQCreate(pThrd->loop, &pThrd->waitConnQueue);

dengyihao's avatar
dengyihao 已提交
1938 1939 1940
  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
  pThrd->pTransInst = trans;

dengyihao's avatar
dengyihao 已提交
1941
  pThrd->destroyAhandleFp = pTransInst->destroyFp;
dengyihao's avatar
dengyihao 已提交
1942
  pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
1943 1944
  pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);

dengyihao's avatar
dengyihao 已提交
1945
  pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
1946

dengyihao's avatar
dengyihao 已提交
1947
  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
1948 1949
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
1950
/*
 * Tear down a client worker created by createThrdObj().
 *
 * Joins the worker thread first, then releases the libuv resources, the
 * async pool and its pending messages, the three delay queues, the timer
 * pool, the prepare handle, the loop, the hash caches (including every batch
 * list stored in batchCache) and finally the SCliThrd itself.
 * NULL is accepted and ignored.
 */
static void destroyThrdObj(SCliThrd* pThrd) {
  if (pThrd == NULL) return;

  // Wait for the worker to leave its event loop before touching its state.
  taosThreadJoin(pThrd->thread, NULL);
  CLI_RELEASE_UV(pThrd->loop);
  taosThreadMutexDestroy(&pThrd->msgMtx);
  TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
  transAsyncPoolDestroy(pThrd->asyncPool);

  // Only the delay queue carries messages that need their app handle destroyed.
  transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
  transDQDestroy(pThrd->timeoutQueue, NULL);
  transDQDestroy(pThrd->waitConnQueue, NULL);

  tDebug("thread destroy %" PRId64, pThrd->pid);

  for (int idx = 0; idx < taosArrayGetSize(pThrd->timerList); idx++) {
    uv_timer_t* pooled = taosArrayGetP(pThrd->timerList, idx);
    taosMemoryFree(pooled);
  }
  taosArrayDestroy(pThrd->timerList);

  taosMemoryFree(pThrd->prepare);
  taosMemoryFree(pThrd->loop);
  taosHashCleanup(pThrd->fqdn2ipCache);
  taosHashCleanup(pThrd->failFastCache);

  // Each batchCache entry holds a pointer to a heap-allocated SCliBatchList.
  void** slot = taosHashIterate(pThrd->batchCache, NULL);
  for (; slot != NULL; slot = (void**)taosHashIterate(pThrd->batchCache, slot)) {
    SCliBatchList* batchList = (SCliBatchList*)(*slot);

    // Drain and destroy every batch still queued on this list.
    while (!QUEUE_IS_EMPTY(&batchList->wq)) {
      queue* node = QUEUE_HEAD(&batchList->wq);
      QUEUE_REMOVE(node);

      SCliBatch* batch = QUEUE_DATA(node, SCliBatch, listq);
      cliDestroyBatch(batch);
    }

    taosMemoryFree(batchList->ip);
    taosMemoryFree(batchList->dst);
    taosMemoryFree(batchList);
  }
  taosHashCleanup(pThrd->batchCache);

  taosMemoryFree(pThrd);
}
dengyihao's avatar
dengyihao 已提交
1995

dengyihao's avatar
dengyihao 已提交
1996
// Release a connection context. Only the context struct itself is freed;
// nothing it points to is released here.
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx) {
  taosMemoryFree(ctx);
}
dengyihao's avatar
dengyihao 已提交
2000

dengyihao's avatar
dengyihao 已提交
2001
/*
 * Ask a client worker to stop gracefully.
 *
 * Posts a Quit message to the worker's async pool and then marks the pool as
 * stopped. If the pool rejects the message it is destroyed here instead of
 * being leaked. If the message cannot be allocated, an error is logged and
 * nothing is sent.
 */
void cliSendQuit(SCliThrd* thrd) {
  SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
  if (msg == NULL) {
    tError("failed to alloc quit msg");
    return;
  }

  msg->type = Quit;
  if (0 != transAsyncSend(thrd->asyncPool, &msg->q)) {
    // Not queued: ownership stays with us, so release it.
    destroyCmsg(msg);
  }
  atomic_store_8(&thrd->asyncPool->stop, 1);
}
dengyihao's avatar
dengyihao 已提交
2008 2009
// Handle-walk callback: closes every handle that is not already closing,
// with cliDestroy as the close callback. Non-timer handles have reading
// stopped first; timers are closed directly. `arg` is unused.
void cliWalkCb(uv_handle_t* handle, void* arg) {
  if (uv_is_closing(handle)) {
    return;
  }

  if (uv_handle_get_type(handle) != UV_TIMER) {
    uv_read_stop((uv_stream_t*)handle);
  }
  uv_close(handle, cliDestroy);
}
dengyihao's avatar
dengyihao 已提交
2025

dengyihao's avatar
dengyihao 已提交
2026
/*
 * Pick the next worker thread index in round-robin order.
 *
 * Returns -1 when the instance has no threads, otherwise the instance counter
 * (as it was on entry) modulo numOfThreads. The counter is advanced without
 * any locking and is reset to 0 once it reaches numOfThreads * 2000, which
 * keeps it bounded and avoids CPU load imbalance.
 */
FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) {
  int32_t current = pTransInst->index;

  int32_t threads = pTransInst->numOfThreads;
  if (threads == 0) {
    return -1;
  }

  // Advance the shared counter; wrap it once it passes the limit.
  if (pTransInst->index++ >= pTransInst->numOfThreads * 2000) {
    pTransInst->index = 0;
  }

  return current % pTransInst->numOfThreads;
}
dengyihao's avatar
dengyihao 已提交
2039
static FORCE_INLINE void doDelayTask(void* param) {
dengyihao's avatar
dengyihao 已提交
2040
  STaskArg* arg = param;
dengyihao's avatar
dengyihao 已提交
2041
  cliHandleReq((SCliMsg*)arg->param1, (SCliThrd*)arg->param2);
dengyihao's avatar
dengyihao 已提交
2042 2043
  taosMemoryFree(arg);
}
dengyihao's avatar
dengyihao 已提交
2044

dengyihao's avatar
dengyihao 已提交
2045 2046 2047
static void doCloseIdleConn(void* param) {
  STaskArg* arg = param;
  SCliConn* conn = arg->param1;
dengyihao's avatar
dengyihao 已提交
2048
  tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
2049
  conn->task = NULL;
dengyihao's avatar
dengyihao 已提交
2050 2051
  cliDestroyConn(conn, true);
  taosMemoryFree(arg);
dengyihao's avatar
dengyihao 已提交
2052 2053
}

dengyihao's avatar
dengyihao 已提交
2054
/*
 * Schedule a message for another attempt on this worker.
 *
 * Logs the endpoint set, retry step and next interval, then queues a
 * doDelayTask on the worker's delay queue that fires after
 * ctx->retryNextInterval and carries the message and the worker.
 * NOTE(review): the STaskArg allocation is not checked for NULL.
 */
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
  STransConnCtx* ctx = pMsg->ctx;

  // NOTE(review): `trace` is not referenced explicitly; presumably tGDebug picks it up by name - keep it.
  STraceId* trace = &pMsg->msg.info.traceId;

  char tbuf[256] = {0};
  EPSET_DEBUG_STR(&ctx->epSet, tbuf);
  tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
          ctx->retryStep, ctx->retryNextInterval);

  STaskArg* task = taosMemoryMalloc(sizeof(STaskArg));
  task->param1 = pMsg;
  task->param2 = pThrd;

  transDQSched(pThrd->delayQueue, doDelayTask, task, ctx->retryNextInterval);
}
dengyihao's avatar
dengyihao 已提交
2070

dengyihao's avatar
dengyihao 已提交
2071
// Overwrite *val with newVal unless *val already equals exp.
// NOTE(review): despite the name this is neither atomic nor a classic
// compare-and-swap (it stores when the values DIFFER); confirm with callers
// before changing the condition.
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
  if (*val == exp) {
    return;
  }
  *val = newVal;
}
dengyihao's avatar
dengyihao 已提交
2076

dengyihao's avatar
dengyihao 已提交
2077
/*
 * Strip a leading serialized endpoint set from a response.
 *
 * When the response is flagged with info.hasEpSet and its payload starts with
 * a decodable SEpSet, the endpoint set is copied to *dst and the payload is
 * replaced by a fresh buffer holding only the bytes after the endpoint set
 * (NULL with contLen 0 when nothing follows).
 *
 * Returns true when an endpoint set was extracted. Returns false, leaving
 * both the response and *dst untouched, when pResp is NULL, carries no
 * endpoint set, cannot be decoded, is shorter than the encoded endpoint set,
 * or the replacement buffer cannot be allocated.
 */
FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
  if (pResp == NULL || pResp->info.hasEpSet == 0) {
    return false;
  }

  SEpSet epset;
  if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epset) < 0) {
    return false;
  }

  // Size of the prefix to drop: the encoded length of the endpoint set that was just decoded.
  int32_t tlen = tSerializeSEpSet(NULL, 0, &epset);
  if (tlen < 0) {
    return false;
  }

  int32_t len = pResp->contLen - tlen;
  if (len < 0) {
    // Payload shorter than the endpoint set it claims to carry: refuse rather than copy a negative length.
    return false;
  }

  char* buf = NULL;
  if (len > 0) {
    buf = rpcMallocCont(len);
    if (buf == NULL) {
      return false;
    }
    memcpy(buf, (char*)pResp->pCont + tlen, len);
  }
  rpcFreeCont(pResp->pCont);

  pResp->pCont = buf;
  pResp->contLen = len;

  *dst = epset;
  return true;
}
dengyihao's avatar
dengyihao 已提交
2102
/*
 * Decide which endpoint the next retry should use.
 *
 * The response payload is decoded as an endpoint set when `hasEpSet` is true
 * or the payload is non-empty:
 *   - decoded and different from pCtx->epSet: adopt it and return false;
 *   - otherwise (no payload, undecodable, or identical): move pCtx->epSet to
 *     its next endpoint and return true, unless epsetRetryCnt has already
 *     reached the number of endpoints, in which case return false.
 *
 * The return value is the caller's "noDelay" flag: true means retry
 * immediately on the endpoint that was just selected.
 */
bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
  bool sameEpSet = false;

  if (hasEpSet || pResp->contLen != 0) {
    SEpSet  epSet;
    int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
    if (valid < 0) {
      tDebug("get invalid epset, epset equal, continue");
    } else if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
      tDebug("epset not equal, retry new epset");
      pCtx->epSet = epSet;
      return false;
    } else {
      sameEpSet = true;
    }
  }

  // Every endpoint of the current set has been tried: back off.
  if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
    return false;
  }

  if (sameEpSet) {
    tDebug("epset equal, continue");
  }
  EPSET_FORWARD_INUSE(&pCtx->epSet);
  return true;
}
/*
 * Decide whether a response should trigger a retry and, if so, schedule it.
 *
 * Returns false (nothing consumed) when the instance's retry callback is
 * absent or rejects the code, or when the retry time budget is exhausted.
 * Otherwise the response payload is freed, the connection is either
 * unreferenced (broken link / network unavailable) or returned to the pool,
 * the back-off state in the message context is updated, the message is
 * re-queued through cliSchedMsgToNextNode and true is returned.
 */
bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
  SCliThrd*      pThrd = pConn->hostThrd;
  STrans*        pTransInst = pThrd->pTransInst;
  STransConnCtx* pCtx = pMsg->ctx;
  const int32_t  code = pResp->code;

  if (pTransInst->retry == NULL) {
    return false;
  }
  bool retry = pTransInst->retry(code, pResp->msgType - 1);
  if (!retry) {
    return false;
  }

  // First retry of this message: seed the back-off parameters from the instance.
  if (!pCtx->retryInit) {
    pCtx->retryMinInterval = pTransInst->retryMinInterval;
    pCtx->retryMaxInterval = pTransInst->retryMaxInterval;
    pCtx->retryStepFactor = pTransInst->retryStepFactor;
    pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet;
    pCtx->retryInitTimestamp = taosGetTimestampMs();
    pCtx->retryNextInterval = pCtx->retryMinInterval;
    pCtx->retryStep = 0;
    pCtx->retryInit = true;
    pCtx->retryCode = TSDB_CODE_SUCCESS;

    // already retry, not use handle specified by app;
    pMsg->msg.info.handle = 0;
  }

  // A max timeout of -1 disables the overall time budget.
  if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
    return false;
  }

  // code, msgType
  // A:  epset,   leader, not self
  // B:  epset,   not know leader
  // C:  no epset, leader but not service
  const bool linkDown = (code == TSDB_CODE_RPC_BROKEN_LINK) || (code == TSDB_CODE_RPC_NETWORK_UNAVAIL);
  const bool redirected = (code == TSDB_CODE_SYN_NOT_LEADER) || (code == TSDB_CODE_SYN_INTERNAL_ERROR) ||
                          (code == TSDB_CODE_SYN_PROPOSE_NOT_READY) || (code == TSDB_CODE_VND_STOPPED) ||
                          (code == TSDB_CODE_MNODE_NOT_FOUND) || (code == TSDB_CODE_APP_IS_STARTING) ||
                          (code == TSDB_CODE_APP_IS_STOPPING);

  bool noDelay = false;
  if (linkDown) {
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
    noDelay = cliResetEpset(pCtx, pResp, false);
    transFreeMsg(pResp->pCont);
    // The link is gone: drop the reference instead of pooling the connection.
    transUnrefCliHandle(pConn);
  } else if (redirected) {
    tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
    noDelay = cliResetEpset(pCtx, pResp, true);
    transFreeMsg(pResp->pCont);
    addConnToPool(pThrd->pool, pConn);
  } else if (code == TSDB_CODE_SYN_RESTORING) {
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
    noDelay = cliResetEpset(pCtx, pResp, true);
    addConnToPool(pThrd->pool, pConn);
    transFreeMsg(pResp->pCont);
  } else {
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
    noDelay = cliResetEpset(pCtx, pResp, false);
    addConnToPool(pThrd->pool, pConn);
    transFreeMsg(pResp->pCont);
  }

  if (!linkDown && code != TSDB_CODE_SUCCESS) {
    // save one internal code
    pCtx->retryCode = code;
  }

  if (noDelay) {
    // Try the next endpoint right away.
    pCtx->retryNextInterval = 0;
    pCtx->epsetRetryCnt++;
  } else {
    // Start a new round over the endpoint set with a longer, capped interval.
    pCtx->epsetRetryCnt = 1;
    pCtx->retryStep++;

    int64_t factor = pow(pCtx->retryStepFactor, pCtx->retryStep - 1);
    pCtx->retryNextInterval = factor * pCtx->retryMinInterval;
    if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) {
      pCtx->retryNextInterval = pCtx->retryMaxInterval;
    }
  }

  pMsg->sent = 0;
  cliSchedMsgToNextNode(pMsg, pThrd);
  return true;
}
dengyihao's avatar
dengyihao 已提交
2252
/*
 * Deliver a response to the application, unless it is retried instead.
 *
 * Returns -1 when cliGenRetryRule scheduled a retry (the response is not
 * delivered), 0 otherwise. A message without context is passed straight to
 * the instance callback. For everything else the response code may be
 * rewritten (saved retry code, "some node" variants once all endpoints were
 * tried), a leading endpoint set is extracted, and the response is either
 * copied to the waiting synchronous caller or handed to the instance
 * callback together with the endpoint set when it changed.
 */
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  if (pMsg == NULL || pMsg->ctx == NULL) {
    tTrace("%s conn %p handle resp", pTransInst->label, pConn);
    pTransInst->cfp(pTransInst->parent, pResp, NULL);
    return 0;
  }

  STransConnCtx* pCtx = pMsg->ctx;

  if (cliGenRetryRule(pConn, pResp, pMsg)) {
    return -1;
  }

  // Prefer the code remembered during retries over a plain transport error.
  if (pCtx->retryCode != TSDB_CODE_SUCCESS) {
    switch (0) {
      default: {
        int32_t code = pResp->code;
        // return internal code app
        if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
            code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
          pResp->code = pCtx->retryCode;
        }
      }
    }
  }

  // check whole vnodes is offline on this vgroup
  bool triedAll = (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) || (pCtx->retryStep > 0);
  if (triedAll) {
    if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
      pResp->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
    } else if (pResp->code == TSDB_CODE_RPC_BROKEN_LINK) {
      pResp->code = TSDB_CODE_RPC_SOMENODE_BROKEN_LINK;
    }
  }

  // NOTE(review): `trace` is not referenced explicitly; presumably tGTrace picks it up by name - keep it.
  STraceId* trace = &pResp->info.traceId;
  bool      hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
  if (hasEpSet) {
    char tbuf[256] = {0};
    EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
    tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
  }

  if (pCtx->pSem == NULL) {
    // Asynchronous request: report through the instance callback.
    tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);

    SEpSet* pNewEpSet = NULL;
    if (hasEpSet || cliIsEpsetUpdated(pResp->code, pCtx)) {
      pNewEpSet = &pCtx->epSet;
    }
    pTransInst->cfp(pTransInst->parent, pResp, pNewEpSet);
    return 0;
  }

  // Synchronous request: copy the response out and wake the waiter.
  tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
  if (pCtx->pRsp != NULL) {
    memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
  } else {
    tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
  }
  tsem_post(pCtx->pSem);
  pCtx->pRsp = NULL;
  return 0;
}
U
ubuntu 已提交
2318 2319

void transCloseClient(void* arg) {
U
ubuntu 已提交
2320
  SCliObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
2321
  for (int i = 0; i < cli->numOfThreads; i++) {
U
ubuntu 已提交
2322
    cliSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
2323
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
2324
  }
wafwerar's avatar
wafwerar 已提交
2325 2326
  taosMemoryFree(cli->pThreadObj);
  taosMemoryFree(cli);
dengyihao's avatar
dengyihao 已提交
2327
}
dengyihao's avatar
dengyihao 已提交
2328 2329 2330 2331 2332
void transRefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_INC((SCliConn*)handle);
dengyihao's avatar
dengyihao 已提交
2333
  tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
dengyihao's avatar
dengyihao 已提交
2334 2335 2336 2337 2338 2339 2340
  UNUSED(ref);
}
void transUnrefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SCliConn*)handle);
dengyihao's avatar
dengyihao 已提交
2341
  tTrace("%s conn %p ref:%d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
dengyihao's avatar
dengyihao 已提交
2342
  if (ref == 0) {
U
ubuntu 已提交
2343
    cliDestroyConn((SCliConn*)handle, true);
dengyihao's avatar
dengyihao 已提交
2344 2345
  }
}
dengyihao's avatar
dengyihao 已提交
2346
/*
 * Resolve the worker thread bound to an exclusive handle.
 *
 * Looks the handle up in the ref manager. If it is not bound to a worker yet
 * and `trans` is given, a worker is chosen round-robin and bound to it.
 *
 * Returns the bound worker, or NULL when the handle is unknown, unbound with
 * no `trans`, or no worker is available. The acquired handle reference is
 * released on every path that acquired it.
 */
static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t handle) {
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
  if (exh == NULL) {
    return NULL;
  }

  if (exh->pThrd == NULL && trans != NULL) {
    int idx = cliRBChoseIdx(trans);
    if (idx >= 0) {
      exh->pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
    }
    // idx < 0: leave the handle unbound and fall through so the reference is still released.
  }

  SCliThrd* pThrd = exh->pThrd;
  transReleaseExHandle(transGetRefMgt(), handle);
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
2363
// Choose the worker thread for a request: with a non-zero handle the worker
// bound to that handle is used, otherwise one is picked round-robin.
// Returns NULL when no worker can be determined.
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
  if (handle != 0) {
    return transGetWorkThrdFromHandle(trans, handle);
  }

  int idx = cliRBChoseIdx(trans);
  return (idx < 0) ? NULL : ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
}
dengyihao's avatar
dengyihao 已提交
2372
int transReleaseCliHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
2373 2374 2375
  int  idx = -1;
  bool valid = false;

dengyihao's avatar
dengyihao 已提交
2376
  SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
dengyihao's avatar
dengyihao 已提交
2377
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2378
    return -1;
dengyihao's avatar
dengyihao 已提交
2379
  }
dengyihao's avatar
dengyihao 已提交
2380

dengyihao's avatar
dengyihao 已提交
2381
  STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527};
dengyihao's avatar
rm code  
dengyihao 已提交
2382
  TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
dengyihao's avatar
dengyihao 已提交
2383

dengyihao's avatar
dengyihao 已提交
2384 2385 2386
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  pCtx->ahandle = tmsg.info.ahandle;

dengyihao's avatar
dengyihao 已提交
2387
  SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2388
  cmsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
2389
  cmsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2390
  cmsg->type = Release;
dengyihao's avatar
dengyihao 已提交
2391
  cmsg->ctx = pCtx;
dengyihao's avatar
dengyihao 已提交
2392

dengyihao's avatar
dengyihao 已提交
2393 2394 2395
  STraceId* trace = &tmsg.info.traceId;
  tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);

dengyihao's avatar
dengyihao 已提交
2396
  if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
dengyihao's avatar
dengyihao 已提交
2397
    destroyCmsg(cmsg);
dengyihao's avatar
dengyihao 已提交
2398 2399
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2400
  return 0;
dengyihao's avatar
dengyihao 已提交
2401
}
dengyihao's avatar
dengyihao 已提交
2402

dengyihao's avatar
dengyihao 已提交
2403
/*
 * Queue an asynchronous request on a client worker.
 *
 * `shandle` identifies the transport instance, `pEpSet` the endpoints to
 * try, `pReq` the request (its payload is taken over by this call) and `ctx`
 * an optional application context copied into the message context.
 *
 * Returns 0 when the request was queued. On failure the request payload is
 * freed and a non-zero value is returned: -1 when the instance is gone, an
 * allocation fails or the worker's async pool rejects the message, and
 * TSDB_CODE_RPC_BROKEN_LINK when no worker thread can be determined.
 */
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return -1;
  }

  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
  if (pThrd == NULL) {
    transFreeMsg(pReq->pCont);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return TSDB_CODE_RPC_BROKEN_LINK;
  }

  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());

  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  if (pCtx == NULL) {
    transFreeMsg(pReq->pCont);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return -1;
  }
  pCtx->epSet = *pEpSet;
  pCtx->origEpSet = *pEpSet;
  pCtx->ahandle = pReq->info.ahandle;
  pCtx->msgType = pReq->msgType;

  if (ctx != NULL) pCtx->appCtx = *ctx;

  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
  if (cliMsg == NULL) {
    taosMemoryFree(pCtx);
    transFreeMsg(pReq->pCont);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return -1;
  }
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();
  cliMsg->type = Normal;
  cliMsg->refId = (int64_t)shandle;

  // NOTE(review): `trace` is not referenced explicitly; presumably tGDebug picks it up by name - keep it.
  STraceId* trace = &pReq->info.traceId;
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);

  if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) {
    // destroyCmsg releases the context and the request payload copied into the message.
    destroyCmsg(cliMsg);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return -1;
  }

  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
2444

dengyihao's avatar
dengyihao 已提交
2445
/**
 * Send a request and block the calling thread until the semaphore attached
 * to the request is posted.
 *
 * The request is wrapped in an SCliMsg carrying a STransConnCtx (endpoint
 * set, application handle, message type, semaphore and response slot) and
 * is queued on the async pool of the selected client work thread.
 * NOTE(review): presumably the work thread fills *pRsp and posts the
 * semaphore once the reply is received -- confirm against the response path.
 *
 * @param shandle  transport instance handle (used as a ref id)
 * @param pEpSet   endpoint set to send to; copied into the context
 * @param pReq     request message; copied by value into the queued message
 * @param pRsp     location stored in the context for the response
 *
 * @return -1 if the instance handle cannot be acquired or an allocation
 *         fails; TSDB_CODE_RPC_BROKEN_LINK if no work thread is available;
 *         otherwise the return value of transAsyncSend() (0 on success).
 *
 * On every failure detected before the message is queued, pReq->pCont is
 * released with transFreeMsg(). If queuing itself fails the message is torn
 * down with destroyCmsg() (NOTE(review): assumed to also release the
 * context and payload -- confirm).
 */
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return -1;
  }

  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
  if (pThrd == NULL) {
    transFreeMsg(pReq->pCont);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return TSDB_CODE_RPC_BROKEN_LINK;
  }

  // Pessimistic default: only overwritten once the message has been handed to transAsyncSend().
  int ret = -1;

  tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
  if (sem == NULL) {
    // Nothing but the instance reference has been acquired so far.
    transFreeMsg(pReq->pCont);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return ret;
  }
  tsem_init(sem, 0, 0);

  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
  STraceId* trace = &pReq->info.traceId;  // used by tGDebug below

  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  if (pCtx == NULL) {
    transFreeMsg(pReq->pCont);
    goto _RETURN;
  }
  pCtx->epSet = *pEpSet;
  pCtx->origEpSet = *pEpSet;
  pCtx->ahandle = pReq->info.ahandle;
  pCtx->msgType = pReq->msgType;
  pCtx->pSem = sem;
  pCtx->pRsp = pRsp;

  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
  if (cliMsg == NULL) {
    // The context is not yet owned by a message, so free it directly.
    taosMemoryFree(pCtx);
    transFreeMsg(pReq->pCont);
    goto _RETURN;
  }
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();
  cliMsg->type = Normal;
  cliMsg->refId = (int64_t)shandle;

  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);

  ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
  if (ret != 0) {
    destroyCmsg(cliMsg);
    goto _RETURN;
  }
  // Block until the semaphore stored in the context is posted.
  tsem_wait(sem);

_RETURN:
  tsem_destroy(sem);
  taosMemoryFree(sem);
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
  return ret;
}
dengyihao's avatar
dengyihao 已提交
2496 2497 2498
/*
 *
 **/
dengyihao's avatar
dengyihao 已提交
2499
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
dengyihao's avatar
dengyihao 已提交
2500
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2501 2502 2503
  if (pTransInst == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2504 2505 2506

  SCvtAddr cvtAddr = {0};
  if (ip != NULL && fqdn != NULL) {
dengyihao's avatar
dengyihao 已提交
2507 2508
    tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip));
    tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn));
dengyihao's avatar
dengyihao 已提交
2509 2510
    cvtAddr.cvt = true;
  }
dengyihao's avatar
dengyihao 已提交
2511 2512
  for (int i = 0; i < pTransInst->numOfThreads; i++) {
    STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2513
    pCtx->cvtAddr = cvtAddr;
dengyihao's avatar
dengyihao 已提交
2514 2515 2516 2517

    SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
    cliMsg->ctx = pCtx;
    cliMsg->type = Update;
dengyihao's avatar
dengyihao 已提交
2518
    cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2519

dengyihao's avatar
dengyihao 已提交
2520
    SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
S
Shengliang Guan 已提交
2521
    tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid);
dengyihao's avatar
dengyihao 已提交
2522

dengyihao's avatar
dengyihao 已提交
2523 2524
    if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) {
      destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2525
      transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2526 2527
      return -1;
    }
dengyihao's avatar
dengyihao 已提交
2528
  }
dengyihao's avatar
dengyihao 已提交
2529
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2530
  return 0;
dengyihao's avatar
dengyihao 已提交
2531
}
dengyihao's avatar
dengyihao 已提交
2532 2533 2534 2535 2536

int64_t transAllocHandle() {
  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
  tDebug("pre alloc refId %" PRId64 "", exh->refId);
dengyihao's avatar
dengyihao 已提交
2537

dengyihao's avatar
dengyihao 已提交
2538 2539
  return exh->refId;
}