transCli.c 80.0 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1
/** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
dengyihao's avatar
dengyihao 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "transComm.h"

dengyihao's avatar
dengyihao 已提交
17 18 19 20 21
typedef struct {
  int32_t numOfConn;
  queue   msgQ;
} SMsgList;

dengyihao's avatar
dengyihao 已提交
22
typedef struct SConnList {
dengyihao's avatar
dengyihao 已提交
23 24 25 26
  queue     conns;
  int32_t   size;
  SMsgList* list;
  void*     pThrd;
dengyihao's avatar
dengyihao 已提交
27 28
} SConnList;

dengyihao's avatar
dengyihao 已提交
29
typedef struct {
dengyihao's avatar
dengyihao 已提交
30 31 32 33 34 35
  queue   wq;
  int32_t len;

  int connMax;
  int connCnt;
  int batchLenLimit;
dengyihao's avatar
dengyihao 已提交
36
  int sending;
dengyihao's avatar
dengyihao 已提交
37

dengyihao's avatar
dengyihao 已提交
38 39 40
  char*    dst;
  char*    ip;
  uint16_t port;
dengyihao's avatar
dengyihao 已提交
41 42 43 44 45 46 47 48 49 50

} SCliBatchList;

typedef struct {
  queue          wq;
  queue          listq;
  int32_t        wLen;
  int32_t        batchSize;  //
  int32_t        batch;
  SCliBatchList* pList;
dengyihao's avatar
dengyihao 已提交
51
} SCliBatch;
dengyihao's avatar
dengyihao 已提交
52

dengyihao's avatar
dengyihao 已提交
53
typedef struct SCliConn {
dengyihao's avatar
dengyihao 已提交
54
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
55 56
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
57
  queue        wreqQueue;
dengyihao's avatar
dengyihao 已提交
58 59

  uv_timer_t* timer;  // read timer, forbidden
dengyihao's avatar
dengyihao 已提交
60

dengyihao's avatar
dengyihao 已提交
61 62 63 64
  void* hostThrd;

  SConnBuffer readBuf;
  STransQueue cliMsgs;
dengyihao's avatar
dengyihao 已提交
65 66 67

  queue      q;
  SConnList* list;
dengyihao's avatar
dengyihao 已提交
68 69

  STransCtx  ctx;
dengyihao's avatar
dengyihao 已提交
70 71
  bool       broken;  // link broken or not
  ConnStatus status;  //
dengyihao's avatar
dengyihao 已提交
72

dengyihao's avatar
dengyihao 已提交
73 74
  SCliBatch* pBatch;

dengyihao's avatar
dengyihao 已提交
75 76
  int64_t refId;
  char*   ip;
dengyihao's avatar
dengyihao 已提交
77

dengyihao's avatar
dengyihao 已提交
78
  SDelayTask* task;
dengyihao's avatar
dengyihao 已提交
79

dengyihao's avatar
dengyihao 已提交
80
  // debug and log info
dengyihao's avatar
dengyihao 已提交
81 82 83
  char src[32];
  char dst[32];

dengyihao's avatar
dengyihao 已提交
84
} SCliConn;
dengyihao's avatar
dengyihao 已提交
85

dengyihao's avatar
dengyihao 已提交
86
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
87
  STransConnCtx* ctx;
dengyihao's avatar
formate  
dengyihao 已提交
88
  STransMsg      msg;
dengyihao's avatar
dengyihao 已提交
89
  queue          q;
dengyihao's avatar
dengyihao 已提交
90
  STransMsgType  type;
dengyihao's avatar
dengyihao 已提交
91

dengyihao's avatar
dengyihao 已提交
92
  int64_t  refId;
dengyihao's avatar
dengyihao 已提交
93 94
  uint64_t st;
  int      sent;  //(0: no send, 1: alread sent)
dengyihao's avatar
dengyihao 已提交
95 96
} SCliMsg;

dengyihao's avatar
dengyihao 已提交
97
typedef struct SCliThrd {
dengyihao's avatar
dengyihao 已提交
98 99 100 101 102 103
  TdThread      thread;  // tid
  int64_t       pid;     // pid
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  uv_prepare_t* prepare;
  void*         pool;  // conn pool
dengyihao's avatar
dengyihao 已提交
104
  // timer handles
dengyihao's avatar
dengyihao 已提交
105
  SArray* timerList;
dengyihao's avatar
dengyihao 已提交
106
  // msg queue
dengyihao's avatar
dengyihao 已提交
107
  queue         msg;
wafwerar's avatar
wafwerar 已提交
108
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
109
  SDelayQueue*  delayQueue;
dengyihao's avatar
dengyihao 已提交
110
  SDelayQueue*  timeoutQueue;
dengyihao's avatar
dengyihao 已提交
111
  SDelayQueue*  waitConnQueue;
dengyihao's avatar
dengyihao 已提交
112 113
  uint64_t      nextTimeout;  // next timeout
  void*         pTransInst;   //
dengyihao's avatar
dengyihao 已提交
114

dengyihao's avatar
dengyihao 已提交
115
  int connCount;
dengyihao's avatar
dengyihao 已提交
116
  void (*destroyAhandleFp)(void* ahandle);
dengyihao's avatar
dengyihao 已提交
117 118
  SHashObj* fqdn2ipCache;
  SCvtAddr  cvtAddr;
dengyihao's avatar
dengyihao 已提交
119

dengyihao's avatar
dengyihao 已提交
120
  SHashObj* failFastCache;
dengyihao's avatar
dengyihao 已提交
121
  SHashObj* batchCache;
dengyihao's avatar
dengyihao 已提交
122

dengyihao's avatar
dengyihao 已提交
123 124
  SCliMsg* stopMsg;

dengyihao's avatar
dengyihao 已提交
125
  bool quit;
dengyihao's avatar
dengyihao 已提交
126
} SCliThrd;
dengyihao's avatar
dengyihao 已提交
127

U
ubuntu 已提交
128
typedef struct SCliObj {
dengyihao's avatar
dengyihao 已提交
129 130 131 132
  char       label[TSDB_LABEL_LEN];
  int32_t    index;
  int        numOfThreads;
  SCliThrd** pThreadObj;
U
ubuntu 已提交
133
} SCliObj;
dengyihao's avatar
dengyihao 已提交
134

dengyihao's avatar
dengyihao 已提交
135 136 137 138 139 140 141
typedef struct {
  int32_t reinit;
  int64_t timestamp;
  int32_t count;
  int32_t threshold;
  int64_t interval;
} SFailFastItem;
dengyihao's avatar
dengyihao 已提交
142
// conn pool
dengyihao's avatar
dengyihao 已提交
143
// add expire timeout and capacity limit
dengyihao's avatar
dengyihao 已提交
144
static void*     createConnPool(int size);
145
static void*     destroyConnPool(void* pool);
dengyihao's avatar
dengyihao 已提交
146
static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed);
dengyihao's avatar
dengyihao 已提交
147
static void      addConnToPool(void* pool, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
148
static void      doCloseIdleConn(void* param);
dengyihao's avatar
dengyihao 已提交
149

dengyihao's avatar
dengyihao 已提交
150 151
// register conn timer
static void cliConnTimeout(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
152 153
// register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
154
// register timer in each thread to clear expire conn
dengyihao's avatar
dengyihao 已提交
155
// static void cliTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
156
// alloc buffer for recv
dengyihao's avatar
dengyihao 已提交
157
static FORCE_INLINE void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
158
// callback after recv nbytes from socket
U
ubuntu 已提交
159
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
160
// callback after send data to socket
U
ubuntu 已提交
161
static void cliSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
162
// callback after conn to server
U
ubuntu 已提交
163 164
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
165
static void cliIdleCb(uv_idle_t* handle);
dengyihao's avatar
dengyihao 已提交
166
static void cliPrepareCb(uv_prepare_t* handle);
dengyihao's avatar
dengyihao 已提交
167

dengyihao's avatar
dengyihao 已提交
168
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd);
dengyihao's avatar
dengyihao 已提交
169
static void cliSendBatchCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
170 171

SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
dengyihao's avatar
dengyihao 已提交
172

dengyihao's avatar
dengyihao 已提交
173 174
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);

dengyihao's avatar
dengyihao 已提交
175
static int32_t allocConnRef(SCliConn* conn, bool update);
dengyihao's avatar
dengyihao 已提交
176

dengyihao's avatar
dengyihao 已提交
177
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
dengyihao's avatar
dengyihao 已提交
178

dengyihao's avatar
dengyihao 已提交
179
static SCliConn* cliCreateConn(SCliThrd* thrd);
U
ubuntu 已提交
180 181
static void      cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void      cliDestroy(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
182
static void      cliSend(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
183
static void      cliSendBatch(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
184
static void      cliDestroyConnMsgs(SCliConn* conn, bool destroy);
dengyihao's avatar
dengyihao 已提交
185

dengyihao's avatar
dengyihao 已提交
186 187
static void    doFreeTimeoutMsg(void* param);
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg);
dengyihao's avatar
dengyihao 已提交
188

dengyihao's avatar
dengyihao 已提交
189
// cli util func
dengyihao's avatar
dengyihao 已提交
190 191
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
dengyihao's avatar
dengyihao 已提交
192

dengyihao's avatar
dengyihao 已提交
193
static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
dengyihao's avatar
dengyihao 已提交
194

dengyihao's avatar
dengyihao 已提交
195 196 197
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn);
static FORCE_INLINE void     cliUpdateFqdnCache(SHashObj* cache, char* fqdn);

dengyihao's avatar
dengyihao 已提交
198
// process data read from server, add decompress etc later
U
ubuntu 已提交
199
static void cliHandleResp(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
200
// handle except about conn
U
ubuntu 已提交
201
static void cliHandleExcept(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
202
static void cliReleaseUnfinishedMsg(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
203
static void cliHandleFastFail(SCliConn* pConn, int status);
dengyihao's avatar
dengyihao 已提交
204

dengyihao's avatar
dengyihao 已提交
205
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd);
206
// handle req from app
dengyihao's avatar
dengyihao 已提交
207 208 209 210 211 212
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
                                                                   cliHandleUpdate};
dengyihao's avatar
dengyihao 已提交
213 214
/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
/// NULL,cliHandleUpdate};
dengyihao's avatar
dengyihao 已提交
215

dengyihao's avatar
dengyihao 已提交
216 217
static FORCE_INLINE void destroyUserdata(STransMsg* userdata);
static FORCE_INLINE void destroyCmsg(void* cmsg);
dengyihao's avatar
dengyihao 已提交
218
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
dengyihao's avatar
dengyihao 已提交
219
static FORCE_INLINE int  cliRBChoseIdx(STrans* pTransInst);
dengyihao's avatar
dengyihao 已提交
220
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
221

dengyihao's avatar
dengyihao 已提交
222
// thread obj
dengyihao's avatar
dengyihao 已提交
223
static SCliThrd* createThrdObj(void* trans);
dengyihao's avatar
dengyihao 已提交
224
static void      destroyThrdObj(SCliThrd* pThrd);
dengyihao's avatar
dengyihao 已提交
225

dengyihao's avatar
dengyihao 已提交
226 227 228 229 230 231 232 233 234
static void cliWalkCb(uv_handle_t* handle, void* arg);

#define CLI_RELEASE_UV(loop)        \
  do {                              \
    uv_walk(loop, cliWalkCb, NULL); \
    uv_run(loop, UV_RUN_DEFAULT);   \
    uv_loop_close(loop);            \
  } while (0);

dengyihao's avatar
dengyihao 已提交
235 236 237 238 239 240
// snprintf may cause performance problem
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port)          \
  do {                                                  \
    snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
  } while (0)

dengyihao's avatar
dengyihao 已提交
241 242
#define CONN_PERSIST_TIME(para)   ((para) <= 90000 ? 90000 : (para))
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
dengyihao's avatar
dengyihao 已提交
243

dengyihao's avatar
dengyihao 已提交
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle)                         \
  do {                                                                    \
    int i = 0, sz = transQueueSize(&conn->cliMsgs);                       \
    for (; i < sz; i++) {                                                 \
      pMsg = transQueueGet(&conn->cliMsgs, i);                            \
      if (pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \
        break;                                                            \
      }                                                                   \
    }                                                                     \
    if (i == sz) {                                                        \
      pMsg = NULL;                                                        \
      tDebug("msg not found, %" PRIu64 "", ahandle);                      \
    } else {                                                              \
      pMsg = transQueueRm(&conn->cliMsgs, i);                             \
      tDebug("msg found, %" PRIu64 "", ahandle);                          \
    }                                                                     \
dengyihao's avatar
dengyihao 已提交
260
  } while (0)
dengyihao's avatar
dengyihao 已提交
261

U
ubuntu 已提交
262 263 264 265 266 267 268 269 270 271 272 273 274
#define CONN_GET_NEXT_SENDMSG(conn)                 \
  do {                                              \
    int i = 0;                                      \
    do {                                            \
      pCliMsg = transQueueGet(&conn->cliMsgs, i++); \
      if (pCliMsg && 0 == pCliMsg->sent) {          \
        break;                                      \
      }                                             \
    } while (pCliMsg != NULL);                      \
    if (pCliMsg == NULL) {                          \
      goto _RETURN;                                 \
    }                                               \
  } while (0)
dengyihao's avatar
dengyihao 已提交
275

dengyihao's avatar
formate  
dengyihao 已提交
276 277
#define CONN_SET_PERSIST_BY_APP(conn) \
  do {                                \
dengyihao's avatar
dengyihao 已提交
278 279
    if (conn->status == ConnNormal) { \
      conn->status = ConnAcquire;     \
dengyihao's avatar
formate  
dengyihao 已提交
280 281 282
      transRefCliHandle(conn);        \
    }                                 \
  } while (0)
283

dengyihao's avatar
dengyihao 已提交
284 285 286 287
#define CONN_NO_PERSIST_BY_APP(conn) \
  (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define CONN_RELEASE_BY_SERVER(conn) \
  (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
288

S
Shengliang Guan 已提交
289 290
#define REQUEST_NO_RESP(msg)         ((msg)->info.noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg)   ((msg)->info.persistHandle == 1)
dengyihao's avatar
dengyihao 已提交
291
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
dengyihao's avatar
dengyihao 已提交
292

dengyihao's avatar
dengyihao 已提交
293
#define EPSET_IS_VALID(epSet)       ((epSet) != NULL && (epSet)->numOfEps >= 0 && (epSet)->inUse >= 0)
dengyihao's avatar
dengyihao 已提交
294
#define EPSET_GET_SIZE(epSet)       (epSet)->numOfEps
dengyihao's avatar
dengyihao 已提交
295
#define EPSET_GET_INUSE_IP(epSet)   ((epSet)->eps[(epSet)->inUse].fqdn)
dengyihao's avatar
dengyihao 已提交
296
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
dengyihao's avatar
dengyihao 已提交
297 298 299 300 301 302
#define EPSET_FORWARD_INUSE(epSet)                             \
  do {                                                         \
    if ((epSet)->numOfEps != 0) {                              \
      ++((epSet)->inUse);                                      \
      (epSet)->inUse = ((epSet)->inUse) % ((epSet)->numOfEps); \
    }                                                          \
dengyihao's avatar
dengyihao 已提交
303
  } while (0)
dengyihao's avatar
dengyihao 已提交
304

dengyihao's avatar
dengyihao 已提交
305 306 307 308 309 310 311 312 313 314 315
#define EPSET_DEBUG_STR(epSet, tbuf)                                                                                   \
  do {                                                                                                                 \
    int len = snprintf(tbuf, sizeof(tbuf), "epset:{");                                                                 \
    for (int i = 0; i < (epSet)->numOfEps; i++) {                                                                      \
      if (i == (epSet)->numOfEps - 1) {                                                                                \
        len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port);   \
      } else {                                                                                                         \
        len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d, ", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
      }                                                                                                                \
    }                                                                                                                  \
    len += snprintf(tbuf + len, sizeof(tbuf) - len, "}, inUse:%d", (epSet)->inUse);                                    \
dengyihao's avatar
dengyihao 已提交
316
  } while (0);
dengyihao's avatar
dengyihao 已提交
317

U
ubuntu 已提交
318
static void* cliWorkThread(void* arg);
dengyihao's avatar
dengyihao 已提交
319

dengyihao's avatar
dengyihao 已提交
320 321 322 323 324 325 326 327
static void cliReleaseUnfinishedMsg(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;

  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    SCliMsg* msg = transQueueGet(&conn->cliMsgs, i);
    if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) {
      if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
        conn->ctx.freeFunc(msg->ctx->ahandle);
dengyihao's avatar
dengyihao 已提交
328
      } else if (msg->ctx->ahandle != NULL && pThrd->destroyAhandleFp != NULL) {
dengyihao's avatar
dengyihao 已提交
329
        tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle);
dengyihao's avatar
dengyihao 已提交
330
        pThrd->destroyAhandleFp(msg->ctx->ahandle);
dengyihao's avatar
dengyihao 已提交
331 332 333 334
      }
    }
    destroyCmsg(msg);
  }
dengyihao's avatar
dengyihao 已提交
335
  transQueueClear(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
336
  memset(&conn->ctx, 0, sizeof(conn->ctx));
dengyihao's avatar
dengyihao 已提交
337
}
dengyihao's avatar
dengyihao 已提交
338
bool cliMaySendCachedMsg(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
339
  if (!transQueueEmpty(&conn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
340
    SCliMsg* pCliMsg = NULL;
U
ubuntu 已提交
341
    CONN_GET_NEXT_SENDMSG(conn);
dengyihao's avatar
dengyihao 已提交
342 343
    cliSend(conn);
    return true;
dengyihao's avatar
dengyihao 已提交
344
  }
dengyihao's avatar
dengyihao 已提交
345
  return false;
U
ubuntu 已提交
346 347
_RETURN:
  return false;
dengyihao's avatar
dengyihao 已提交
348
}
dengyihao's avatar
formate  
dengyihao 已提交
349
void cliHandleResp(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
350 351
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
352

dengyihao's avatar
dengyihao 已提交
353 354 355 356 357 358
  if (conn->timer) {
    if (uv_is_active((uv_handle_t*)conn->timer)) {
      tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
      uv_timer_stop(conn->timer);
    }
    taosArrayPush(pThrd->timerList, &conn->timer);
dengyihao's avatar
dengyihao 已提交
359
    conn->timer->data = NULL;
dengyihao's avatar
dengyihao 已提交
360
    conn->timer = NULL;
dengyihao's avatar
dengyihao 已提交
361 362
  }

dengyihao's avatar
opt rpc  
dengyihao 已提交
363
  STransMsgHead* pHead = NULL;
dengyihao's avatar
dengyihao 已提交
364 365 366

  int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
  if (msgLen <= 0) {
dengyihao's avatar
dengyihao 已提交
367 368 369
    tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
    return;
  }
dengyihao's avatar
dengyihao 已提交
370 371 372 373

  if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
    tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
  }
dengyihao's avatar
dengyihao 已提交
374 375
  pHead->code = htonl(pHead->code);
  pHead->msgLen = htonl(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
376 377 378 379
  if (cliRecvReleaseReq(conn, pHead)) {
    return;
  }

dengyihao's avatar
dengyihao 已提交
380 381 382 383 384
  STransMsg transMsg = {0};
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = transContFromHead((char*)pHead);
  transMsg.code = pHead->code;
  transMsg.msgType = pHead->msgType;
S
Shengliang Guan 已提交
385
  transMsg.info.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
386
  transMsg.info.traceId = pHead->traceId;
dengyihao's avatar
dengyihao 已提交
387
  transMsg.info.hasEpSet = pHead->hasEpSet;
dengyihao's avatar
dengyihao 已提交
388

dengyihao's avatar
dengyihao 已提交
389 390 391 392
  SCliMsg*       pMsg = NULL;
  STransConnCtx* pCtx = NULL;
  if (CONN_NO_PERSIST_BY_APP(conn)) {
    pMsg = transQueuePop(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
393 394 395

    pCtx = pMsg ? pMsg->ctx : NULL;
    transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
396
    tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
U
ubuntu 已提交
397
  } else {
dengyihao's avatar
dengyihao 已提交
398 399 400
    uint64_t ahandle = (uint64_t)pHead->ahandle;
    CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
    if (pMsg == NULL) {
S
Shengliang Guan 已提交
401
      transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
dengyihao's avatar
dengyihao 已提交
402 403
      tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
             transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
S
Shengliang Guan 已提交
404
      if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
D
dapan1121 已提交
405
        transMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
S
Shengliang Guan 已提交
406
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
dengyihao's avatar
dengyihao 已提交
407 408
        tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
               transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
409 410
      }
    } else {
dengyihao's avatar
dengyihao 已提交
411
      pCtx = pMsg->ctx;
S
Shengliang Guan 已提交
412
      transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
413
      tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
414
    }
U
ubuntu 已提交
415
  }
dengyihao's avatar
dengyihao 已提交
416
  // buf's mem alread translated to transMsg.pCont
dengyihao's avatar
dengyihao 已提交
417
  if (!CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
418
    transMsg.info.handle = (void*)conn->refId;
dengyihao's avatar
dengyihao 已提交
419
    tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
420
  }
dengyihao's avatar
dengyihao 已提交
421

dengyihao's avatar
dengyihao 已提交
422
  STraceId* trace = &transMsg.info.traceId;
dengyihao's avatar
dengyihao 已提交
423
  tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
dengyihao's avatar
dengyihao 已提交
424
          TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));
dengyihao's avatar
dengyihao 已提交
425

dengyihao's avatar
dengyihao 已提交
426
  if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
427
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
428
    transFreeMsg(transMsg.pCont);
dengyihao's avatar
dengyihao 已提交
429 430
    return;
  }
S
Shengliang Guan 已提交
431
  if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
dengyihao's avatar
dengyihao 已提交
432
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
433
    transFreeMsg(transMsg.pCont);
dengyihao's avatar
dengyihao 已提交
434 435
    return;
  }
dengyihao's avatar
dengyihao 已提交
436

dengyihao's avatar
dengyihao 已提交
437
  if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
dengyihao's avatar
dengyihao 已提交
438 439 440
    if (cliAppCb(conn, &transMsg, pMsg) != 0) {
      return;
    }
dengyihao's avatar
dengyihao 已提交
441
  }
dengyihao's avatar
dengyihao 已提交
442 443
  destroyCmsg(pMsg);

dengyihao's avatar
dengyihao 已提交
444
  if (cliMaySendCachedMsg(conn) == true) {
dengyihao's avatar
dengyihao 已提交
445 446
    return;
  }
dengyihao's avatar
dengyihao 已提交
447

U
ubuntu 已提交
448
  if (CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
449
    return addConnToPool(pThrd->pool, conn);
dengyihao's avatar
dengyihao 已提交
450
  }
dengyihao's avatar
test  
dengyihao 已提交
451

dengyihao's avatar
dengyihao 已提交
452
  uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
dengyihao's avatar
dengyihao 已提交
453
}
U
ubuntu 已提交
454

dengyihao's avatar
dengyihao 已提交
455
void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
dengyihao's avatar
dengyihao 已提交
456
  if (transQueueEmpty(&pConn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
457
    if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
dengyihao's avatar
dengyihao 已提交
458
      tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
formate  
dengyihao 已提交
459 460 461
      transUnrefCliHandle(pConn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
462
  }
dengyihao's avatar
dengyihao 已提交
463 464 465
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
  bool      once = false;
D
dapan1121 已提交
466
  do {
dengyihao's avatar
dengyihao 已提交
467
    SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);
dengyihao's avatar
enh log  
dengyihao 已提交
468

D
dapan1121 已提交
469 470
    if (pMsg == NULL && once) {
      break;
dengyihao's avatar
dengyihao 已提交
471
    }
dengyihao's avatar
enh log  
dengyihao 已提交
472 473 474 475 476 477

    if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg)) {
      destroyCmsg(pMsg);
      break;
    }

dengyihao's avatar
dengyihao 已提交
478
    STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
U
ubuntu 已提交
479

dengyihao's avatar
dengyihao 已提交
480
    STransMsg transMsg = {0};
dengyihao's avatar
dengyihao 已提交
481
    transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code;
dengyihao's avatar
dengyihao 已提交
482
    transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
S
Shengliang Guan 已提交
483
    transMsg.info.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
484

dengyihao's avatar
dengyihao 已提交
485
    if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
S
Shengliang Guan 已提交
486
      transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
dengyihao's avatar
dengyihao 已提交
487
      tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
dengyihao's avatar
dengyihao 已提交
488
             TMSG_INFO(transMsg.msgType));
S
Shengliang Guan 已提交
489
      if (transMsg.info.ahandle == NULL) {
dengyihao's avatar
dengyihao 已提交
490 491 492
        int32_t msgType = 0;
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, &msgType);
        transMsg.msgType = msgType;
dengyihao's avatar
dengyihao 已提交
493
        tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
S
Shengliang Guan 已提交
494
               transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
495
      }
dengyihao's avatar
dengyihao 已提交
496
    } else {
dengyihao's avatar
dengyihao 已提交
497
      transMsg.info.ahandle = (pMsg != NULL && pMsg->type != Release && pCtx) ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
498 499 500
    }

    if (pCtx == NULL || pCtx->pSem == NULL) {
S
Shengliang Guan 已提交
501
      if (transMsg.info.ahandle == NULL) {
dengyihao's avatar
dengyihao 已提交
502 503 504 505 506
        if (pMsg == NULL || REQUEST_NO_RESP(&pMsg->msg) || pMsg->type == Release) {
          destroyCmsg(pMsg);
          once = true;
          continue;
        }
U
ubuntu 已提交
507
      }
dengyihao's avatar
dengyihao 已提交
508
    }
dengyihao's avatar
enh log  
dengyihao 已提交
509

dengyihao's avatar
dengyihao 已提交
510
    if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
dengyihao's avatar
dengyihao 已提交
511 512 513
      if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
        return;
      }
dengyihao's avatar
dengyihao 已提交
514 515
    }
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
516
    tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));
D
dapan1121 已提交
517
  } while (!transQueueEmpty(&pConn->cliMsgs));
dengyihao's avatar
dengyihao 已提交
518
  transUnrefCliHandle(pConn);
dengyihao's avatar
dengyihao 已提交
519
}
dengyihao's avatar
dengyihao 已提交
520
void cliHandleExcept(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
521
  tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
522 523
  cliHandleExceptImpl(conn, -1);
}
dengyihao's avatar
dengyihao 已提交
524

dengyihao's avatar
dengyihao 已提交
525 526
void cliConnTimeout(uv_timer_t* handle) {
  SCliConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
527 528
  SCliThrd* pThrd = conn->hostThrd;

dengyihao's avatar
dengyihao 已提交
529
  tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
530

dengyihao's avatar
dengyihao 已提交
531
  uv_timer_stop(handle);
dengyihao's avatar
dengyihao 已提交
532 533 534
  handle->data = NULL;
  taosArrayPush(pThrd->timerList, &conn->timer);
  conn->timer = NULL;
dengyihao's avatar
dengyihao 已提交
535 536

  cliHandleFastFail(conn, UV_ECANCELED);
dengyihao's avatar
dengyihao 已提交
537
}
dengyihao's avatar
dengyihao 已提交
538 539 540 541
void cliReadTimeoutCb(uv_timer_t* handle) {
  // set up timeout cb
  SCliConn* conn = handle->data;
  tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
542
  uv_read_stop(conn->stream);
dengyihao's avatar
dengyihao 已提交
543 544
  cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT);
}
U
ubuntu 已提交
545 546

void* createConnPool(int size) {
547 548
  // thread local, no lock
  return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
549
}
U
ubuntu 已提交
550
void* destroyConnPool(void* pool) {
dengyihao's avatar
dengyihao 已提交
551
  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
dengyihao's avatar
dengyihao 已提交
552
  SCliThrd*  pThrd = connList->pThrd;
dengyihao's avatar
dengyihao 已提交
553
  while (connList != NULL) {
dengyihao's avatar
dengyihao 已提交
554 555
    while (!QUEUE_IS_EMPTY(&connList->conns)) {
      queue*    h = QUEUE_HEAD(&connList->conns);
dengyihao's avatar
dengyihao 已提交
556
      SCliConn* c = QUEUE_DATA(h, SCliConn, q);
U
ubuntu 已提交
557
      cliDestroyConn(c, true);
dengyihao's avatar
dengyihao 已提交
558
    }
dengyihao's avatar
dengyihao 已提交
559 560 561 562 563 564 565 566 567 568 569 570 571 572 573

    SMsgList* msglist = connList->list;
    while (!QUEUE_IS_EMPTY(&msglist->msgQ)) {
      queue* h = QUEUE_HEAD(&msglist->msgQ);
      QUEUE_REMOVE(h);

      SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);

      transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
      pMsg->ctx->task = NULL;

      doNotifyApp(pMsg, pThrd);
    }
    taosMemoryFree(msglist);

dengyihao's avatar
dengyihao 已提交
574
    connList = taosHashIterate((SHashObj*)pool, connList);
dengyihao's avatar
dengyihao 已提交
575
  }
dengyihao's avatar
dengyihao 已提交
576
  taosHashCleanup(pool);
577
  return NULL;
dengyihao's avatar
dengyihao 已提交
578 579
}

dengyihao's avatar
dengyihao 已提交
580
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
dengyihao's avatar
dengyihao 已提交
581
  void*      pool = pThrd->pool;
dengyihao's avatar
dengyihao 已提交
582
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
583
  STrans*    pTranInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
584
  if (plist == NULL) {
dengyihao's avatar
dengyihao 已提交
585 586 587 588
    SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
    QUEUE_INIT(&nList->msgQ);
    nList->numOfConn++;

dengyihao's avatar
dengyihao 已提交
589
    SConnList list = {0};
dengyihao's avatar
dengyihao 已提交
590 591 592
    QUEUE_INIT(&list.conns);
    list.list = nList;

dengyihao's avatar
dengyihao 已提交
593
    taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
dengyihao's avatar
dengyihao 已提交
594
    return NULL;
dengyihao's avatar
dengyihao 已提交
595 596
  }

dengyihao's avatar
dengyihao 已提交
597 598
  SMsgList* msglist = plist->list;
  if (QUEUE_IS_EMPTY(&plist->conns) && msglist->numOfConn >= pTranInst->connLimitNum) {
dengyihao's avatar
dengyihao 已提交
599
    *exceed = true;
dengyihao's avatar
dengyihao 已提交
600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623
    return NULL;
  }

  plist->size -= 1;
  queue*    h = QUEUE_HEAD(&plist->conns);
  SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
  conn->status = ConnNormal;
  QUEUE_REMOVE(&conn->q);
  QUEUE_INIT(&conn->q);

  if (conn->task != NULL) {
    transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
    conn->task = NULL;
  }
  return conn;
}

static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
  void*      pool = pThrd->pool;
  STrans*    pTransInst = pThrd->pTransInst;
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
  if (plist == NULL) {
    SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
    QUEUE_INIT(&nList->msgQ);
dengyihao's avatar
dengyihao 已提交
624 625 626 627
    nList->numOfConn++;

    SConnList list = {0};
    QUEUE_INIT(&list.conns);
dengyihao's avatar
dengyihao 已提交
628 629 630 631
    list.list = nList;

    taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
    plist = taosHashGet((SHashObj*)pool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
632

dengyihao's avatar
dengyihao 已提交
633
    return NULL;
dengyihao's avatar
dengyihao 已提交
634 635 636 637
  }

  SMsgList* list = plist->list;
  // no avaliable conn in pool
dengyihao's avatar
dengyihao 已提交
638
  if (QUEUE_IS_EMPTY(&plist->conns)) {
dengyihao's avatar
dengyihao 已提交
639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668
    if ((list)->numOfConn >= pTransInst->connLimitNum) {
      STraceId* trace = &(*pMsg)->msg.info.traceId;

      STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
      arg->param1 = *pMsg;
      arg->param2 = pThrd;
      (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);

      tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));

      QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
      *pMsg = NULL;
    } else {
      // send msg in delay queue
      if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) {
        STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
        arg->param1 = *pMsg;
        arg->param2 = pThrd;
        (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);

        QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
        queue* h = QUEUE_HEAD(&(list)->msgQ);
        QUEUE_REMOVE(h);
        SCliMsg* ans = QUEUE_DATA(h, SCliMsg, q);

        *pMsg = ans;
        transDQCancel(pThrd->waitConnQueue, ans->ctx->task);
      }
      list->numOfConn++;
    }
dengyihao's avatar
dengyihao 已提交
669 670
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
671 672

  plist->size -= 1;
dengyihao's avatar
dengyihao 已提交
673
  queue*    h = QUEUE_HEAD(&plist->conns);
dengyihao's avatar
dengyihao 已提交
674
  SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
dengyihao's avatar
dengyihao 已提交
675
  conn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
676 677
  QUEUE_REMOVE(&conn->q);
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
678

dengyihao's avatar
dengyihao 已提交
679 680 681 682
  if (conn->task != NULL) {
    transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
    conn->task = NULL;
  }
dengyihao's avatar
dengyihao 已提交
683
  return conn;
dengyihao's avatar
dengyihao 已提交
684
}
dengyihao's avatar
dengyihao 已提交
685 686 687 688 689 690
static void addConnToPool(void* pool, SCliConn* conn) {
  if (conn->status == ConnInPool) {
    return;
  }
  allocConnRef(conn, true);

dengyihao's avatar
dengyihao 已提交
691
  SCliThrd* thrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
692 693 694 695 696 697
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
    taosArrayPush(thrd->timerList, &conn->timer);
    conn->timer->data = NULL;
    conn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
698 699 700 701 702
  if (T_REF_VAL_GET(conn) > 1) {
    transUnrefCliHandle(conn);
  }

  cliDestroyConnMsgs(conn, false);
dengyihao's avatar
dengyihao 已提交
703

dengyihao's avatar
dengyihao 已提交
704
  if (conn->list == NULL) {
dengyihao's avatar
dengyihao 已提交
705
    conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
dengyihao's avatar
dengyihao 已提交
706
  }
dengyihao's avatar
dengyihao 已提交
707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727

  SConnList* pList = conn->list;
  SMsgList*  msgList = pList->list;
  if (!QUEUE_IS_EMPTY(&msgList->msgQ)) {
    queue* h = QUEUE_HEAD(&(msgList)->msgQ);
    QUEUE_REMOVE(h);

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);

    transDQCancel(thrd->waitConnQueue, pMsg->ctx->task);
    pMsg->ctx->task = NULL;

    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
    transQueuePush(&conn->cliMsgs, pMsg);

    conn->status = ConnNormal;
    cliSend(conn);
    return;
  }

  conn->status = ConnInPool;
dengyihao's avatar
dengyihao 已提交
728
  QUEUE_PUSH(&conn->list->conns, &conn->q);
dengyihao's avatar
dengyihao 已提交
729
  conn->list->size += 1;
dengyihao's avatar
dengyihao 已提交
730

dengyihao's avatar
dengyihao 已提交
731
  if (conn->list->size >= 250) {
dengyihao's avatar
dengyihao 已提交
732 733
    STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
    arg->param1 = conn;
dengyihao's avatar
dengyihao 已提交
734
    arg->param2 = thrd;
dengyihao's avatar
dengyihao 已提交
735 736

    STrans* pTransInst = thrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
737 738
    conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
  }
dengyihao's avatar
dengyihao 已提交
739
}
dengyihao's avatar
dengyihao 已提交
740
static int32_t allocConnRef(SCliConn* conn, bool update) {
dengyihao's avatar
dengyihao 已提交
741
  if (update) {
dengyihao's avatar
dengyihao 已提交
742
    transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
743
    transRemoveExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
744
    conn->refId = -1;
dengyihao's avatar
dengyihao 已提交
745 746 747 748
  }
  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
749
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
dengyihao's avatar
dengyihao 已提交
750
  conn->refId = exh->refId;
751 752 753 754

  if (conn->refId == -1) {
    taosMemoryFree(exh);
  }
dengyihao's avatar
dengyihao 已提交
755 756 757 758 759
  return 0;
}

static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
  if (update) {
dengyihao's avatar
dengyihao 已提交
760
    transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
761 762 763 764 765 766 767 768 769 770 771 772 773
    transRemoveExHandle(transGetRefMgt(), conn->refId);
    conn->refId = -1;
  }
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
  if (exh == NULL) {
    return -1;
  }
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
  conn->refId = exh->refId;

  transReleaseExHandle(transGetRefMgt(), handle);
  return 0;
dengyihao's avatar
dengyihao 已提交
774
}
dengyihao's avatar
dengyihao 已提交
775

dengyihao's avatar
dengyihao 已提交
776
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
777
  SCliConn*    conn = handle->data;
dengyihao's avatar
dengyihao 已提交
778
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
779
  tDebug("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
780
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
781
}
U
ubuntu 已提交
782
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
783
  // impl later
dengyihao's avatar
dengyihao 已提交
784 785 786
  if (handle->data == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
787 788
  SCliConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
789
  if (nread > 0) {
dengyihao's avatar
dengyihao 已提交
790
    pBuf->len += nread;
dengyihao's avatar
dengyihao 已提交
791
    while (transReadComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
792
      tDebug("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
793 794 795 796 797 798
      if (pBuf->invalid) {
        cliHandleExcept(conn);
        break;
      } else {
        cliHandleResp(conn);
      }
dengyihao's avatar
dengyihao 已提交
799
    }
dengyihao's avatar
dengyihao 已提交
800 801
    return;
  }
dengyihao's avatar
dengyihao 已提交
802

803
  if (nread == 0) {
dengyihao's avatar
dengyihao 已提交
804 805 806
    // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
    // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
    // read(2).
dengyihao's avatar
dengyihao 已提交
807
    tTrace("%s conn %p read empty", CONN_GET_INST_LABEL(conn), conn);
808 809
    return;
  }
dengyihao's avatar
fix bug  
dengyihao 已提交
810
  if (nread < 0) {
dengyihao's avatar
dengyihao 已提交
811
    tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
812
    conn->broken = true;
U
ubuntu 已提交
813
    cliHandleExcept(conn);
dengyihao's avatar
dengyihao 已提交
814
  }
dengyihao's avatar
dengyihao 已提交
815
}
dengyihao's avatar
dengyihao 已提交
816

dengyihao's avatar
dengyihao 已提交
817
static SCliConn* cliCreateConn(SCliThrd* pThrd) {
wafwerar's avatar
wafwerar 已提交
818
  SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn));
dengyihao's avatar
dengyihao 已提交
819
  // read/write stream handle
G
gccgdb1234 已提交
820
  conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
dengyihao's avatar
dengyihao 已提交
821 822 823
  uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
  conn->stream->data = conn;

dengyihao's avatar
dengyihao 已提交
824 825 826 827 828 829 830 831
  uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
  if (timer == NULL) {
    timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
    tDebug("no available timer, create a timer %p", timer);
    uv_timer_init(pThrd->loop, timer);
  }
  timer->data = conn;
  conn->timer = timer;
dengyihao's avatar
dengyihao 已提交
832

dengyihao's avatar
dengyihao 已提交
833
  conn->connReq.data = conn;
dengyihao's avatar
dengyihao 已提交
834 835
  transReqQueueInit(&conn->wreqQueue);

dengyihao's avatar
dengyihao 已提交
836
  transQueueInit(&conn->cliMsgs, NULL);
dengyihao's avatar
opt rpc  
dengyihao 已提交
837 838

  transInitBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
839
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
840
  conn->hostThrd = pThrd;
dengyihao's avatar
dengyihao 已提交
841
  conn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
842
  conn->broken = false;
dengyihao's avatar
dengyihao 已提交
843
  transRefCliHandle(conn);
dengyihao's avatar
dengyihao 已提交
844

dengyihao's avatar
dengyihao 已提交
845
  atomic_add_fetch_32(&pThrd->connCount, 1);
dengyihao's avatar
dengyihao 已提交
846
  allocConnRef(conn, false);
dengyihao's avatar
dengyihao 已提交
847

dengyihao's avatar
dengyihao 已提交
848 849
  return conn;
}
U
ubuntu 已提交
850
static void cliDestroyConn(SCliConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
851
  SCliThrd* pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
852
  tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
853

dengyihao's avatar
dengyihao 已提交
854 855
  QUEUE_REMOVE(&conn->q);
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
856 857 858 859 860 861 862 863 864 865

  if (conn->list != NULL) {
    SConnList* connList = conn->list;
    connList->list->numOfConn--;
  } else {
    SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip));
    connList->list->numOfConn--;
  }
  conn->list = NULL;

dengyihao's avatar
dengyihao 已提交
866
  transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
867
  transRemoveExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
opt rpc  
dengyihao 已提交
868
  conn->refId = -1;
dengyihao's avatar
dengyihao 已提交
869

dengyihao's avatar
dengyihao 已提交
870 871 872 873 874 875 876
  if (conn->task != NULL) {
    transDQCancel(pThrd->timeoutQueue, conn->task);
    conn->task = NULL;
  }
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
    conn->timer->data = NULL;
dengyihao's avatar
dengyihao 已提交
877
    taosArrayPush(pThrd->timerList, &conn->timer);
dengyihao's avatar
dengyihao 已提交
878 879
    conn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
880

dengyihao's avatar
dengyihao 已提交
881
  if (clear) {
dengyihao's avatar
dengyihao 已提交
882
    if (!uv_is_closing((uv_handle_t*)conn->stream)) {
dengyihao's avatar
dengyihao 已提交
883
      uv_read_stop(conn->stream);
dengyihao's avatar
dengyihao 已提交
884 885
      uv_close((uv_handle_t*)conn->stream, cliDestroy);
    }
886
  }
dengyihao's avatar
dengyihao 已提交
887
}
U
ubuntu 已提交
888
static void cliDestroy(uv_handle_t* handle) {
dengyihao's avatar
dengyihao 已提交
889 890 891
  if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
892
  SCliConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
893
  SCliThrd* pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
894 895
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
dengyihao's avatar
dengyihao 已提交
896 897
    taosArrayPush(pThrd->timerList, &conn->timer);
    conn->timer->data = NULL;
dengyihao's avatar
dengyihao 已提交
898 899
    conn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
900

dengyihao's avatar
dengyihao 已提交
901 902
  atomic_sub_fetch_32(&pThrd->connCount, 1);

dengyihao's avatar
dengyihao 已提交
903
  transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
904
  transRemoveExHandle(transGetRefMgt(), conn->refId);
wafwerar's avatar
wafwerar 已提交
905 906
  taosMemoryFree(conn->ip);
  taosMemoryFree(conn->stream);
dengyihao's avatar
dengyihao 已提交
907 908 909

  cliDestroyConnMsgs(conn, true);

dengyihao's avatar
dengyihao 已提交
910
  tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
911
  transReqQueueClear(&conn->wreqQueue);
dengyihao's avatar
dengyihao 已提交
912
  transDestroyBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
913

wafwerar's avatar
wafwerar 已提交
914
  taosMemoryFree(conn);
dengyihao's avatar
dengyihao 已提交
915
}
dengyihao's avatar
dengyihao 已提交
916
static bool cliHandleNoResp(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
917 918
  bool res = false;
  if (!transQueueEmpty(&conn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
919
    SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, 0);
dengyihao's avatar
dengyihao 已提交
920
    if (REQUEST_NO_RESP(&pMsg->msg)) {
dengyihao's avatar
dengyihao 已提交
921
      transQueuePop(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
922 923 924 925 926
      destroyCmsg(pMsg);
      res = true;
    }
    if (res == true) {
      if (cliMaySendCachedMsg(conn) == false) {
dengyihao's avatar
dengyihao 已提交
927
        SCliThrd* thrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
928
        addConnToPool(thrd->pool, conn);
dengyihao's avatar
dengyihao 已提交
929 930 931
        res = false;
      } else {
        res = true;
dengyihao's avatar
dengyihao 已提交
932 933 934 935 936
      }
    }
  }
  return res;
}
U
ubuntu 已提交
937
static void cliSendCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
938 939
  SCliConn* pConn = transReqQueueRemove(req);
  if (pConn == NULL) return;
dengyihao's avatar
dengyihao 已提交
940

dengyihao's avatar
dengyihao 已提交
941 942 943
  SCliMsg* pMsg = !transQueueEmpty(&pConn->cliMsgs) ? transQueueGet(&pConn->cliMsgs, 0) : NULL;
  if (pMsg != NULL) {
    int64_t cost = taosGetTimestampUs() - pMsg->st;
dengyihao's avatar
dengyihao 已提交
944
    if (cost > 1000 * 20) {
dengyihao's avatar
dengyihao 已提交
945 946 947 948
      tWarn("%s conn %p send cost:%dus, send exception", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
    }
  }

dengyihao's avatar
dengyihao 已提交
949
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
950
    tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
951
  } else {
dengyihao's avatar
dengyihao 已提交
952 953 954 955
    if (!uv_is_closing((uv_handle_t*)&pConn->stream)) {
      tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
      cliHandleExcept(pConn);
    }
dengyihao's avatar
dengyihao 已提交
956 957
    return;
  }
dengyihao's avatar
dengyihao 已提交
958
  if (cliHandleNoResp(pConn) == true) {
dengyihao's avatar
dengyihao 已提交
959
    tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
960 961
    return;
  }
dengyihao's avatar
dengyihao 已提交
962
  uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
dengyihao's avatar
dengyihao 已提交
963
}
dengyihao's avatar
dengyihao 已提交
964 965 966 967
void cliSendBatch(SCliConn* pConn) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

dengyihao's avatar
dengyihao 已提交
968 969 970 971 972
  SCliBatch*     pBatch = pConn->pBatch;
  SCliBatchList* pList = pBatch->pList;
  pList->connCnt += 1;

  int32_t wLen = pBatch->wLen;
dengyihao's avatar
dengyihao 已提交
973 974 975 976

  uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
  int       i = 0;

dengyihao's avatar
dengyihao 已提交
977 978
  queue* h = NULL;
  QUEUE_FOREACH(h, &pBatch->wq) {
dengyihao's avatar
dengyihao 已提交
979 980 981 982 983 984 985 986 987
    SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q);

    STransConnCtx* pCtx = pCliMsg->ctx;

    STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
    if (pMsg->pCont == 0) {
      pMsg->pCont = (void*)rpcMallocCont(0);
      pMsg->contLen = 0;
    }
dengyihao's avatar
dengyihao 已提交
988

dengyihao's avatar
dengyihao 已提交
989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004
    int            msgLen = transMsgLenFromCont(pMsg->contLen);
    STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);

    if (pHead->comp == 0) {
      pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
      pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
      pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
      pHead->msgType = pMsg->msgType;
      pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
      pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
      memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
      pHead->traceId = pMsg->info.traceId;
      pHead->magicNum = htonl(TRANS_MAGIC_NUM);
    }
    pHead->timestamp = taosHton64(taosGetTimestampUs());

dengyihao's avatar
dengyihao 已提交
1005 1006 1007 1008 1009 1010 1011 1012
    if (pHead->comp == 0) {
      if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
        msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
        pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
      }
    } else {
      msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
    }
dengyihao's avatar
dengyihao 已提交
1013 1014 1015 1016 1017
    wb[i++] = uv_buf_init((char*)pHead, msgLen);
  }

  uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
  req->data = pConn;
dengyihao's avatar
dengyihao 已提交
1018
  tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn,
dengyihao's avatar
dengyihao 已提交
1019
         pBatch->wLen, pBatch->batchSize);
dengyihao's avatar
dengyihao 已提交
1020 1021 1022
  uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb);
  taosMemoryFree(wb);
}
U
ubuntu 已提交
1023
void cliSend(SCliConn* pConn) {
dengyihao's avatar
dengyihao 已提交
1024 1025 1026 1027 1028
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  if (transQueueEmpty(&pConn->cliMsgs)) {
    tError("%s conn %p not msg to send", pTransInst->label, pConn);
dengyihao's avatar
dengyihao 已提交
1029
    cliHandleExcept(pConn);
dengyihao's avatar
dengyihao 已提交
1030 1031
    return;
  }
dengyihao's avatar
dengyihao 已提交
1032 1033

  SCliMsg* pCliMsg = NULL;
U
ubuntu 已提交
1034
  CONN_GET_NEXT_SENDMSG(pConn);
dengyihao's avatar
dengyihao 已提交
1035 1036
  pCliMsg->sent = 1;

dengyihao's avatar
dengyihao 已提交
1037
  STransConnCtx* pCtx = pCliMsg->ctx;
dengyihao's avatar
dengyihao 已提交
1038

U
ubuntu 已提交
1039
  STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
dengyihao's avatar
dengyihao 已提交
1040 1041 1042 1043
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
dengyihao's avatar
dengyihao 已提交
1044

dengyihao's avatar
dengyihao 已提交
1045
  int            msgLen = transMsgLenFromCont(pMsg->contLen);
dengyihao's avatar
dengyihao 已提交
1046
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
dengyihao's avatar
dengyihao 已提交
1047

dengyihao's avatar
dengyihao 已提交
1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058
  if (pHead->comp == 0) {
    pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
    pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
    pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
    pHead->msgType = pMsg->msgType;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
    memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
    pHead->traceId = pMsg->info.traceId;
    pHead->magicNum = htonl(TRANS_MAGIC_NUM);
  }
dengyihao's avatar
dengyihao 已提交
1059
  pHead->timestamp = taosHton64(taosGetTimestampUs());
dengyihao's avatar
dengyihao 已提交
1060

dengyihao's avatar
dengyihao 已提交
1061 1062 1063
  if (pHead->persist == 1) {
    CONN_SET_PERSIST_BY_APP(pConn);
  }
dengyihao's avatar
dengyihao 已提交
1064

dengyihao's avatar
dengyihao 已提交
1065
  STraceId* trace = &pMsg->info.traceId;
dengyihao's avatar
dengyihao 已提交
1066

dengyihao's avatar
dengyihao 已提交
1067
  if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
dengyihao's avatar
dengyihao 已提交
1068
    uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
dengyihao's avatar
dengyihao 已提交
1069 1070
    if (timer == NULL) {
      timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
dengyihao's avatar
dengyihao 已提交
1071
      tDebug("no available timer, create a timer %p", timer);
dengyihao's avatar
dengyihao 已提交
1072 1073 1074 1075 1076
      uv_timer_init(pThrd->loop, timer);
    }
    timer->data = pConn;
    pConn->timer = timer;

dengyihao's avatar
dengyihao 已提交
1077 1078 1079
    tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
    uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
  }
dengyihao's avatar
dengyihao 已提交
1080

dengyihao's avatar
dengyihao 已提交
1081 1082 1083 1084 1085 1086 1087
  if (pHead->comp == 0) {
    if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
      msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
      pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    }
  } else {
    msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
dengyihao's avatar
dengyihao 已提交
1088
  }
dengyihao's avatar
dengyihao 已提交
1089

dengyihao's avatar
dengyihao 已提交
1090 1091
  tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
          TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);
dengyihao's avatar
dengyihao 已提交
1092

dengyihao's avatar
dengyihao 已提交
1093
  uv_buf_t    wb = uv_buf_init((char*)pHead, msgLen);
dengyihao's avatar
dengyihao 已提交
1094
  uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
dengyihao's avatar
dengyihao 已提交
1095 1096 1097

  int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
1098
    tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType),
dengyihao's avatar
dengyihao 已提交
1099 1100 1101
            uv_err_name(status));
    cliHandleExcept(pConn);
  }
U
ubuntu 已提交
1102 1103
  return;
_RETURN:
dengyihao's avatar
dengyihao 已提交
1104
  return;
dengyihao's avatar
dengyihao 已提交
1105
}
dengyihao's avatar
dengyihao 已提交
1106 1107

static void cliDestroyBatch(SCliBatch* pBatch) {
dengyihao's avatar
dengyihao 已提交
1108
  if (pBatch == NULL) return;
dengyihao's avatar
dengyihao 已提交
1109
  while (!QUEUE_IS_EMPTY(&pBatch->wq)) {
dengyihao's avatar
dengyihao 已提交
1110 1111
    queue* h = QUEUE_HEAD(&pBatch->wq);
    QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
1112

dengyihao's avatar
dengyihao 已提交
1113
    SCliMsg* p = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
1114 1115
    destroyCmsg(p);
  }
dengyihao's avatar
dengyihao 已提交
1116 1117
  SCliBatchList* p = pBatch->pList;
  p->sending -= 1;
dengyihao's avatar
dengyihao 已提交
1118 1119
  taosMemoryFree(pBatch);
}
dengyihao's avatar
dengyihao 已提交
1120
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1121 1122 1123 1124 1125
  if (pThrd->quit == true) {
    cliDestroyBatch(pBatch);
    return;
  }

dengyihao's avatar
dengyihao 已提交
1126
  if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
dengyihao's avatar
dengyihao 已提交
1127 1128
    return;
  }
dengyihao's avatar
dengyihao 已提交
1129 1130
  STrans*        pTransInst = pThrd->pTransInst;
  SCliBatchList* pList = pBatch->pList;
dengyihao's avatar
dengyihao 已提交
1131

dengyihao's avatar
dengyihao 已提交
1132 1133 1134
  char key[TSDB_FQDN_LEN + 64] = {0};
  CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port);

dengyihao's avatar
dengyihao 已提交
1135 1136
  bool      exceed = false;
  SCliConn* conn = getConnFromPool(pThrd, key, &exceed);
dengyihao's avatar
dengyihao 已提交
1137

dengyihao's avatar
dengyihao 已提交
1138 1139 1140
  if (conn == NULL && exceed) {
    tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pTransInst->label, pBatch->wLen,
           pBatch->batchSize, pTransInst->connLimitNum);
dengyihao's avatar
dengyihao 已提交
1141
    cliDestroyBatch(pBatch);
dengyihao's avatar
dengyihao 已提交
1142 1143
    return;
  }
dengyihao's avatar
dengyihao 已提交
1144 1145
  if (conn == NULL) {
    conn = cliCreateConn(pThrd);
dengyihao's avatar
dengyihao 已提交
1146 1147
    conn->pBatch = pBatch;
    conn->ip = strdup(pList->dst);
dengyihao's avatar
dengyihao 已提交
1148

dengyihao's avatar
dengyihao 已提交
1149
    uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip);
dengyihao's avatar
dengyihao 已提交
1150 1151 1152 1153 1154 1155
    if (ipaddr == 0xffffffff) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

dengyihao's avatar
dengyihao 已提交
1156
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1157 1158 1159 1160 1161
      return;
    }
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = ipaddr;
dengyihao's avatar
dengyihao 已提交
1162
    addr.sin_port = (uint16_t)htons(pList->port);
dengyihao's avatar
dengyihao 已提交
1163

dengyihao's avatar
dengyihao 已提交
1164
    tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst);
dengyihao's avatar
dengyihao 已提交
1165 1166
    int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
    if (fd == -1) {
dengyihao's avatar
dengyihao 已提交
1167 1168 1169
      tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
             tstrerror(TAOS_SYSTEM_ERROR(errno)));
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1170 1171 1172 1173
      return;
    }
    int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
    if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
1174 1175
      tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1176 1177 1178 1179
      return;
    }
    ret = transSetConnOption((uv_tcp_t*)conn->stream);
    if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
1180 1181
      tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1182 1183 1184 1185 1186 1187 1188 1189 1190
      return;
    }

    ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
    if (ret != 0) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;
dengyihao's avatar
dengyihao 已提交
1191
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1192 1193 1194 1195 1196 1197
      return;
    }
    uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
    return;
  }

dengyihao's avatar
dengyihao 已提交
1198
  conn->pBatch = pBatch;
dengyihao's avatar
dengyihao 已提交
1199
  cliSendBatch(conn);
dengyihao's avatar
dengyihao 已提交
1200 1201
}
static void cliSendBatchCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
1202 1203 1204
  SCliConn*  conn = req->data;
  SCliThrd*  thrd = conn->hostThrd;
  SCliBatch* p = conn->pBatch;
dengyihao's avatar
dengyihao 已提交
1205

dengyihao's avatar
dengyihao 已提交
1206
  SCliBatchList* pBatchList = p->pList;
dengyihao's avatar
dengyihao 已提交
1207
  SCliBatch*     nxtBatch = cliGetHeadFromList(pBatchList);
dengyihao's avatar
dengyihao 已提交
1208 1209
  pBatchList->connCnt -= 1;

dengyihao's avatar
dengyihao 已提交
1210 1211 1212
  conn->pBatch = NULL;

  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
1213
    tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
dengyihao's avatar
dengyihao 已提交
1214
           p->wLen, p->batchSize, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
1215 1216 1217

    if (!uv_is_closing((uv_handle_t*)&conn->stream)) cliHandleExcept(conn);

dengyihao's avatar
dengyihao 已提交
1218
    cliHandleBatchReq(nxtBatch, thrd);
dengyihao's avatar
dengyihao 已提交
1219
  } else {
dengyihao's avatar
dengyihao 已提交
1220
    tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
dengyihao's avatar
dengyihao 已提交
1221
           p->batchSize);
dengyihao's avatar
dengyihao 已提交
1222
    if (!uv_is_closing((uv_handle_t*)&conn->stream)) {
dengyihao's avatar
dengyihao 已提交
1223 1224 1225 1226 1227 1228
      if (nxtBatch != NULL) {
        conn->pBatch = nxtBatch;
        cliSendBatch(conn);
      } else {
        addConnToPool(thrd->pool, conn);
      }
dengyihao's avatar
dengyihao 已提交
1229
    } else {
dengyihao's avatar
dengyihao 已提交
1230 1231
      cliDestroyBatch(nxtBatch);
      // conn release by other callback
dengyihao's avatar
dengyihao 已提交
1232
    }
dengyihao's avatar
dengyihao 已提交
1233
  }
dengyihao's avatar
dengyihao 已提交
1234 1235 1236

  cliDestroyBatch(p);
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
1237
}
dengyihao's avatar
dengyihao 已提交
1238
static void cliHandleFastFail(SCliConn* pConn, int status) {
dengyihao's avatar
dengyihao 已提交
1239
  SCliThrd* pThrd = pConn->hostThrd;
dengyihao's avatar
dengyihao 已提交
1240
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1241 1242 1243

  if (status == -1) status = ENETUNREACH;

dengyihao's avatar
dengyihao 已提交
1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262
  if (pConn->pBatch == NULL) {
    SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);

    STraceId* trace = &pMsg->msg.info.traceId;
    tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
            TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status));

    if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
        (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
      SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip));
      int64_t        cTimestamp = taosGetTimestampMs();
      if (item != NULL) {
        int32_t elapse = cTimestamp - item->timestamp;
        if (elapse >= 0 && elapse <= pTransInst->failFastInterval) {
          item->count++;
        } else {
          item->count = 1;
          item->timestamp = cTimestamp;
        }
dengyihao's avatar
dengyihao 已提交
1263
      } else {
dengyihao's avatar
dengyihao 已提交
1264 1265
        SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
        taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem));
dengyihao's avatar
dengyihao 已提交
1266 1267
      }
    }
dengyihao's avatar
dengyihao 已提交
1268
  } else {
dengyihao's avatar
dengyihao 已提交
1269 1270
    tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
           pConn, pConn->ip, uv_strerror(status));
dengyihao's avatar
dengyihao 已提交
1271 1272
    cliDestroyBatch(pConn->pBatch);
    pConn->pBatch = NULL;
dengyihao's avatar
dengyihao 已提交
1273 1274 1275
  }
  cliHandleExcept(pConn);
}
dengyihao's avatar
dengyihao 已提交
1276

dengyihao's avatar
dengyihao 已提交
1277 1278 1279
void cliConnCb(uv_connect_t* req, int status) {
  SCliConn* pConn = req->data;
  SCliThrd* pThrd = pConn->hostThrd;
dengyihao's avatar
dengyihao 已提交
1280 1281 1282 1283 1284 1285 1286 1287 1288 1289
  bool      timeout = false;

  if (pConn->timer == NULL) {
    timeout = true;
  } else {
    uv_timer_stop(pConn->timer);
    pConn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &pConn->timer);
    pConn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
1290 1291

  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
1292 1293 1294 1295 1296
    if (timeout == false) {
      cliHandleFastFail(pConn, status);
    } else if (timeout == true) {
      // already deal by timeout
    }
1297
    return;
dengyihao's avatar
dengyihao 已提交
1298
  }
dengyihao's avatar
dengyihao 已提交
1299

dengyihao's avatar
dengyihao 已提交
1300 1301
  struct sockaddr peername, sockname;
  int             addrlen = sizeof(peername);
dengyihao's avatar
dengyihao 已提交
1302
  uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
dengyihao's avatar
dengyihao 已提交
1303
  transSockInfo2Str(&peername, pConn->dst);
dengyihao's avatar
dengyihao 已提交
1304

dengyihao's avatar
dengyihao 已提交
1305 1306
  addrlen = sizeof(sockname);
  uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
dengyihao's avatar
dengyihao 已提交
1307
  transSockInfo2Str(&sockname, pConn->src);
dengyihao's avatar
dengyihao 已提交
1308

dengyihao's avatar
dengyihao 已提交
1309
  tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
1310 1311 1312 1313 1314
  if (pConn->pBatch != NULL) {
    cliSendBatch(pConn);
  } else {
    cliSend(pConn);
  }
dengyihao's avatar
dengyihao 已提交
1315 1316
}

dengyihao's avatar
dengyihao 已提交
1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
  STransConnCtx* pCtx = pMsg->ctx;
  STrans*        pTransInst = pThrd->pTransInst;

  STransMsg transMsg = {0};
  transMsg.contLen = 0;
  transMsg.pCont = NULL;
  transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS;
  transMsg.msgType = pMsg->msg.msgType + 1;
  transMsg.info.ahandle = pMsg->ctx->ahandle;
  transMsg.info.traceId = pMsg->msg.info.traceId;
  transMsg.info.hasEpSet = false;
  if (pCtx->pSem != NULL) {
    if (pCtx->pRsp == NULL) {
    } else {
      memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg));
    }
  } else {
    pTransInst->cfp(pTransInst->parent, &transMsg, NULL);
  }

  destroyCmsg(pMsg);
}
dengyihao's avatar
dengyihao 已提交
1340
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1341 1342 1343 1344 1345
  if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) {
    pThrd->stopMsg = pMsg;
    return;
  }
  pThrd->stopMsg = NULL;
dengyihao's avatar
dengyihao 已提交
1346
  pThrd->quit = true;
U
ubuntu 已提交
1347
  tDebug("cli work thread %p start to quit", pThrd);
dengyihao's avatar
dengyihao 已提交
1348
  destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1349

dengyihao's avatar
fix bug  
dengyihao 已提交
1350
  destroyConnPool(pThrd->pool);
dengyihao's avatar
dengyihao 已提交
1351
  uv_walk(pThrd->loop, cliWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
1352
}
dengyihao's avatar
dengyihao 已提交
1353
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1354
  int64_t    refId = (int64_t)(pMsg->msg.info.handle);
dengyihao's avatar
dengyihao 已提交
1355
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1356
  if (exh == NULL) {
dengyihao's avatar
dengyihao 已提交
1357
    tDebug("%" PRId64 " already released", refId);
dengyihao's avatar
dengyihao 已提交
1358 1359
    destroyCmsg(pMsg);
    return;
dengyihao's avatar
dengyihao 已提交
1360 1361 1362
  }

  SCliConn* conn = exh->handle;
dengyihao's avatar
dengyihao 已提交
1363
  transReleaseExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1364
  tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
1365

dengyihao's avatar
dengyihao 已提交
1366 1367
  if (T_REF_VAL_GET(conn) == 2) {
    transUnrefCliHandle(conn);
dengyihao's avatar
dengyihao 已提交
1368 1369
    if (!transQueuePush(&conn->cliMsgs, pMsg)) {
      return;
dengyihao's avatar
dengyihao 已提交
1370 1371
    }
    cliSend(conn);
dengyihao's avatar
dengyihao 已提交
1372 1373 1374
  } else {
    tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1375 1376
  }
}
dengyihao's avatar
dengyihao 已提交
1377
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1378
  STransConnCtx* pCtx = pMsg->ctx;
dengyihao's avatar
dengyihao 已提交
1379
  pThrd->cvtAddr = pCtx->cvtAddr;
dengyihao's avatar
dengyihao 已提交
1380 1381
  destroyCmsg(pMsg);
}
U
ubuntu 已提交
1382

dengyihao's avatar
dengyihao 已提交
1383 1384
SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
  STransConnCtx* pCtx = (*pMsg)->ctx;
dengyihao's avatar
dengyihao 已提交
1385 1386
  SCliConn*      conn = NULL;

dengyihao's avatar
dengyihao 已提交
1387
  int64_t refId = (int64_t)((*pMsg)->msg.info.handle);
dengyihao's avatar
dengyihao 已提交
1388
  if (refId != 0) {
dengyihao's avatar
dengyihao 已提交
1389
    SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1390
    if (exh == NULL) {
dengyihao's avatar
dengyihao 已提交
1391
      tError("failed to get conn, refId: %" PRId64 "", refId);
dengyihao's avatar
dengyihao 已提交
1392 1393
      *ignore = true;
      return NULL;
dengyihao's avatar
dengyihao 已提交
1394 1395
    } else {
      conn = exh->handle;
dengyihao's avatar
dengyihao 已提交
1396
      if (conn == NULL) {
dengyihao's avatar
dengyihao 已提交
1397
        conn = getConnFromPool2(pThrd, addr, pMsg);
dengyihao's avatar
dengyihao 已提交
1398
        if (conn != NULL) specifyConnRef(conn, true, refId);
dengyihao's avatar
dengyihao 已提交
1399
      }
dengyihao's avatar
dengyihao 已提交
1400
      transReleaseExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1401 1402 1403
    }
    return conn;
  };
dengyihao's avatar
dengyihao 已提交
1404

dengyihao's avatar
dengyihao 已提交
1405
  conn = getConnFromPool2(pThrd, addr, pMsg);
dengyihao's avatar
dengyihao 已提交
1406
  if (conn != NULL) {
dengyihao's avatar
dengyihao 已提交
1407
    tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
dengyihao's avatar
dengyihao 已提交
1408
  } else {
dengyihao's avatar
dengyihao 已提交
1409
    tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
dengyihao's avatar
dengyihao 已提交
1410
  }
dengyihao's avatar
dengyihao 已提交
1411 1412
  return conn;
}
dengyihao's avatar
dengyihao 已提交
1413
FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
dengyihao's avatar
dengyihao 已提交
1414 1415 1416
  if (pCvtAddr->cvt == false) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1417 1418 1419
  if (pEpSet->numOfEps == 1 && strncmp(pEpSet->eps[0].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) {
    memset(pEpSet->eps[0].fqdn, 0, TSDB_FQDN_LEN);
    memcpy(pEpSet->eps[0].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN);
dengyihao's avatar
dengyihao 已提交
1420 1421
  }
}
dengyihao's avatar
dengyihao 已提交
1422

dengyihao's avatar
dengyihao 已提交
1423
FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
dengyihao's avatar
dengyihao 已提交
1424
  if (code != 0) return false;
dengyihao's avatar
dengyihao 已提交
1425
  // if (pCtx->retryCnt == 0) return false;
dengyihao's avatar
dengyihao 已提交
1426 1427 1428
  if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
  return true;
}
dengyihao's avatar
dengyihao 已提交
1429
FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
dengyihao's avatar
dengyihao 已提交
1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440
  if (pMsg == NULL) return -1;

  memset(pResp, 0, sizeof(STransMsg));

  pResp->code = TSDB_CODE_RPC_BROKEN_LINK;
  pResp->msgType = pMsg->msg.msgType + 1;
  pResp->info.ahandle = pMsg->ctx ? pMsg->ctx->ahandle : NULL;
  pResp->info.traceId = pMsg->msg.info.traceId;

  return 0;
}
dengyihao's avatar
dengyihao 已提交
1441 1442 1443 1444 1445
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
  uint32_t  addr = 0;
  uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn));
  if (v == NULL) {
    addr = taosGetIpv4FromFqdn(fqdn);
1446 1447 1448 1449
    if (addr == 0xffffffff) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr());
      return addr;
dengyihao's avatar
dengyihao 已提交
1450 1451
    }

dengyihao's avatar
dengyihao 已提交
1452 1453 1454 1455 1456 1457 1458 1459 1460 1461
    taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
  } else {
    addr = *v;
  }
  return addr;
}
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
  // impl later
  return;
}
dengyihao's avatar
dengyihao 已提交
1462

dengyihao's avatar
dengyihao 已提交
1463 1464 1465 1466 1467
static void doFreeTimeoutMsg(void* param) {
  STaskArg* arg = param;
  SCliMsg*  pMsg = arg->param1;
  SCliThrd* pThrd = arg->param2;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1468

dengyihao's avatar
dengyihao 已提交
1469 1470 1471 1472 1473 1474
  QUEUE_REMOVE(&pMsg->q);
  STraceId* trace = &pMsg->msg.info.traceId;
  tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
  doNotifyApp(pMsg, pThrd);
  taosMemoryFree(arg);
}
dengyihao's avatar
dengyihao 已提交
1475
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1476
  STrans* pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1477

dengyihao's avatar
dengyihao 已提交
1478 1479
  cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
  if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) {
dengyihao's avatar
dengyihao 已提交
1480
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1481 1482
    return;
  }
dengyihao's avatar
dengyihao 已提交
1483

dengyihao's avatar
dengyihao 已提交
1484 1485
  char*    fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
  uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
dengyihao's avatar
dengyihao 已提交
1486 1487
  char     addr[TSDB_FQDN_LEN + 64] = {0};
  CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port);
dengyihao's avatar
dengyihao 已提交
1488

dengyihao's avatar
dengyihao 已提交
1489
  bool      ignore = false;
dengyihao's avatar
dengyihao 已提交
1490
  SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr);
dengyihao's avatar
dengyihao 已提交
1491
  if (ignore == true) {
dengyihao's avatar
dengyihao 已提交
1492
    // persist conn already release by server
dengyihao's avatar
dengyihao 已提交
1493 1494
    STransMsg resp;
    cliBuildExceptResp(pMsg, &resp);
dengyihao's avatar
dengyihao 已提交
1495 1496 1497
    if (pMsg->type != Release) {
      pTransInst->cfp(pTransInst->parent, &resp, NULL);
    }
dengyihao's avatar
dengyihao 已提交
1498
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1499 1500
    return;
  }
dengyihao's avatar
dengyihao 已提交
1501
  if (conn == NULL && pMsg == NULL) {
dengyihao's avatar
dengyihao 已提交
1502 1503
    return;
  }
dengyihao's avatar
dengyihao 已提交
1504
  STraceId* trace = &pMsg->msg.info.traceId;
dengyihao's avatar
dengyihao 已提交
1505

dengyihao's avatar
dengyihao 已提交
1506
  if (conn != NULL) {
dengyihao's avatar
dengyihao 已提交
1507
    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
dengyihao's avatar
dengyihao 已提交
1508
    transQueuePush(&conn->cliMsgs, pMsg);
U
ubuntu 已提交
1509
    cliSend(conn);
dengyihao's avatar
dengyihao 已提交
1510
  } else {
U
ubuntu 已提交
1511
    conn = cliCreateConn(pThrd);
dengyihao's avatar
dengyihao 已提交
1512 1513 1514 1515

    int64_t refId = (int64_t)pMsg->msg.info.handle;
    if (refId != 0) specifyConnRef(conn, true, refId);

dengyihao's avatar
dengyihao 已提交
1516
    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
dengyihao's avatar
dengyihao 已提交
1517
    transQueuePush(&conn->cliMsgs, pMsg);
dengyihao's avatar
dengyihao 已提交
1518

dengyihao's avatar
dengyihao 已提交
1519
    conn->ip = strdup(addr);
dengyihao's avatar
dengyihao 已提交
1520
    uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
1521 1522 1523 1524 1525 1526 1527 1528 1529
    if (ipaddr == 0xffffffff) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

      cliHandleExcept(conn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
1530

dengyihao's avatar
dengyihao 已提交
1531
    struct sockaddr_in addr;
1532
    addr.sin_family = AF_INET;
1533
    addr.sin_addr.s_addr = ipaddr;
dengyihao's avatar
dengyihao 已提交
1534
    addr.sin_port = (uint16_t)htons(port);
dengyihao's avatar
dengyihao 已提交
1535

dengyihao's avatar
dengyihao 已提交
1536
    tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip);
1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556
    int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
    if (fd == -1) {
      tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
              tstrerror(TAOS_SYSTEM_ERROR(errno)));
      cliHandleExcept(conn);
      errno = 0;
      return;
    }
    int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
    if (ret != 0) {
      tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }
    ret = transSetConnOption((uv_tcp_t*)conn->stream);
    if (ret != 0) {
      tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
1557

1558
    ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
dengyihao's avatar
dengyihao 已提交
1559
    if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
1560 1561 1562 1563 1564
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

dengyihao's avatar
dengyihao 已提交
1565
      cliHandleFastFail(conn, ret);
dengyihao's avatar
dengyihao 已提交
1566 1567
      return;
    }
dengyihao's avatar
dengyihao 已提交
1568
    uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
dengyihao's avatar
dengyihao 已提交
1569
  }
dengyihao's avatar
dengyihao 已提交
1570
  tGTrace("%s conn %p ready", pTransInst->label, conn);
dengyihao's avatar
dengyihao 已提交
1571
}
dengyihao's avatar
dengyihao 已提交
1572

dengyihao's avatar
dengyihao 已提交
1573
static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1574
  int count = 0;
dengyihao's avatar
dengyihao 已提交
1575

dengyihao's avatar
dengyihao 已提交
1576 1577
  while (!QUEUE_IS_EMPTY(wq)) {
    queue* h = QUEUE_HEAD(wq);
dengyihao's avatar
dengyihao 已提交
1578 1579 1580
    QUEUE_REMOVE(h);

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
1581 1582 1583 1584 1585

    if (pMsg->type == Quit) {
      pThrd->stopMsg = pMsg;
      continue;
    }
dengyihao's avatar
dengyihao 已提交
1586 1587 1588 1589 1590 1591 1592 1593
    (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);

    count++;
  }
  if (count >= 2) {
    tTrace("cli process batch size:%d", count);
  }
}
dengyihao's avatar
dengyihao 已提交
1594
SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
dengyihao's avatar
dengyihao 已提交
1595
  if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) {
dengyihao's avatar
dengyihao 已提交
1596 1597 1598 1599
    return NULL;
  }
  queue* hr = QUEUE_HEAD(&pList->wq);
  QUEUE_REMOVE(hr);
dengyihao's avatar
dengyihao 已提交
1600
  pList->sending += 1;
dengyihao's avatar
dengyihao 已提交
1601 1602 1603 1604 1605 1606

  pList->len -= 1;

  SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq);
  return batch;
}
dengyihao's avatar
dengyihao 已提交
1607

dengyihao's avatar
dengyihao 已提交
1608
static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1609 1610
  STrans* pInst = pThrd->pTransInst;

dengyihao's avatar
dengyihao 已提交
1611
  int count = 0;
dengyihao's avatar
dengyihao 已提交
1612 1613
  while (!QUEUE_IS_EMPTY(wq)) {
    queue* h = QUEUE_HEAD(wq);
dengyihao's avatar
dengyihao 已提交
1614
    QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
1615 1616

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
1617 1618 1619 1620 1621 1622

    if (pMsg->type == Quit) {
      pThrd->stopMsg = pMsg;
      continue;
    }

dengyihao's avatar
dengyihao 已提交
1623
    if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
dengyihao's avatar
dengyihao 已提交
1624 1625 1626 1627 1628 1629 1630
      STransConnCtx* pCtx = pMsg->ctx;

      char*    ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
      uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
      char     key[TSDB_FQDN_LEN + 64] = {0};
      CONN_CONSTRUCT_HASH_KEY(key, ip, port);

dengyihao's avatar
dengyihao 已提交
1631 1632 1633 1634 1635
      // SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key));
      SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key));
      if (ppBatchList == NULL || *ppBatchList == NULL) {
        SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
        QUEUE_INIT(&pBatchList->wq);
dengyihao's avatar
dengyihao 已提交
1636
        pBatchList->connMax = pInst->connLimitNum;
dengyihao's avatar
dengyihao 已提交
1637
        pBatchList->connCnt = 0;
dengyihao's avatar
dengyihao 已提交
1638
        pBatchList->batchLenLimit = pInst->batchSize;
dengyihao's avatar
dengyihao 已提交
1639
        pBatchList->len += 1;
dengyihao's avatar
dengyihao 已提交
1640

dengyihao's avatar
dengyihao 已提交
1641 1642 1643 1644
        pBatchList->ip = strdup(ip);
        pBatchList->dst = strdup(key);
        pBatchList->port = port;

dengyihao's avatar
dengyihao 已提交
1645 1646
        SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
        QUEUE_INIT(&pBatch->wq);
dengyihao's avatar
dengyihao 已提交
1647 1648
        QUEUE_INIT(&pBatch->listq);

dengyihao's avatar
dengyihao 已提交
1649 1650 1651
        QUEUE_PUSH(&pBatch->wq, h);
        pBatch->wLen += 1;
        pBatch->batchSize += pMsg->msg.contLen;
dengyihao's avatar
dengyihao 已提交
1652
        pBatch->pList = pBatchList;
dengyihao's avatar
dengyihao 已提交
1653

dengyihao's avatar
dengyihao 已提交
1654
        QUEUE_PUSH(&pBatchList->wq, &pBatch->listq);
dengyihao's avatar
dengyihao 已提交
1655

dengyihao's avatar
dengyihao 已提交
1656
        taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatchList, sizeof(void*));
dengyihao's avatar
dengyihao 已提交
1657
      } else {
dengyihao's avatar
dengyihao 已提交
1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678
        if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
          SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
          QUEUE_INIT(&pBatch->wq);
          QUEUE_INIT(&pBatch->listq);

          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->wLen += 1;
          pBatch->batchSize = pMsg->msg.contLen;
          pBatch->pList = *ppBatchList;

          QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
          (*ppBatchList)->len += 1;

          continue;
        }

        queue*     hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
        SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
        if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) {
          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->batchSize += pMsg->msg.contLen;
dengyihao's avatar
dengyihao 已提交
1679
          pBatch->wLen += 1;
dengyihao's avatar
dengyihao 已提交
1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692
        } else {
          SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
          QUEUE_INIT(&pBatch->wq);
          QUEUE_INIT(&pBatch->listq);

          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->wLen += 1;
          pBatch->batchSize += pMsg->msg.contLen;
          pBatch->pList = *ppBatchList;

          QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
          (*ppBatchList)->len += 1;
        }
dengyihao's avatar
dengyihao 已提交
1693
      }
dengyihao's avatar
dengyihao 已提交
1694
      continue;
dengyihao's avatar
dengyihao 已提交
1695
    }
dengyihao's avatar
dengyihao 已提交
1696
    (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
1697
    count++;
dengyihao's avatar
dengyihao 已提交
1698
  }
dengyihao's avatar
dengyihao 已提交
1699

dengyihao's avatar
dengyihao 已提交
1700
  void** pIter = taosHashIterate(pThrd->batchCache, NULL);
dengyihao's avatar
dengyihao 已提交
1701
  while (pIter != NULL) {
dengyihao's avatar
dengyihao 已提交
1702
    SCliBatchList* batchList = (SCliBatchList*)(*pIter);
dengyihao's avatar
dengyihao 已提交
1703 1704 1705
    SCliBatch*     batch = cliGetHeadFromList(batchList);
    if (batch != NULL) {
      cliHandleBatchReq(batch, pThrd);
dengyihao's avatar
dengyihao 已提交
1706
    }
dengyihao's avatar
dengyihao 已提交
1707
    pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
dengyihao's avatar
dengyihao 已提交
1708 1709
  }

dengyihao's avatar
dengyihao 已提交
1710
  if (count >= 2) {
S
Shengliang Guan 已提交
1711
    tTrace("cli process batch size:%d", count);
dengyihao's avatar
dengyihao 已提交
1712
  }
dengyihao's avatar
dengyihao 已提交
1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725
}

static void cliAsyncCb(uv_async_t* handle) {
  SAsyncItem* item = handle->data;
  SCliThrd*   pThrd = item->pThrd;
  STrans*     pTransInst = pThrd->pTransInst;

  // batch process to avoid to lock/unlock frequently
  queue wq;
  taosThreadMutexLock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  taosThreadMutexUnlock(&item->mtx);

dengyihao's avatar
dengyihao 已提交
1726
  int8_t supportBatch = pTransInst->supportBatch;
dengyihao's avatar
dengyihao 已提交
1727
  if (supportBatch == 0) {
dengyihao's avatar
dengyihao 已提交
1728
    cliNoBatchDealReq(&wq, pThrd);
dengyihao's avatar
dengyihao 已提交
1729
  } else if (supportBatch == 1) {
dengyihao's avatar
dengyihao 已提交
1730
    cliBatchDealReq(&wq, pThrd);
dengyihao's avatar
dengyihao 已提交
1731
  }
dengyihao's avatar
dengyihao 已提交
1732

dengyihao's avatar
dengyihao 已提交
1733
  if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
1734
}
dengyihao's avatar
dengyihao 已提交
1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760
static void cliPrepareCb(uv_prepare_t* handle) {
  SCliThrd* thrd = handle->data;
  tTrace("prepare work start");

  SAsyncPool* pool = thrd->asyncPool;
  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
    SAsyncItem* item = async->data;

    queue wq;
    taosThreadMutexLock(&item->mtx);
    QUEUE_MOVE(&item->qmsg, &wq);
    taosThreadMutexUnlock(&item->mtx);

    int count = 0;
    while (!QUEUE_IS_EMPTY(&wq)) {
      queue* h = QUEUE_HEAD(&wq);
      QUEUE_REMOVE(h);

      SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
      (*cliAsyncHandle[pMsg->type])(pMsg, thrd);
      count++;
    }
  }
  tTrace("prepare work end");
  if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
dengyihao's avatar
dengyihao 已提交
1761
}
dengyihao's avatar
dengyihao 已提交
1762

dengyihao's avatar
dengyihao 已提交
1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791
void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
  transCtxCleanup(&conn->ctx);
  cliReleaseUnfinishedMsg(conn);
  if (destroy == 1) {
    transQueueDestroy(&conn->cliMsgs);
  } else {
    transQueueClear(&conn->cliMsgs);
  }
}

void cliIteraConnMsgs(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    SCliMsg* cmsg = transQueueGet(&conn->cliMsgs, i);
    if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) {
      continue;
    }

    STransMsg resp = {0};
    if (-1 == cliBuildExceptResp(cmsg, &resp)) {
      continue;
    }
    pTransInst->cfp(pTransInst->parent, &resp, NULL);

    cmsg->ctx->ahandle = NULL;
  }
}
dengyihao's avatar
dengyihao 已提交
1792 1793 1794
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
  if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
    uint64_t ahandle = pHead->ahandle;
dengyihao's avatar
dengyihao 已提交
1795
    tDebug("ahandle = %" PRIu64 "", ahandle);
dengyihao's avatar
dengyihao 已提交
1796 1797
    SCliMsg* pMsg = NULL;
    CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
dengyihao's avatar
dengyihao 已提交
1798

dengyihao's avatar
dengyihao 已提交
1799 1800
    transClearBuffer(&conn->readBuf);
    transFreeMsg(transContFromHead((char*)pHead));
dengyihao's avatar
dengyihao 已提交
1801 1802 1803 1804

    for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->cliMsgs); i++) {
      SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i);
      if (cliMsg->type == Release) {
dengyihao's avatar
dengyihao 已提交
1805
        ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req");
dengyihao's avatar
dengyihao 已提交
1806 1807
        return true;
      }
dengyihao's avatar
dengyihao 已提交
1808
    }
dengyihao's avatar
dengyihao 已提交
1809 1810 1811

    cliIteraConnMsgs(conn);

dengyihao's avatar
dengyihao 已提交
1812 1813
    tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1814

dengyihao's avatar
dengyihao 已提交
1815 1816 1817 1818 1819 1820
    addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
    return true;
  }
  return false;
}

U
ubuntu 已提交
1821
static void* cliWorkThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
1822
  SCliThrd* pThrd = (SCliThrd*)arg;
dengyihao's avatar
dengyihao 已提交
1823
  pThrd->pid = taosGetSelfPthreadId();
G
gccgdb1234 已提交
1824
  setThreadName("trans-cli-work");
dengyihao's avatar
dengyihao 已提交
1825
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
dengyihao's avatar
dengyihao 已提交
1826 1827

  tDebug("thread quit-thread:%08" PRId64, pThrd->pid);
1828
  return NULL;
dengyihao's avatar
dengyihao 已提交
1829 1830
}

U
ubuntu 已提交
1831
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
1832
  SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj));
dengyihao's avatar
dengyihao 已提交
1833

dengyihao's avatar
dengyihao 已提交
1834
  STrans* pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
1835
  memcpy(cli->label, label, TSDB_LABEL_LEN);
dengyihao's avatar
dengyihao 已提交
1836
  cli->numOfThreads = numOfThreads;
dengyihao's avatar
dengyihao 已提交
1837
  cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*));
dengyihao's avatar
dengyihao 已提交
1838 1839

  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
1840
    SCliThrd* pThrd = createThrdObj(shandle);
dengyihao's avatar
dengyihao 已提交
1841 1842 1843 1844 1845
    if (pThrd == NULL) {
      return NULL;
    }

    int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
dengyihao's avatar
dengyihao 已提交
1846
    if (err == 0) {
S
Shengliang Guan 已提交
1847
      tDebug("success to create tranport-cli thread:%d", i);
dengyihao's avatar
dengyihao 已提交
1848
    }
dengyihao's avatar
dengyihao 已提交
1849
    cli->pThreadObj[i] = pThrd;
dengyihao's avatar
dengyihao 已提交
1850
  }
dengyihao's avatar
dengyihao 已提交
1851

dengyihao's avatar
dengyihao 已提交
1852 1853
  return cli;
}
dengyihao's avatar
dengyihao 已提交
1854

dengyihao's avatar
dengyihao 已提交
1855
static FORCE_INLINE void destroyUserdata(STransMsg* userdata) {
dengyihao's avatar
dengyihao 已提交
1856 1857 1858 1859 1860 1861
  if (userdata->pCont == NULL) {
    return;
  }
  transFreeMsg(userdata->pCont);
  userdata->pCont = NULL;
}
dengyihao's avatar
dengyihao 已提交
1862

dengyihao's avatar
dengyihao 已提交
1863
static FORCE_INLINE void destroyCmsg(void* arg) {
dengyihao's avatar
dengyihao 已提交
1864
  SCliMsg* pMsg = arg;
dengyihao's avatar
dengyihao 已提交
1865 1866 1867
  if (pMsg == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1868

dengyihao's avatar
dengyihao 已提交
1869 1870
  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
wafwerar's avatar
wafwerar 已提交
1871
  taosMemoryFree(pMsg);
dengyihao's avatar
dengyihao 已提交
1872
}
dengyihao's avatar
dengyihao 已提交
1873

dengyihao's avatar
dengyihao 已提交
1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
  if (param == NULL) return;

  STaskArg* arg = param;
  SCliMsg*  pMsg = arg->param1;
  SCliThrd* pThrd = arg->param2;

  tDebug("destroy Ahandle A");
  if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
    tDebug("destroy Ahandle B");
    pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
  }
  tDebug("destroy Ahandle C");

  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
  taosMemoryFree(pMsg);
}

dengyihao's avatar
dengyihao 已提交
1893 1894 1895
static SCliThrd* createThrdObj(void* trans) {
  STrans* pTransInst = trans;

dengyihao's avatar
dengyihao 已提交
1896
  SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd));
U
ubuntu 已提交
1897

dengyihao's avatar
dengyihao 已提交
1898
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
1899
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
1900

wafwerar's avatar
wafwerar 已提交
1901
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
1902 1903 1904 1905 1906 1907 1908 1909
  int err = uv_loop_init(pThrd->loop);
  if (err != 0) {
    tError("failed to init uv_loop, reason:%s", uv_err_name(err));
    taosMemoryFree(pThrd->loop);
    taosThreadMutexDestroy(&pThrd->msgMtx);
    taosMemoryFree(pThrd);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
1910 1911 1912 1913 1914
  if (pTransInst->supportBatch) {
    pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 4, pThrd, cliAsyncCb);
  } else {
    pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb);
  }
dengyihao's avatar
dengyihao 已提交
1915
  if (pThrd->asyncPool == NULL) {
dengyihao's avatar
ref log  
dengyihao 已提交
1916
    tError("failed to init async pool");
dengyihao's avatar
dengyihao 已提交
1917 1918 1919 1920 1921 1922
    uv_loop_close(pThrd->loop);
    taosMemoryFree(pThrd->loop);
    taosThreadMutexDestroy(&pThrd->msgMtx);
    taosMemoryFree(pThrd);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
1923 1924 1925 1926

  pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
  uv_prepare_init(pThrd->loop, pThrd->prepare);
  pThrd->prepare->data = pThrd;
dengyihao's avatar
dengyihao 已提交
1927
  // uv_prepare_start(pThrd->prepare, cliPrepareCb);
dengyihao's avatar
dengyihao 已提交
1928

dengyihao's avatar
dengyihao 已提交
1929
  int32_t timerSize = 64;
dengyihao's avatar
dengyihao 已提交
1930 1931 1932 1933 1934 1935 1936
  pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
  for (int i = 0; i < timerSize; i++) {
    uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
    uv_timer_init(pThrd->loop, timer);
    taosArrayPush(pThrd->timerList, &timer);
  }

dengyihao's avatar
dengyihao 已提交
1937
  pThrd->pool = createConnPool(4);
dengyihao's avatar
dengyihao 已提交
1938
  transDQCreate(pThrd->loop, &pThrd->delayQueue);
dengyihao's avatar
dengyihao 已提交
1939

dengyihao's avatar
dengyihao 已提交
1940 1941
  transDQCreate(pThrd->loop, &pThrd->timeoutQueue);

dengyihao's avatar
dengyihao 已提交
1942 1943
  transDQCreate(pThrd->loop, &pThrd->waitConnQueue);

dengyihao's avatar
dengyihao 已提交
1944 1945 1946
  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
  pThrd->pTransInst = trans;

dengyihao's avatar
dengyihao 已提交
1947
  pThrd->destroyAhandleFp = pTransInst->destroyFp;
dengyihao's avatar
dengyihao 已提交
1948
  pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
1949 1950
  pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);

dengyihao's avatar
dengyihao 已提交
1951
  pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
1952

dengyihao's avatar
dengyihao 已提交
1953
  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
1954 1955
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
1956
static void destroyThrdObj(SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1957 1958 1959
  if (pThrd == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1960

wafwerar's avatar
wafwerar 已提交
1961
  taosThreadJoin(pThrd->thread, NULL);
dengyihao's avatar
dengyihao 已提交
1962
  CLI_RELEASE_UV(pThrd->loop);
wafwerar's avatar
wafwerar 已提交
1963
  taosThreadMutexDestroy(&pThrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
1964
  TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
dengyihao's avatar
dengyihao 已提交
1965
  transAsyncPoolDestroy(pThrd->asyncPool);
dengyihao's avatar
dengyihao 已提交
1966

dengyihao's avatar
dengyihao 已提交
1967
  transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
dengyihao's avatar
dengyihao 已提交
1968
  transDQDestroy(pThrd->timeoutQueue, NULL);
dengyihao's avatar
dengyihao 已提交
1969
  transDQDestroy(pThrd->waitConnQueue, NULL);
dengyihao's avatar
dengyihao 已提交
1970

dengyihao's avatar
dengyihao 已提交
1971
  tDebug("thread destroy %" PRId64, pThrd->pid);
dengyihao's avatar
dengyihao 已提交
1972 1973 1974 1975 1976
  for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
    uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i);
    taosMemoryFree(timer);
  }
  taosArrayDestroy(pThrd->timerList);
dengyihao's avatar
dengyihao 已提交
1977
  taosMemoryFree(pThrd->prepare);
wafwerar's avatar
wafwerar 已提交
1978
  taosMemoryFree(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
1979
  taosHashCleanup(pThrd->fqdn2ipCache);
dengyihao's avatar
dengyihao 已提交
1980
  taosHashCleanup(pThrd->failFastCache);
dengyihao's avatar
dengyihao 已提交
1981 1982 1983

  void** pIter = taosHashIterate(pThrd->batchCache, NULL);
  while (pIter != NULL) {
dengyihao's avatar
dengyihao 已提交
1984 1985 1986 1987 1988 1989 1990 1991
    SCliBatchList* pBatchList = (SCliBatchList*)(*pIter);
    while (!QUEUE_IS_EMPTY(&pBatchList->wq)) {
      queue* h = QUEUE_HEAD(&pBatchList->wq);
      QUEUE_REMOVE(h);

      SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq);
      cliDestroyBatch(pBatch);
    }
dengyihao's avatar
dengyihao 已提交
1992 1993
    taosMemoryFree(pBatchList->ip);
    taosMemoryFree(pBatchList->dst);
dengyihao's avatar
dengyihao 已提交
1994
    taosMemoryFree(pBatchList);
dengyihao's avatar
dengyihao 已提交
1995

dengyihao's avatar
dengyihao 已提交
1996 1997
    pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
  }
dengyihao's avatar
dengyihao 已提交
1998
  taosHashCleanup(pThrd->batchCache);
wafwerar's avatar
wafwerar 已提交
1999
  taosMemoryFree(pThrd);
dengyihao's avatar
dengyihao 已提交
2000
}
dengyihao's avatar
dengyihao 已提交
2001

dengyihao's avatar
dengyihao 已提交
2002
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
2003
  //
wafwerar's avatar
wafwerar 已提交
2004
  taosMemoryFree(ctx);
dengyihao's avatar
dengyihao 已提交
2005
}
dengyihao's avatar
dengyihao 已提交
2006

dengyihao's avatar
dengyihao 已提交
2007
void cliSendQuit(SCliThrd* thrd) {
dengyihao's avatar
dengyihao 已提交
2008
  // cli can stop gracefully
wafwerar's avatar
wafwerar 已提交
2009
  SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2010
  msg->type = Quit;
dengyihao's avatar
dengyihao 已提交
2011
  transAsyncSend(thrd->asyncPool, &msg->q);
dengyihao's avatar
dengyihao 已提交
2012
  atomic_store_8(&thrd->asyncPool->stop, 1);
dengyihao's avatar
dengyihao 已提交
2013
}
dengyihao's avatar
dengyihao 已提交
2014 2015
void cliWalkCb(uv_handle_t* handle, void* arg) {
  if (!uv_is_closing(handle)) {
dengyihao's avatar
dengyihao 已提交
2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027
    if (uv_handle_get_type(handle) == UV_TIMER) {
      // SCliConn* pConn = handle->data;
      //  if (pConn != NULL && pConn->timer != NULL) {
      //    SCliThrd* pThrd = pConn->hostThrd;
      //    uv_timer_stop((uv_timer_t*)handle);
      //    handle->data = NULL;
      //    taosArrayPush(pThrd->timerList, &pConn->timer);
      //    pConn->timer = NULL;
      //  }
    } else {
      uv_read_stop((uv_stream_t*)handle);
    }
dengyihao's avatar
dengyihao 已提交
2028
    uv_close(handle, cliDestroy);
dengyihao's avatar
dengyihao 已提交
2029 2030
  }
}
dengyihao's avatar
dengyihao 已提交
2031

dengyihao's avatar
dengyihao 已提交
2032
FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) {
dengyihao's avatar
dengyihao 已提交
2033
  int32_t index = pTransInst->index;
dengyihao's avatar
dengyihao 已提交
2034 2035 2036
  if (pTransInst->numOfThreads == 0) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2037 2038 2039 2040
  /*
   * no lock, and to avoid CPU load imbalance, set limit pTransInst->numOfThreads * 2000;
   */
  if (pTransInst->index++ >= pTransInst->numOfThreads * 2000) {
U
ubuntu 已提交
2041 2042 2043 2044
    pTransInst->index = 0;
  }
  return index % pTransInst->numOfThreads;
}
dengyihao's avatar
dengyihao 已提交
2045
static FORCE_INLINE void doDelayTask(void* param) {
dengyihao's avatar
dengyihao 已提交
2046
  STaskArg* arg = param;
dengyihao's avatar
dengyihao 已提交
2047
  cliHandleReq((SCliMsg*)arg->param1, (SCliThrd*)arg->param2);
dengyihao's avatar
dengyihao 已提交
2048 2049
  taosMemoryFree(arg);
}
dengyihao's avatar
dengyihao 已提交
2050

dengyihao's avatar
dengyihao 已提交
2051 2052 2053
static void doCloseIdleConn(void* param) {
  STaskArg* arg = param;
  SCliConn* conn = arg->param1;
dengyihao's avatar
dengyihao 已提交
2054
  tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
2055
  conn->task = NULL;
dengyihao's avatar
dengyihao 已提交
2056 2057
  cliDestroyConn(conn, true);
  taosMemoryFree(arg);
dengyihao's avatar
dengyihao 已提交
2058 2059
}

dengyihao's avatar
dengyihao 已提交
2060
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
2061
  STrans*        pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
2062 2063
  STransConnCtx* pCtx = pMsg->ctx;

dengyihao's avatar
dengyihao 已提交
2064 2065
  STraceId* trace = &pMsg->msg.info.traceId;
  char      tbuf[256] = {0};
dengyihao's avatar
dengyihao 已提交
2066
  EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
dengyihao's avatar
dengyihao 已提交
2067 2068
  tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
          pCtx->retryStep, pCtx->retryNextInterval);
dengyihao's avatar
dengyihao 已提交
2069 2070 2071 2072

  STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
  arg->param1 = pMsg;
  arg->param2 = pThrd;
dengyihao's avatar
dengyihao 已提交
2073 2074

  transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval);
dengyihao's avatar
dengyihao 已提交
2075
}
dengyihao's avatar
dengyihao 已提交
2076

dengyihao's avatar
dengyihao 已提交
2077
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
dengyihao's avatar
dengyihao 已提交
2078 2079 2080 2081
  if (*val != exp) {
    *val = newVal;
  }
}
dengyihao's avatar
dengyihao 已提交
2082

dengyihao's avatar
dengyihao 已提交
2083
FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
dengyihao's avatar
dengyihao 已提交
2084 2085 2086 2087 2088 2089
  if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
    return false;
  }
  // rebuild resp msg
  SEpSet epset;
  if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epset) < 0) {
dengyihao's avatar
dengyihao 已提交
2090 2091 2092 2093
    return false;
  }
  int32_t tlen = tSerializeSEpSet(NULL, 0, dst);

dengyihao's avatar
dengyihao 已提交
2094 2095 2096 2097 2098 2099 2100
  char*   buf = NULL;
  int32_t len = pResp->contLen - tlen;
  if (len != 0) {
    buf = rpcMallocCont(len);
    memcpy(buf, (char*)pResp->pCont + tlen, len);
  }
  rpcFreeCont(pResp->pCont);
dengyihao's avatar
dengyihao 已提交
2101 2102

  pResp->pCont = buf;
dengyihao's avatar
dengyihao 已提交
2103 2104 2105
  pResp->contLen = len;

  *dst = epset;
dengyihao's avatar
dengyihao 已提交
2106 2107
  return true;
}
dengyihao's avatar
dengyihao 已提交
2108
bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
dengyihao's avatar
dengyihao 已提交
2109
  bool noDelay = true;
dengyihao's avatar
dengyihao 已提交
2110 2111 2112 2113 2114 2115 2116
  if (hasEpSet == false) {
    if (pResp->contLen == 0) {
      if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
        noDelay = false;
      } else {
        EPSET_FORWARD_INUSE(&pCtx->epSet);
      }
dengyihao's avatar
dengyihao 已提交
2117 2118 2119 2120 2121 2122 2123 2124 2125 2126
    } else if (pResp->contLen != 0) {
      SEpSet  epSet;
      int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
      if (valid < 0) {
        tDebug("get invalid epset, epset equal, continue");
        if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
          noDelay = false;
        } else {
          EPSET_FORWARD_INUSE(&pCtx->epSet);
        }
dengyihao's avatar
dengyihao 已提交
2127
      } else {
dengyihao's avatar
dengyihao 已提交
2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139
        if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
          tDebug("epset not equal, retry new epset");
          pCtx->epSet = epSet;
          noDelay = false;
        } else {
          if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
            noDelay = false;
          } else {
            tDebug("epset equal, continue");
            EPSET_FORWARD_INUSE(&pCtx->epSet);
          }
        }
dengyihao's avatar
dengyihao 已提交
2140
      }
dengyihao's avatar
dengyihao 已提交
2141 2142
    }
  } else {
dengyihao's avatar
dengyihao 已提交
2143
    SEpSet  epSet;
dengyihao's avatar
dengyihao 已提交
2144 2145
    int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
    if (valid < 0) {
dengyihao's avatar
dengyihao 已提交
2146 2147 2148 2149 2150 2151
      tDebug("get invalid epset, epset equal, continue");
      if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
        noDelay = false;
      } else {
        EPSET_FORWARD_INUSE(&pCtx->epSet);
      }
dengyihao's avatar
dengyihao 已提交
2152
    } else {
dengyihao's avatar
dengyihao 已提交
2153 2154 2155
      if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
        tDebug("epset not equal, retry new epset");
        pCtx->epSet = epSet;
dengyihao's avatar
dengyihao 已提交
2156
        noDelay = false;
dengyihao's avatar
dengyihao 已提交
2157
      } else {
dengyihao's avatar
dengyihao 已提交
2158 2159 2160 2161 2162 2163
        if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
          noDelay = false;
        } else {
          tDebug("epset equal, continue");
          EPSET_FORWARD_INUSE(&pCtx->epSet);
        }
dengyihao's avatar
dengyihao 已提交
2164
      }
dengyihao's avatar
dengyihao 已提交
2165 2166 2167 2168 2169
    }
  }
  return noDelay;
}
bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
2170 2171
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
2172

dengyihao's avatar
dengyihao 已提交
2173 2174
  STransConnCtx* pCtx = pMsg->ctx;
  int32_t        code = pResp->code;
dengyihao's avatar
dengyihao 已提交
2175

dengyihao's avatar
dengyihao 已提交
2176
  bool retry = pTransInst->retry != NULL ? pTransInst->retry(code, pResp->msgType - 1) : false;
dengyihao's avatar
dengyihao 已提交
2177 2178 2179
  if (retry == false) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
2180 2181 2182 2183 2184 2185 2186 2187

  if (!pCtx->retryInit) {
    pCtx->retryMinInterval = pTransInst->retryMinInterval;
    pCtx->retryMaxInterval = pTransInst->retryMaxInterval;
    pCtx->retryStepFactor = pTransInst->retryStepFactor;
    pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet;
    pCtx->retryInitTimestamp = taosGetTimestampMs();
    pCtx->retryNextInterval = pCtx->retryMinInterval;
dengyihao's avatar
dengyihao 已提交
2188
    pCtx->retryStep = 0;
dengyihao's avatar
dengyihao 已提交
2189
    pCtx->retryInit = true;
dengyihao's avatar
dengyihao 已提交
2190
    pCtx->retryCode = TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
2191 2192 2193

    // already retry, not use handle specified by app;
    pMsg->msg.info.handle = 0;
dengyihao's avatar
dengyihao 已提交
2194
  }
dengyihao's avatar
dengyihao 已提交
2195

dengyihao's avatar
dengyihao 已提交
2196 2197
  if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
    return false;
dengyihao's avatar
dengyihao 已提交
2198
  }
dengyihao's avatar
dengyihao 已提交
2199

dengyihao's avatar
dengyihao 已提交
2200 2201 2202 2203 2204 2205
  // code, msgType

  // A:  epset,   leader, not self
  // B:  epset,   not know leader
  // C:  no epset, leader but not serivce

dengyihao's avatar
dengyihao 已提交
2206 2207
  bool noDelay = false;
  if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
dengyihao's avatar
dengyihao 已提交
2208
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2209
    noDelay = cliResetEpset(pCtx, pResp, false);
dengyihao's avatar
dengyihao 已提交
2210 2211
    transFreeMsg(pResp->pCont);
    transUnrefCliHandle(pConn);
dengyihao's avatar
dengyihao 已提交
2212
  } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
2213 2214
             code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED ||
             code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
dengyihao's avatar
dengyihao 已提交
2215
             code == TSDB_CODE_APP_IS_STOPPING || code == TSDB_CODE_VND_STOPPED) {
dengyihao's avatar
dengyihao 已提交
2216
    tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2217
    noDelay = cliResetEpset(pCtx, pResp, true);
dengyihao's avatar
dengyihao 已提交
2218 2219
    transFreeMsg(pResp->pCont);
    addConnToPool(pThrd->pool, pConn);
dengyihao's avatar
dengyihao 已提交
2220
  } else if (code == TSDB_CODE_SYN_RESTORING) {
dengyihao's avatar
dengyihao 已提交
2221
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2222
    noDelay = cliResetEpset(pCtx, pResp, true);
dengyihao's avatar
dengyihao 已提交
2223 2224
    addConnToPool(pThrd->pool, pConn);
    transFreeMsg(pResp->pCont);
dengyihao's avatar
dengyihao 已提交
2225
  } else {
dengyihao's avatar
dengyihao 已提交
2226
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2227 2228 2229
    noDelay = cliResetEpset(pCtx, pResp, false);
    addConnToPool(pThrd->pool, pConn);
    transFreeMsg(pResp->pCont);
dengyihao's avatar
dengyihao 已提交
2230
  }
dengyihao's avatar
dengyihao 已提交
2231 2232 2233 2234
  if (code != TSDB_CODE_RPC_BROKEN_LINK && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_SUCCESS) {
    // save one internal code
    pCtx->retryCode = code;
  }
dengyihao's avatar
dengyihao 已提交
2235

dengyihao's avatar
dengyihao 已提交
2236 2237 2238
  if (noDelay == false) {
    pCtx->epsetRetryCnt = 1;
    pCtx->retryStep++;
dengyihao's avatar
dengyihao 已提交
2239

dengyihao's avatar
dengyihao 已提交
2240 2241 2242 2243 2244 2245
    int64_t factor = pow(pCtx->retryStepFactor, pCtx->retryStep - 1);
    pCtx->retryNextInterval = factor * pCtx->retryMinInterval;
    if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) {
      pCtx->retryNextInterval = pCtx->retryMaxInterval;
    }

dengyihao's avatar
dengyihao 已提交
2246 2247 2248
    // if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
    //   return false;
    // }
dengyihao's avatar
dengyihao 已提交
2249 2250 2251
  } else {
    pCtx->retryNextInterval = 0;
    pCtx->epsetRetryCnt++;
dengyihao's avatar
dengyihao 已提交
2252
  }
dengyihao's avatar
dengyihao 已提交
2253

dengyihao's avatar
dengyihao 已提交
2254
  pMsg->sent = 0;
dengyihao's avatar
dengyihao 已提交
2255
  cliSchedMsgToNextNode(pMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
2256
  return true;
dengyihao's avatar
dengyihao 已提交
2257
}
dengyihao's avatar
dengyihao 已提交
2258
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
2259 2260
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
2261

dengyihao's avatar
dengyihao 已提交
2262
  if (pMsg == NULL || pMsg->ctx == NULL) {
dengyihao's avatar
dengyihao 已提交
2263
    tTrace("%s conn %p handle resp", pTransInst->label, pConn);
dengyihao's avatar
dengyihao 已提交
2264 2265 2266
    pTransInst->cfp(pTransInst->parent, pResp, NULL);
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
2267

dengyihao's avatar
dengyihao 已提交
2268
  STransConnCtx* pCtx = pMsg->ctx;
dengyihao's avatar
dengyihao 已提交
2269

dengyihao's avatar
dengyihao 已提交
2270 2271 2272
  bool retry = cliGenRetryRule(pConn, pResp, pMsg);
  if (retry == true) {
    return -1;
dengyihao's avatar
dengyihao 已提交
2273
  }
dengyihao's avatar
dengyihao 已提交
2274

dengyihao's avatar
dengyihao 已提交
2275 2276 2277
  if (pCtx->retryCode != TSDB_CODE_SUCCESS) {
    int32_t code = pResp->code;
    // return internal code app
dengyihao's avatar
dengyihao 已提交
2278 2279
    if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
        code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
dengyihao's avatar
dengyihao 已提交
2280 2281 2282 2283
      pResp->code = pCtx->retryCode;
    }
  }

2284
  // check whole vnodes is offline on this vgroup
2285 2286
  if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps || pCtx->retryStep > 0) {
    if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
A
Alex Duan 已提交
2287
      pResp->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
2288
    } else if (pResp->code == TSDB_CODE_RPC_BROKEN_LINK) {
A
Alex Duan 已提交
2289
      pResp->code = TSDB_CODE_RPC_SOMENODE_BROKEN_LINK;
2290 2291 2292
    }
  }

dengyihao's avatar
dengyihao 已提交
2293 2294
  STraceId* trace = &pResp->info.traceId;
  bool      hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
dengyihao's avatar
dengyihao 已提交
2295
  if (hasEpSet) {
dengyihao's avatar
dengyihao 已提交
2296 2297
    char tbuf[256] = {0};
    EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
dengyihao's avatar
dengyihao 已提交
2298
    tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
2299
  }
dengyihao's avatar
dengyihao 已提交
2300

dengyihao's avatar
dengyihao 已提交
2301
  if (pCtx->pSem != NULL) {
dengyihao's avatar
dengyihao 已提交
2302
    tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
2303
    if (pCtx->pRsp == NULL) {
dengyihao's avatar
dengyihao 已提交
2304
      tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
2305 2306 2307
    } else {
      memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
    }
dengyihao's avatar
dengyihao 已提交
2308
    tsem_post(pCtx->pSem);
2309
    pCtx->pRsp = NULL;
dengyihao's avatar
dengyihao 已提交
2310
  } else {
dengyihao's avatar
dengyihao 已提交
2311
    tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
2312
    if (retry == false && hasEpSet == true) {
dengyihao's avatar
dengyihao 已提交
2313
      pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
dengyihao's avatar
dengyihao 已提交
2314
    } else {
dengyihao's avatar
dengyihao 已提交
2315
      if (!cliIsEpsetUpdated(pResp->code, pCtx)) {
dengyihao's avatar
dengyihao 已提交
2316 2317 2318 2319
        pTransInst->cfp(pTransInst->parent, pResp, NULL);
      } else {
        pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
      }
dengyihao's avatar
dengyihao 已提交
2320
    }
dengyihao's avatar
dengyihao 已提交
2321
  }
dengyihao's avatar
dengyihao 已提交
2322
  return 0;
dengyihao's avatar
dengyihao 已提交
2323
}
U
ubuntu 已提交
2324 2325

void transCloseClient(void* arg) {
U
ubuntu 已提交
2326
  SCliObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
2327
  for (int i = 0; i < cli->numOfThreads; i++) {
U
ubuntu 已提交
2328
    cliSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
2329
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
2330
  }
wafwerar's avatar
wafwerar 已提交
2331 2332
  taosMemoryFree(cli->pThreadObj);
  taosMemoryFree(cli);
dengyihao's avatar
dengyihao 已提交
2333
}
dengyihao's avatar
dengyihao 已提交
2334 2335 2336 2337 2338
void transRefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_INC((SCliConn*)handle);
dengyihao's avatar
dengyihao 已提交
2339
  tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
dengyihao's avatar
dengyihao 已提交
2340 2341 2342 2343 2344 2345 2346
  UNUSED(ref);
}
void transUnrefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SCliConn*)handle);
dengyihao's avatar
dengyihao 已提交
2347
  tTrace("%s conn %p ref:%d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
dengyihao's avatar
dengyihao 已提交
2348
  if (ref == 0) {
U
ubuntu 已提交
2349
    cliDestroyConn((SCliConn*)handle, true);
dengyihao's avatar
dengyihao 已提交
2350 2351
  }
}
dengyihao's avatar
dengyihao 已提交
2352
static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t handle) {
dengyihao's avatar
dengyihao 已提交
2353
  SCliThrd*  pThrd = NULL;
dengyihao's avatar
dengyihao 已提交
2354
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
dengyihao's avatar
dengyihao 已提交
2355 2356 2357
  if (exh == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
2358

dengyihao's avatar
dengyihao 已提交
2359 2360 2361 2362 2363 2364
  if (exh->pThrd == NULL && trans != NULL) {
    int idx = cliRBChoseIdx(trans);
    if (idx < 0) return NULL;
    exh->pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
  }

dengyihao's avatar
dengyihao 已提交
2365
  pThrd = exh->pThrd;
dengyihao's avatar
dengyihao 已提交
2366
  transReleaseExHandle(transGetRefMgt(), handle);
dengyihao's avatar
dengyihao 已提交
2367 2368
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
2369
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
dengyihao's avatar
dengyihao 已提交
2370
  if (handle == 0) {
dengyihao's avatar
dengyihao 已提交
2371
    int idx = cliRBChoseIdx(trans);
dengyihao's avatar
dengyihao 已提交
2372
    if (idx < 0) return NULL;
dengyihao's avatar
dengyihao 已提交
2373 2374
    return ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
  }
dengyihao's avatar
dengyihao 已提交
2375
  SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle);
D
dapan1121 已提交
2376
  return pThrd;
dengyihao's avatar
dengyihao 已提交
2377
}
dengyihao's avatar
dengyihao 已提交
2378
int transReleaseCliHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
2379 2380 2381
  int  idx = -1;
  bool valid = false;

dengyihao's avatar
dengyihao 已提交
2382
  SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
dengyihao's avatar
dengyihao 已提交
2383
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2384
    return -1;
dengyihao's avatar
dengyihao 已提交
2385
  }
dengyihao's avatar
dengyihao 已提交
2386

dengyihao's avatar
dengyihao 已提交
2387
  STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527};
dengyihao's avatar
rm code  
dengyihao 已提交
2388
  TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
dengyihao's avatar
dengyihao 已提交
2389

dengyihao's avatar
dengyihao 已提交
2390 2391 2392
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  pCtx->ahandle = tmsg.info.ahandle;

dengyihao's avatar
dengyihao 已提交
2393
  SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2394
  cmsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
2395
  cmsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2396
  cmsg->type = Release;
dengyihao's avatar
dengyihao 已提交
2397
  cmsg->ctx = pCtx;
dengyihao's avatar
dengyihao 已提交
2398

dengyihao's avatar
dengyihao 已提交
2399 2400 2401
  STraceId* trace = &tmsg.info.traceId;
  tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);

dengyihao's avatar
dengyihao 已提交
2402
  if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
dengyihao's avatar
dengyihao 已提交
2403
    destroyCmsg(cmsg);
dengyihao's avatar
dengyihao 已提交
2404 2405
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2406
  return 0;
dengyihao's avatar
dengyihao 已提交
2407
}
dengyihao's avatar
dengyihao 已提交
2408

dengyihao's avatar
dengyihao 已提交
2409
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
2410 2411 2412 2413 2414
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2415

dengyihao's avatar
dengyihao 已提交
2416
  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
dengyihao's avatar
dengyihao 已提交
2417
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2418
    transFreeMsg(pReq->pCont);
dengyihao's avatar
dengyihao 已提交
2419
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2420
    return TSDB_CODE_RPC_BROKEN_LINK;
dengyihao's avatar
dengyihao 已提交
2421 2422
  }

dengyihao's avatar
dengyihao 已提交
2423
  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
wafwerar's avatar
wafwerar 已提交
2424
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2425
  pCtx->epSet = *pEpSet;
dengyihao's avatar
dengyihao 已提交
2426
  pCtx->origEpSet = *pEpSet;
S
Shengliang Guan 已提交
2427
  pCtx->ahandle = pReq->info.ahandle;
dengyihao's avatar
dengyihao 已提交
2428
  pCtx->msgType = pReq->msgType;
dengyihao's avatar
dengyihao 已提交
2429

H
Haojun Liao 已提交
2430
  if (ctx != NULL) pCtx->appCtx = *ctx;
dengyihao's avatar
dengyihao 已提交
2431

wafwerar's avatar
wafwerar 已提交
2432
  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2433
  cliMsg->ctx = pCtx;
dengyihao's avatar
dengyihao 已提交
2434
  cliMsg->msg = *pReq;
dengyihao's avatar
dengyihao 已提交
2435
  cliMsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2436
  cliMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
2437
  cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2438

dengyihao's avatar
dengyihao 已提交
2439
  STraceId* trace = &pReq->info.traceId;
dengyihao's avatar
dengyihao 已提交
2440 2441
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
dengyihao's avatar
dengyihao 已提交
2442 2443
  if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) {
    destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2444
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2445 2446
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2447
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2448
  return 0;
dengyihao's avatar
dengyihao 已提交
2449
}
dengyihao's avatar
dengyihao 已提交
2450

dengyihao's avatar
dengyihao 已提交
2451
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
dengyihao's avatar
dengyihao 已提交
2452 2453 2454 2455 2456
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2457

dengyihao's avatar
dengyihao 已提交
2458 2459
  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2460
    transFreeMsg(pReq->pCont);
dengyihao's avatar
dengyihao 已提交
2461
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2462
    return TSDB_CODE_RPC_BROKEN_LINK;
dengyihao's avatar
dengyihao 已提交
2463
  }
dengyihao's avatar
dengyihao 已提交
2464

dengyihao's avatar
dengyihao 已提交
2465 2466
  tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
  tsem_init(sem, 0, 0);
dengyihao's avatar
dengyihao 已提交
2467

dengyihao's avatar
dengyihao 已提交
2468 2469
  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());

wafwerar's avatar
wafwerar 已提交
2470
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2471
  pCtx->epSet = *pEpSet;
dengyihao's avatar
dengyihao 已提交
2472
  pCtx->origEpSet = *pEpSet;
S
Shengliang Guan 已提交
2473
  pCtx->ahandle = pReq->info.ahandle;
dengyihao's avatar
dengyihao 已提交
2474
  pCtx->msgType = pReq->msgType;
dengyihao's avatar
dengyihao 已提交
2475
  pCtx->pSem = sem;
dengyihao's avatar
dengyihao 已提交
2476 2477
  pCtx->pRsp = pRsp;

wafwerar's avatar
wafwerar 已提交
2478
  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2479 2480 2481
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2482
  cliMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
2483
  cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2484

dengyihao's avatar
dengyihao 已提交
2485
  STraceId* trace = &pReq->info.traceId;
dengyihao's avatar
dengyihao 已提交
2486 2487
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
dengyihao's avatar
dengyihao 已提交
2488

dengyihao's avatar
dengyihao 已提交
2489 2490
  int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
  if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
2491
    destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2492
    goto _RETURN;
dengyihao's avatar
dengyihao 已提交
2493
  }
dengyihao's avatar
dengyihao 已提交
2494
  tsem_wait(sem);
dengyihao's avatar
dengyihao 已提交
2495 2496

_RETURN:
dengyihao's avatar
dengyihao 已提交
2497 2498
  tsem_destroy(sem);
  taosMemoryFree(sem);
dengyihao's avatar
dengyihao 已提交
2499
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2500
  return ret;
dengyihao's avatar
dengyihao 已提交
2501
}
dengyihao's avatar
dengyihao 已提交
2502 2503 2504
/*
 *
 **/
dengyihao's avatar
dengyihao 已提交
2505
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
dengyihao's avatar
dengyihao 已提交
2506
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2507 2508 2509
  if (pTransInst == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2510 2511 2512

  SCvtAddr cvtAddr = {0};
  if (ip != NULL && fqdn != NULL) {
dengyihao's avatar
dengyihao 已提交
2513 2514
    tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip));
    tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn));
dengyihao's avatar
dengyihao 已提交
2515 2516
    cvtAddr.cvt = true;
  }
dengyihao's avatar
dengyihao 已提交
2517 2518
  for (int i = 0; i < pTransInst->numOfThreads; i++) {
    STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2519
    pCtx->cvtAddr = cvtAddr;
dengyihao's avatar
dengyihao 已提交
2520 2521 2522 2523

    SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
    cliMsg->ctx = pCtx;
    cliMsg->type = Update;
dengyihao's avatar
dengyihao 已提交
2524
    cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2525

dengyihao's avatar
dengyihao 已提交
2526
    SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
S
Shengliang Guan 已提交
2527
    tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid);
dengyihao's avatar
dengyihao 已提交
2528

dengyihao's avatar
dengyihao 已提交
2529 2530
    if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) {
      destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2531
      transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2532 2533
      return -1;
    }
dengyihao's avatar
dengyihao 已提交
2534
  }
dengyihao's avatar
dengyihao 已提交
2535
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2536
  return 0;
dengyihao's avatar
dengyihao 已提交
2537
}
dengyihao's avatar
dengyihao 已提交
2538 2539 2540 2541 2542

int64_t transAllocHandle() {
  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
  tDebug("pre alloc refId %" PRId64 "", exh->refId);
dengyihao's avatar
dengyihao 已提交
2543

dengyihao's avatar
dengyihao 已提交
2544 2545
  return exh->refId;
}