transCli.c 80.2 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1
/** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
dengyihao's avatar
dengyihao 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14 15
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "transComm.h"

dengyihao's avatar
dengyihao 已提交
16 17 18 19 20
typedef struct {
  int32_t numOfConn;
  queue   msgQ;
} SMsgList;

dengyihao's avatar
dengyihao 已提交
21
typedef struct SConnList {
dengyihao's avatar
dengyihao 已提交
22 23 24
  queue     conns;
  int32_t   size;
  SMsgList* list;
dengyihao's avatar
dengyihao 已提交
25 26
} SConnList;

dengyihao's avatar
dengyihao 已提交
27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28 29 30 31 32 33
  queue   wq;
  int32_t len;

  int connMax;
  int connCnt;
  int batchLenLimit;
dengyihao's avatar
dengyihao 已提交
34
  int sending;
dengyihao's avatar
dengyihao 已提交
35

dengyihao's avatar
dengyihao 已提交
36 37 38
  char*    dst;
  char*    ip;
  uint16_t port;
dengyihao's avatar
dengyihao 已提交
39 40 41 42 43 44 45 46 47 48

} SCliBatchList;

typedef struct {
  queue          wq;
  queue          listq;
  int32_t        wLen;
  int32_t        batchSize;  //
  int32_t        batch;
  SCliBatchList* pList;
dengyihao's avatar
dengyihao 已提交
49
} SCliBatch;
dengyihao's avatar
dengyihao 已提交
50

dengyihao's avatar
dengyihao 已提交
51
typedef struct SCliConn {
dengyihao's avatar
dengyihao 已提交
52
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
53 54
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
55
  queue        wreqQueue;
dengyihao's avatar
dengyihao 已提交
56 57

  uv_timer_t* timer;  // read timer, forbidden
dengyihao's avatar
dengyihao 已提交
58

dengyihao's avatar
dengyihao 已提交
59 60 61 62
  void* hostThrd;

  SConnBuffer readBuf;
  STransQueue cliMsgs;
dengyihao's avatar
dengyihao 已提交
63 64 65

  queue      q;
  SConnList* list;
dengyihao's avatar
dengyihao 已提交
66 67

  STransCtx  ctx;
dengyihao's avatar
dengyihao 已提交
68 69
  bool       broken;  // link broken or not
  ConnStatus status;  //
dengyihao's avatar
dengyihao 已提交
70

dengyihao's avatar
dengyihao 已提交
71 72
  SCliBatch* pBatch;

dengyihao's avatar
dengyihao 已提交
73 74
  int64_t refId;
  char*   ip;
dengyihao's avatar
dengyihao 已提交
75

dengyihao's avatar
dengyihao 已提交
76
  SDelayTask* task;
dengyihao's avatar
dengyihao 已提交
77

dengyihao's avatar
dengyihao 已提交
78
  // debug and log info
dengyihao's avatar
dengyihao 已提交
79 80 81
  char src[32];
  char dst[32];

dengyihao's avatar
dengyihao 已提交
82
} SCliConn;
dengyihao's avatar
dengyihao 已提交
83

dengyihao's avatar
dengyihao 已提交
84
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
85
  STransConnCtx* ctx;
dengyihao's avatar
formate  
dengyihao 已提交
86
  STransMsg      msg;
dengyihao's avatar
dengyihao 已提交
87
  queue          q;
dengyihao's avatar
dengyihao 已提交
88
  STransMsgType  type;
dengyihao's avatar
dengyihao 已提交
89

dengyihao's avatar
dengyihao 已提交
90
  int64_t  refId;
dengyihao's avatar
dengyihao 已提交
91 92
  uint64_t st;
  int      sent;  //(0: no send, 1: alread sent)
dengyihao's avatar
dengyihao 已提交
93 94
} SCliMsg;

dengyihao's avatar
dengyihao 已提交
95
typedef struct SCliThrd {
dengyihao's avatar
dengyihao 已提交
96 97 98 99 100 101
  TdThread      thread;  // tid
  int64_t       pid;     // pid
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  uv_prepare_t* prepare;
  void*         pool;  // conn pool
dengyihao's avatar
dengyihao 已提交
102
  // timer handles
dengyihao's avatar
dengyihao 已提交
103
  SArray* timerList;
dengyihao's avatar
dengyihao 已提交
104
  // msg queue
dengyihao's avatar
dengyihao 已提交
105
  queue         msg;
wafwerar's avatar
wafwerar 已提交
106
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
107
  SDelayQueue*  delayQueue;
dengyihao's avatar
dengyihao 已提交
108
  SDelayQueue*  timeoutQueue;
dengyihao's avatar
dengyihao 已提交
109
  SDelayQueue*  waitConnQueue;
dengyihao's avatar
dengyihao 已提交
110 111
  uint64_t      nextTimeout;  // next timeout
  void*         pTransInst;   //
dengyihao's avatar
dengyihao 已提交
112

dengyihao's avatar
dengyihao 已提交
113
  int connCount;
dengyihao's avatar
dengyihao 已提交
114
  void (*destroyAhandleFp)(void* ahandle);
dengyihao's avatar
dengyihao 已提交
115 116
  SHashObj* fqdn2ipCache;
  SCvtAddr  cvtAddr;
dengyihao's avatar
dengyihao 已提交
117

dengyihao's avatar
dengyihao 已提交
118
  SHashObj* failFastCache;
dengyihao's avatar
dengyihao 已提交
119
  SHashObj* batchCache;
dengyihao's avatar
dengyihao 已提交
120

dengyihao's avatar
dengyihao 已提交
121 122
  SCliMsg* stopMsg;

dengyihao's avatar
dengyihao 已提交
123
  bool quit;
dengyihao's avatar
dengyihao 已提交
124
} SCliThrd;
dengyihao's avatar
dengyihao 已提交
125

U
ubuntu 已提交
126
typedef struct SCliObj {
dengyihao's avatar
dengyihao 已提交
127 128 129 130
  char       label[TSDB_LABEL_LEN];
  int32_t    index;
  int        numOfThreads;
  SCliThrd** pThreadObj;
U
ubuntu 已提交
131
} SCliObj;
dengyihao's avatar
dengyihao 已提交
132

dengyihao's avatar
dengyihao 已提交
133 134 135 136 137 138 139
typedef struct {
  int32_t reinit;
  int64_t timestamp;
  int32_t count;
  int32_t threshold;
  int64_t interval;
} SFailFastItem;
dengyihao's avatar
dengyihao 已提交
140
// conn pool
dengyihao's avatar
dengyihao 已提交
141
// add expire timeout and capacity limit
dengyihao's avatar
dengyihao 已提交
142
static void*     createConnPool(int size);
dengyihao's avatar
dengyihao 已提交
143
static void*     destroyConnPool(SCliThrd* thread);
dengyihao's avatar
dengyihao 已提交
144
static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed);
dengyihao's avatar
dengyihao 已提交
145
static void      addConnToPool(void* pool, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
146
static void      doCloseIdleConn(void* param);
dengyihao's avatar
dengyihao 已提交
147

dengyihao's avatar
dengyihao 已提交
148 149
// register conn timer
static void cliConnTimeout(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
150 151
// register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
152
// register timer in each thread to clear expire conn
dengyihao's avatar
dengyihao 已提交
153
// static void cliTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
154
// alloc buffer for recv
dengyihao's avatar
dengyihao 已提交
155
static FORCE_INLINE void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
156
// callback after recv nbytes from socket
U
ubuntu 已提交
157
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
158
// callback after send data to socket
U
ubuntu 已提交
159
static void cliSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
160
// callback after conn to server
U
ubuntu 已提交
161 162
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
163
static void cliIdleCb(uv_idle_t* handle);
dengyihao's avatar
dengyihao 已提交
164
static void cliPrepareCb(uv_prepare_t* handle);
dengyihao's avatar
dengyihao 已提交
165

dengyihao's avatar
dengyihao 已提交
166
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd);
dengyihao's avatar
dengyihao 已提交
167
static void cliSendBatchCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
168 169

SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
dengyihao's avatar
dengyihao 已提交
170

dengyihao's avatar
dengyihao 已提交
171 172
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);

dengyihao's avatar
dengyihao 已提交
173
static int32_t allocConnRef(SCliConn* conn, bool update);
dengyihao's avatar
dengyihao 已提交
174

dengyihao's avatar
dengyihao 已提交
175
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
dengyihao's avatar
dengyihao 已提交
176

dengyihao's avatar
dengyihao 已提交
177
static SCliConn* cliCreateConn(SCliThrd* thrd);
U
ubuntu 已提交
178 179
static void      cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void      cliDestroy(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
180
static void      cliSend(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
181
static void      cliSendBatch(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
182
static void      cliDestroyConnMsgs(SCliConn* conn, bool destroy);
dengyihao's avatar
dengyihao 已提交
183

dengyihao's avatar
dengyihao 已提交
184 185
static void    doFreeTimeoutMsg(void* param);
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg);
dengyihao's avatar
dengyihao 已提交
186

dengyihao's avatar
dengyihao 已提交
187
// cli util func
dengyihao's avatar
dengyihao 已提交
188 189
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
dengyihao's avatar
dengyihao 已提交
190

dengyihao's avatar
dengyihao 已提交
191
static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
dengyihao's avatar
dengyihao 已提交
192

dengyihao's avatar
dengyihao 已提交
193 194 195
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn);
static FORCE_INLINE void     cliUpdateFqdnCache(SHashObj* cache, char* fqdn);

dengyihao's avatar
dengyihao 已提交
196
// process data read from server, add decompress etc later
U
ubuntu 已提交
197
static void cliHandleResp(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
198
// handle except about conn
U
ubuntu 已提交
199
static void cliHandleExcept(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
200
static void cliReleaseUnfinishedMsg(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
201
static void cliHandleFastFail(SCliConn* pConn, int status);
dengyihao's avatar
dengyihao 已提交
202

dengyihao's avatar
dengyihao 已提交
203
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd);
204
// handle req from app
dengyihao's avatar
dengyihao 已提交
205 206 207 208 209 210
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
                                                                   cliHandleUpdate};
dengyihao's avatar
dengyihao 已提交
211 212
/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
/// NULL,cliHandleUpdate};
dengyihao's avatar
dengyihao 已提交
213

dengyihao's avatar
dengyihao 已提交
214 215
static FORCE_INLINE void destroyUserdata(STransMsg* userdata);
static FORCE_INLINE void destroyCmsg(void* cmsg);
dengyihao's avatar
dengyihao 已提交
216
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
dengyihao's avatar
dengyihao 已提交
217
static FORCE_INLINE int  cliRBChoseIdx(STrans* pTransInst);
dengyihao's avatar
dengyihao 已提交
218
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
219

dengyihao's avatar
dengyihao 已提交
220
// thread obj
dengyihao's avatar
dengyihao 已提交
221
static SCliThrd* createThrdObj(void* trans);
dengyihao's avatar
dengyihao 已提交
222
static void      destroyThrdObj(SCliThrd* pThrd);
dengyihao's avatar
dengyihao 已提交
223

dengyihao's avatar
dengyihao 已提交
224 225 226 227 228 229 230 231 232
static void cliWalkCb(uv_handle_t* handle, void* arg);

#define CLI_RELEASE_UV(loop)        \
  do {                              \
    uv_walk(loop, cliWalkCb, NULL); \
    uv_run(loop, UV_RUN_DEFAULT);   \
    uv_loop_close(loop);            \
  } while (0);

dengyihao's avatar
dengyihao 已提交
233
// snprintf may cause performance problem
dengyihao's avatar
dengyihao 已提交
234 235 236 237 238 239 240
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
  do {                                         \
    char*   p = key;                           \
    int32_t len = strlen(ip);                  \
    if (p != NULL) memcpy(p, ip, len);         \
    p[len] = ':';                              \
    titoa(port, 10, &p[len + 1]);              \
dengyihao's avatar
dengyihao 已提交
241 242
  } while (0)

dengyihao's avatar
dengyihao 已提交
243 244
#define CONN_PERSIST_TIME(para)   ((para) <= 90000 ? 90000 : (para))
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
dengyihao's avatar
dengyihao 已提交
245

dengyihao's avatar
dengyihao 已提交
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle)                         \
  do {                                                                    \
    int i = 0, sz = transQueueSize(&conn->cliMsgs);                       \
    for (; i < sz; i++) {                                                 \
      pMsg = transQueueGet(&conn->cliMsgs, i);                            \
      if (pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \
        break;                                                            \
      }                                                                   \
    }                                                                     \
    if (i == sz) {                                                        \
      pMsg = NULL;                                                        \
      tDebug("msg not found, %" PRIu64 "", ahandle);                      \
    } else {                                                              \
      pMsg = transQueueRm(&conn->cliMsgs, i);                             \
      tDebug("msg found, %" PRIu64 "", ahandle);                          \
    }                                                                     \
dengyihao's avatar
dengyihao 已提交
262
  } while (0)
dengyihao's avatar
dengyihao 已提交
263

U
ubuntu 已提交
264 265 266 267 268 269 270 271 272 273 274 275 276
#define CONN_GET_NEXT_SENDMSG(conn)                 \
  do {                                              \
    int i = 0;                                      \
    do {                                            \
      pCliMsg = transQueueGet(&conn->cliMsgs, i++); \
      if (pCliMsg && 0 == pCliMsg->sent) {          \
        break;                                      \
      }                                             \
    } while (pCliMsg != NULL);                      \
    if (pCliMsg == NULL) {                          \
      goto _RETURN;                                 \
    }                                               \
  } while (0)
dengyihao's avatar
dengyihao 已提交
277

dengyihao's avatar
formate  
dengyihao 已提交
278 279
#define CONN_SET_PERSIST_BY_APP(conn) \
  do {                                \
dengyihao's avatar
dengyihao 已提交
280 281
    if (conn->status == ConnNormal) { \
      conn->status = ConnAcquire;     \
dengyihao's avatar
formate  
dengyihao 已提交
282 283 284
      transRefCliHandle(conn);        \
    }                                 \
  } while (0)
285

dengyihao's avatar
dengyihao 已提交
286 287 288 289
#define CONN_NO_PERSIST_BY_APP(conn) \
  (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define CONN_RELEASE_BY_SERVER(conn) \
  (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
290

S
Shengliang Guan 已提交
291 292
#define REQUEST_NO_RESP(msg)         ((msg)->info.noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg)   ((msg)->info.persistHandle == 1)
dengyihao's avatar
dengyihao 已提交
293
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
dengyihao's avatar
dengyihao 已提交
294

dengyihao's avatar
dengyihao 已提交
295
#define EPSET_IS_VALID(epSet)       ((epSet) != NULL && (epSet)->numOfEps >= 0 && (epSet)->inUse >= 0)
dengyihao's avatar
dengyihao 已提交
296
#define EPSET_GET_SIZE(epSet)       (epSet)->numOfEps
dengyihao's avatar
dengyihao 已提交
297
#define EPSET_GET_INUSE_IP(epSet)   ((epSet)->eps[(epSet)->inUse].fqdn)
dengyihao's avatar
dengyihao 已提交
298
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
dengyihao's avatar
dengyihao 已提交
299 300 301 302 303 304
#define EPSET_FORWARD_INUSE(epSet)                             \
  do {                                                         \
    if ((epSet)->numOfEps != 0) {                              \
      ++((epSet)->inUse);                                      \
      (epSet)->inUse = ((epSet)->inUse) % ((epSet)->numOfEps); \
    }                                                          \
dengyihao's avatar
dengyihao 已提交
305
  } while (0)
dengyihao's avatar
dengyihao 已提交
306

dengyihao's avatar
dengyihao 已提交
307 308 309 310 311 312 313 314 315 316 317
#define EPSET_DEBUG_STR(epSet, tbuf)                                                                                   \
  do {                                                                                                                 \
    int len = snprintf(tbuf, sizeof(tbuf), "epset:{");                                                                 \
    for (int i = 0; i < (epSet)->numOfEps; i++) {                                                                      \
      if (i == (epSet)->numOfEps - 1) {                                                                                \
        len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port);   \
      } else {                                                                                                         \
        len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d, ", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
      }                                                                                                                \
    }                                                                                                                  \
    len += snprintf(tbuf + len, sizeof(tbuf) - len, "}, inUse:%d", (epSet)->inUse);                                    \
dengyihao's avatar
dengyihao 已提交
318
  } while (0);
dengyihao's avatar
dengyihao 已提交
319

U
ubuntu 已提交
320
static void* cliWorkThread(void* arg);
dengyihao's avatar
dengyihao 已提交
321

dengyihao's avatar
dengyihao 已提交
322 323 324 325 326 327 328 329
static void cliReleaseUnfinishedMsg(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;

  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    SCliMsg* msg = transQueueGet(&conn->cliMsgs, i);
    if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) {
      if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
        conn->ctx.freeFunc(msg->ctx->ahandle);
dengyihao's avatar
dengyihao 已提交
330
      } else if (msg->ctx->ahandle != NULL && pThrd->destroyAhandleFp != NULL) {
dengyihao's avatar
dengyihao 已提交
331
        tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle);
dengyihao's avatar
dengyihao 已提交
332
        pThrd->destroyAhandleFp(msg->ctx->ahandle);
dengyihao's avatar
dengyihao 已提交
333 334 335 336
      }
    }
    destroyCmsg(msg);
  }
dengyihao's avatar
dengyihao 已提交
337
  transQueueClear(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
338
  memset(&conn->ctx, 0, sizeof(conn->ctx));
dengyihao's avatar
dengyihao 已提交
339
}
dengyihao's avatar
dengyihao 已提交
340
bool cliMaySendCachedMsg(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
341
  if (!transQueueEmpty(&conn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
342
    SCliMsg* pCliMsg = NULL;
U
ubuntu 已提交
343
    CONN_GET_NEXT_SENDMSG(conn);
dengyihao's avatar
dengyihao 已提交
344 345
    cliSend(conn);
    return true;
dengyihao's avatar
dengyihao 已提交
346
  }
dengyihao's avatar
dengyihao 已提交
347
  return false;
U
ubuntu 已提交
348 349
_RETURN:
  return false;
dengyihao's avatar
dengyihao 已提交
350
}
dengyihao's avatar
formate  
dengyihao 已提交
351
void cliHandleResp(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
352 353
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
354

dengyihao's avatar
dengyihao 已提交
355 356 357 358 359 360
  if (conn->timer) {
    if (uv_is_active((uv_handle_t*)conn->timer)) {
      tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
      uv_timer_stop(conn->timer);
    }
    taosArrayPush(pThrd->timerList, &conn->timer);
dengyihao's avatar
dengyihao 已提交
361
    conn->timer->data = NULL;
dengyihao's avatar
dengyihao 已提交
362
    conn->timer = NULL;
dengyihao's avatar
dengyihao 已提交
363 364
  }

dengyihao's avatar
opt rpc  
dengyihao 已提交
365
  STransMsgHead* pHead = NULL;
dengyihao's avatar
dengyihao 已提交
366 367 368

  int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
  if (msgLen <= 0) {
dengyihao's avatar
dengyihao 已提交
369 370 371
    tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
    return;
  }
dengyihao's avatar
dengyihao 已提交
372 373 374 375

  if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
    tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
  }
dengyihao's avatar
dengyihao 已提交
376 377
  pHead->code = htonl(pHead->code);
  pHead->msgLen = htonl(pHead->msgLen);
dengyihao's avatar
dengyihao 已提交
378 379 380 381
  if (cliRecvReleaseReq(conn, pHead)) {
    return;
  }

dengyihao's avatar
dengyihao 已提交
382 383 384 385 386
  STransMsg transMsg = {0};
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = transContFromHead((char*)pHead);
  transMsg.code = pHead->code;
  transMsg.msgType = pHead->msgType;
S
Shengliang Guan 已提交
387
  transMsg.info.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
388
  transMsg.info.traceId = pHead->traceId;
dengyihao's avatar
dengyihao 已提交
389
  transMsg.info.hasEpSet = pHead->hasEpSet;
dengyihao's avatar
dengyihao 已提交
390

dengyihao's avatar
dengyihao 已提交
391 392 393 394
  SCliMsg*       pMsg = NULL;
  STransConnCtx* pCtx = NULL;
  if (CONN_NO_PERSIST_BY_APP(conn)) {
    pMsg = transQueuePop(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
395 396 397

    pCtx = pMsg ? pMsg->ctx : NULL;
    transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
398
    tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
U
ubuntu 已提交
399
  } else {
dengyihao's avatar
dengyihao 已提交
400 401 402
    uint64_t ahandle = (uint64_t)pHead->ahandle;
    CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
    if (pMsg == NULL) {
S
Shengliang Guan 已提交
403
      transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
dengyihao's avatar
dengyihao 已提交
404 405
      tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
             transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
S
Shengliang Guan 已提交
406
      if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
D
dapan1121 已提交
407
        transMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
S
Shengliang Guan 已提交
408
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
dengyihao's avatar
dengyihao 已提交
409 410
        tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
               transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
411 412
      }
    } else {
dengyihao's avatar
dengyihao 已提交
413
      pCtx = pMsg->ctx;
S
Shengliang Guan 已提交
414
      transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
415
      tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
416
    }
U
ubuntu 已提交
417
  }
dengyihao's avatar
dengyihao 已提交
418
  // buf's mem alread translated to transMsg.pCont
dengyihao's avatar
dengyihao 已提交
419
  if (!CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
420
    transMsg.info.handle = (void*)conn->refId;
dengyihao's avatar
dengyihao 已提交
421
    tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
422
  }
dengyihao's avatar
dengyihao 已提交
423

dengyihao's avatar
dengyihao 已提交
424
  STraceId* trace = &transMsg.info.traceId;
dengyihao's avatar
dengyihao 已提交
425
  tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
dengyihao's avatar
dengyihao 已提交
426
          TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));
dengyihao's avatar
dengyihao 已提交
427

dengyihao's avatar
dengyihao 已提交
428
  if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
429
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
430
    transFreeMsg(transMsg.pCont);
dengyihao's avatar
dengyihao 已提交
431 432
    return;
  }
S
Shengliang Guan 已提交
433
  if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
dengyihao's avatar
dengyihao 已提交
434
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
435
    transFreeMsg(transMsg.pCont);
dengyihao's avatar
dengyihao 已提交
436 437
    return;
  }
dengyihao's avatar
dengyihao 已提交
438

dengyihao's avatar
dengyihao 已提交
439
  if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
dengyihao's avatar
dengyihao 已提交
440 441 442
    if (cliAppCb(conn, &transMsg, pMsg) != 0) {
      return;
    }
dengyihao's avatar
dengyihao 已提交
443
  }
dengyihao's avatar
dengyihao 已提交
444 445
  destroyCmsg(pMsg);

dengyihao's avatar
dengyihao 已提交
446
  if (cliMaySendCachedMsg(conn) == true) {
dengyihao's avatar
dengyihao 已提交
447 448
    return;
  }
dengyihao's avatar
dengyihao 已提交
449

U
ubuntu 已提交
450
  if (CONN_NO_PERSIST_BY_APP(conn)) {
dengyihao's avatar
dengyihao 已提交
451
    return addConnToPool(pThrd->pool, conn);
dengyihao's avatar
dengyihao 已提交
452
  }
dengyihao's avatar
test  
dengyihao 已提交
453

dengyihao's avatar
dengyihao 已提交
454
  uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
dengyihao's avatar
dengyihao 已提交
455
}
U
ubuntu 已提交
456

dengyihao's avatar
dengyihao 已提交
457
void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
dengyihao's avatar
dengyihao 已提交
458
  if (transQueueEmpty(&pConn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
459
    if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
dengyihao's avatar
dengyihao 已提交
460
      tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
formate  
dengyihao 已提交
461 462 463
      transUnrefCliHandle(pConn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
464
  }
dengyihao's avatar
dengyihao 已提交
465 466 467
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
  bool      once = false;
D
dapan1121 已提交
468
  do {
dengyihao's avatar
dengyihao 已提交
469
    SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);
dengyihao's avatar
enh log  
dengyihao 已提交
470

D
dapan1121 已提交
471 472
    if (pMsg == NULL && once) {
      break;
dengyihao's avatar
dengyihao 已提交
473
    }
dengyihao's avatar
enh log  
dengyihao 已提交
474 475 476 477 478 479

    if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg)) {
      destroyCmsg(pMsg);
      break;
    }

dengyihao's avatar
dengyihao 已提交
480
    STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
U
ubuntu 已提交
481

dengyihao's avatar
dengyihao 已提交
482
    STransMsg transMsg = {0};
dengyihao's avatar
dengyihao 已提交
483
    transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code;
dengyihao's avatar
dengyihao 已提交
484
    transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
S
Shengliang Guan 已提交
485
    transMsg.info.ahandle = NULL;
dengyihao's avatar
dengyihao 已提交
486

dengyihao's avatar
dengyihao 已提交
487
    if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
S
Shengliang Guan 已提交
488
      transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
dengyihao's avatar
dengyihao 已提交
489
      tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
dengyihao's avatar
dengyihao 已提交
490
             TMSG_INFO(transMsg.msgType));
S
Shengliang Guan 已提交
491
      if (transMsg.info.ahandle == NULL) {
dengyihao's avatar
dengyihao 已提交
492 493 494
        int32_t msgType = 0;
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, &msgType);
        transMsg.msgType = msgType;
dengyihao's avatar
dengyihao 已提交
495
        tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
S
Shengliang Guan 已提交
496
               transMsg.info.ahandle);
dengyihao's avatar
dengyihao 已提交
497
      }
dengyihao's avatar
dengyihao 已提交
498
    } else {
dengyihao's avatar
dengyihao 已提交
499
      transMsg.info.ahandle = (pMsg != NULL && pMsg->type != Release && pCtx) ? pCtx->ahandle : NULL;
dengyihao's avatar
dengyihao 已提交
500 501 502
    }

    if (pCtx == NULL || pCtx->pSem == NULL) {
S
Shengliang Guan 已提交
503
      if (transMsg.info.ahandle == NULL) {
dengyihao's avatar
dengyihao 已提交
504 505 506 507 508
        if (pMsg == NULL || REQUEST_NO_RESP(&pMsg->msg) || pMsg->type == Release) {
          destroyCmsg(pMsg);
          once = true;
          continue;
        }
U
ubuntu 已提交
509
      }
dengyihao's avatar
dengyihao 已提交
510
    }
dengyihao's avatar
enh log  
dengyihao 已提交
511

dengyihao's avatar
dengyihao 已提交
512
    if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
dengyihao's avatar
dengyihao 已提交
513 514 515
      if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
        return;
      }
dengyihao's avatar
dengyihao 已提交
516 517
    }
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
518
    tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));
D
dapan1121 已提交
519
  } while (!transQueueEmpty(&pConn->cliMsgs));
dengyihao's avatar
dengyihao 已提交
520
  transUnrefCliHandle(pConn);
dengyihao's avatar
dengyihao 已提交
521
}
dengyihao's avatar
dengyihao 已提交
522
void cliHandleExcept(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
523
  tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
524 525
  cliHandleExceptImpl(conn, -1);
}
dengyihao's avatar
dengyihao 已提交
526

dengyihao's avatar
dengyihao 已提交
527 528
void cliConnTimeout(uv_timer_t* handle) {
  SCliConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
529 530
  SCliThrd* pThrd = conn->hostThrd;

dengyihao's avatar
dengyihao 已提交
531
  tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
532

dengyihao's avatar
dengyihao 已提交
533
  uv_timer_stop(handle);
dengyihao's avatar
dengyihao 已提交
534 535 536
  handle->data = NULL;
  taosArrayPush(pThrd->timerList, &conn->timer);
  conn->timer = NULL;
dengyihao's avatar
dengyihao 已提交
537 538

  cliHandleFastFail(conn, UV_ECANCELED);
dengyihao's avatar
dengyihao 已提交
539
}
dengyihao's avatar
dengyihao 已提交
540 541 542 543
void cliReadTimeoutCb(uv_timer_t* handle) {
  // set up timeout cb
  SCliConn* conn = handle->data;
  tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
544
  uv_read_stop(conn->stream);
dengyihao's avatar
dengyihao 已提交
545 546
  cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT);
}
U
ubuntu 已提交
547 548

void* createConnPool(int size) {
549 550
  // thread local, no lock
  return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
551
}
dengyihao's avatar
dengyihao 已提交
552 553
void* destroyConnPool(SCliThrd* pThrd) {
  void*      pool = pThrd->pool;
dengyihao's avatar
dengyihao 已提交
554
  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
dengyihao's avatar
dengyihao 已提交
555
  while (connList != NULL) {
dengyihao's avatar
dengyihao 已提交
556 557
    while (!QUEUE_IS_EMPTY(&connList->conns)) {
      queue*    h = QUEUE_HEAD(&connList->conns);
dengyihao's avatar
dengyihao 已提交
558
      SCliConn* c = QUEUE_DATA(h, SCliConn, q);
U
ubuntu 已提交
559
      cliDestroyConn(c, true);
dengyihao's avatar
dengyihao 已提交
560
    }
dengyihao's avatar
dengyihao 已提交
561 562 563 564 565 566 567 568 569 570 571 572 573 574 575

    SMsgList* msglist = connList->list;
    while (!QUEUE_IS_EMPTY(&msglist->msgQ)) {
      queue* h = QUEUE_HEAD(&msglist->msgQ);
      QUEUE_REMOVE(h);

      SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);

      transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
      pMsg->ctx->task = NULL;

      doNotifyApp(pMsg, pThrd);
    }
    taosMemoryFree(msglist);

dengyihao's avatar
dengyihao 已提交
576
    connList = taosHashIterate((SHashObj*)pool, connList);
dengyihao's avatar
dengyihao 已提交
577
  }
dengyihao's avatar
dengyihao 已提交
578
  taosHashCleanup(pool);
579
  return NULL;
dengyihao's avatar
dengyihao 已提交
580 581
}

dengyihao's avatar
dengyihao 已提交
582
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
dengyihao's avatar
dengyihao 已提交
583
  void*      pool = pThrd->pool;
dengyihao's avatar
dengyihao 已提交
584
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
585
  STrans*    pTranInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
586
  if (plist == NULL) {
dengyihao's avatar
dengyihao 已提交
587 588 589 590
    SConnList list = {0};
    taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
    plist = taosHashGet(pool, key, strlen(key));

dengyihao's avatar
dengyihao 已提交
591 592 593 594
    SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
    QUEUE_INIT(&nList->msgQ);
    nList->numOfConn++;

dengyihao's avatar
dengyihao 已提交
595 596
    QUEUE_INIT(&plist->conns);
    plist->list = nList;
dengyihao's avatar
dengyihao 已提交
597 598
  }

dengyihao's avatar
dengyihao 已提交
599 600 601 602
  if (QUEUE_IS_EMPTY(&plist->conns)) {
    if (plist->list->numOfConn >= pTranInst->connLimitNum) {
      *exceed = true;
    }
dengyihao's avatar
dengyihao 已提交
603 604 605
    return NULL;
  }

dengyihao's avatar
dengyihao 已提交
606 607
  queue* h = QUEUE_HEAD(&plist->conns);
  QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
608
  plist->size -= 1;
dengyihao's avatar
dengyihao 已提交
609

dengyihao's avatar
dengyihao 已提交
610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625
  SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
  conn->status = ConnNormal;
  QUEUE_INIT(&conn->q);

  if (conn->task != NULL) {
    transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
    conn->task = NULL;
  }
  return conn;
}

static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
  void*      pool = pThrd->pool;
  STrans*    pTransInst = pThrd->pTransInst;
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
  if (plist == NULL) {
dengyihao's avatar
dengyihao 已提交
626 627 628 629
    SConnList list = {0};
    taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
    plist = taosHashGet(pool, key, strlen(key));

dengyihao's avatar
dengyihao 已提交
630 631
    SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
    QUEUE_INIT(&nList->msgQ);
dengyihao's avatar
dengyihao 已提交
632 633
    nList->numOfConn++;

dengyihao's avatar
dengyihao 已提交
634 635
    QUEUE_INIT(&plist->conns);
    plist->list = nList;
dengyihao's avatar
dengyihao 已提交
636 637 638
  }

  // no avaliable conn in pool
dengyihao's avatar
dengyihao 已提交
639
  if (QUEUE_IS_EMPTY(&plist->conns)) {
dengyihao's avatar
dengyihao 已提交
640
    SMsgList* list = plist->list;
dengyihao's avatar
dengyihao 已提交
641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670
    if ((list)->numOfConn >= pTransInst->connLimitNum) {
      STraceId* trace = &(*pMsg)->msg.info.traceId;

      STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
      arg->param1 = *pMsg;
      arg->param2 = pThrd;
      (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);

      tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));

      QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
      *pMsg = NULL;
    } else {
      // send msg in delay queue
      if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) {
        STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
        arg->param1 = *pMsg;
        arg->param2 = pThrd;
        (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);

        QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
        queue* h = QUEUE_HEAD(&(list)->msgQ);
        QUEUE_REMOVE(h);
        SCliMsg* ans = QUEUE_DATA(h, SCliMsg, q);

        *pMsg = ans;
        transDQCancel(pThrd->waitConnQueue, ans->ctx->task);
      }
      list->numOfConn++;
    }
dengyihao's avatar
dengyihao 已提交
671 672
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
673

dengyihao's avatar
dengyihao 已提交
674
  queue* h = QUEUE_HEAD(&plist->conns);
dengyihao's avatar
dengyihao 已提交
675
  plist->size -= 1;
dengyihao's avatar
dengyihao 已提交
676 677
  QUEUE_REMOVE(h);

dengyihao's avatar
dengyihao 已提交
678
  SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
dengyihao's avatar
dengyihao 已提交
679
  conn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
680
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
681

dengyihao's avatar
dengyihao 已提交
682 683 684 685
  if (conn->task != NULL) {
    transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
    conn->task = NULL;
  }
dengyihao's avatar
dengyihao 已提交
686
  return conn;
dengyihao's avatar
dengyihao 已提交
687
}
dengyihao's avatar
dengyihao 已提交
688 689 690 691
static void addConnToPool(void* pool, SCliConn* conn) {
  if (conn->status == ConnInPool) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
692
  tError("add conn to pool");
dengyihao's avatar
dengyihao 已提交
693 694
  allocConnRef(conn, true);

dengyihao's avatar
dengyihao 已提交
695
  SCliThrd* thrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
696 697 698 699 700 701
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
    taosArrayPush(thrd->timerList, &conn->timer);
    conn->timer->data = NULL;
    conn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
702 703 704 705 706
  if (T_REF_VAL_GET(conn) > 1) {
    transUnrefCliHandle(conn);
  }

  cliDestroyConnMsgs(conn, false);
dengyihao's avatar
dengyihao 已提交
707

dengyihao's avatar
dengyihao 已提交
708
  if (conn->list == NULL) {
dengyihao's avatar
dengyihao 已提交
709
    conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
dengyihao's avatar
dengyihao 已提交
710
  }
dengyihao's avatar
dengyihao 已提交
711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731

  SConnList* pList = conn->list;
  SMsgList*  msgList = pList->list;
  if (!QUEUE_IS_EMPTY(&msgList->msgQ)) {
    queue* h = QUEUE_HEAD(&(msgList)->msgQ);
    QUEUE_REMOVE(h);

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);

    transDQCancel(thrd->waitConnQueue, pMsg->ctx->task);
    pMsg->ctx->task = NULL;

    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
    transQueuePush(&conn->cliMsgs, pMsg);

    conn->status = ConnNormal;
    cliSend(conn);
    return;
  }

  conn->status = ConnInPool;
dengyihao's avatar
dengyihao 已提交
732
  QUEUE_PUSH(&conn->list->conns, &conn->q);
dengyihao's avatar
dengyihao 已提交
733
  conn->list->size += 1;
dengyihao's avatar
dengyihao 已提交
734

dengyihao's avatar
dengyihao 已提交
735
  if (conn->list->size >= 250) {
dengyihao's avatar
dengyihao 已提交
736 737
    STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
    arg->param1 = conn;
dengyihao's avatar
dengyihao 已提交
738
    arg->param2 = thrd;
dengyihao's avatar
dengyihao 已提交
739 740

    STrans* pTransInst = thrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
741 742
    conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
  }
dengyihao's avatar
dengyihao 已提交
743
}
dengyihao's avatar
dengyihao 已提交
744
static int32_t allocConnRef(SCliConn* conn, bool update) {
dengyihao's avatar
dengyihao 已提交
745
  if (update) {
dengyihao's avatar
dengyihao 已提交
746
    transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
747
    transRemoveExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
748
    conn->refId = -1;
dengyihao's avatar
dengyihao 已提交
749 750 751 752
  }
  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
753
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
dengyihao's avatar
dengyihao 已提交
754
  conn->refId = exh->refId;
755 756 757 758

  if (conn->refId == -1) {
    taosMemoryFree(exh);
  }
dengyihao's avatar
dengyihao 已提交
759 760 761 762 763
  return 0;
}

static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
  if (update) {
dengyihao's avatar
dengyihao 已提交
764
    transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
765 766 767 768 769 770 771 772 773 774 775 776 777
    transRemoveExHandle(transGetRefMgt(), conn->refId);
    conn->refId = -1;
  }
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
  if (exh == NULL) {
    return -1;
  }
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
  conn->refId = exh->refId;

  transReleaseExHandle(transGetRefMgt(), handle);
  return 0;
dengyihao's avatar
dengyihao 已提交
778
}
dengyihao's avatar
dengyihao 已提交
779

dengyihao's avatar
dengyihao 已提交
780
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
781
  SCliConn*    conn = handle->data;
dengyihao's avatar
dengyihao 已提交
782
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
783
  tTrace("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
784
  transAllocBuffer(pBuf, buf);
dengyihao's avatar
dengyihao 已提交
785
}
U
ubuntu 已提交
786
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
dengyihao's avatar
dengyihao 已提交
787
  // impl later
dengyihao's avatar
dengyihao 已提交
788 789 790
  if (handle->data == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
791 792
  SCliConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
dengyihao's avatar
dengyihao 已提交
793
  if (nread > 0) {
dengyihao's avatar
dengyihao 已提交
794
    pBuf->len += nread;
dengyihao's avatar
dengyihao 已提交
795
    while (transReadComplete(pBuf)) {
dengyihao's avatar
dengyihao 已提交
796
      tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
797 798 799 800 801 802
      if (pBuf->invalid) {
        cliHandleExcept(conn);
        break;
      } else {
        cliHandleResp(conn);
      }
dengyihao's avatar
dengyihao 已提交
803
    }
dengyihao's avatar
dengyihao 已提交
804 805
    return;
  }
dengyihao's avatar
dengyihao 已提交
806

807
  if (nread == 0) {
dengyihao's avatar
dengyihao 已提交
808 809 810
    // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
    // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
    // read(2).
dengyihao's avatar
dengyihao 已提交
811
    tTrace("%s conn %p read empty", CONN_GET_INST_LABEL(conn), conn);
812 813
    return;
  }
dengyihao's avatar
fix bug  
dengyihao 已提交
814
  if (nread < 0) {
dengyihao's avatar
dengyihao 已提交
815
    tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), T_REF_VAL_GET(conn));
dengyihao's avatar
dengyihao 已提交
816
    conn->broken = true;
U
ubuntu 已提交
817
    cliHandleExcept(conn);
dengyihao's avatar
dengyihao 已提交
818
  }
dengyihao's avatar
dengyihao 已提交
819
}
dengyihao's avatar
dengyihao 已提交
820

dengyihao's avatar
dengyihao 已提交
821
static SCliConn* cliCreateConn(SCliThrd* pThrd) {
wafwerar's avatar
wafwerar 已提交
822
  SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn));
dengyihao's avatar
dengyihao 已提交
823
  // read/write stream handle
G
gccgdb1234 已提交
824
  conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
dengyihao's avatar
dengyihao 已提交
825 826 827
  uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
  conn->stream->data = conn;

dengyihao's avatar
dengyihao 已提交
828 829 830 831 832 833 834 835
  uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
  if (timer == NULL) {
    timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
    tDebug("no available timer, create a timer %p", timer);
    uv_timer_init(pThrd->loop, timer);
  }
  timer->data = conn;
  conn->timer = timer;
dengyihao's avatar
dengyihao 已提交
836

dengyihao's avatar
dengyihao 已提交
837
  conn->connReq.data = conn;
dengyihao's avatar
dengyihao 已提交
838 839
  transReqQueueInit(&conn->wreqQueue);

dengyihao's avatar
dengyihao 已提交
840
  transQueueInit(&conn->cliMsgs, NULL);
dengyihao's avatar
opt rpc  
dengyihao 已提交
841 842

  transInitBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
843
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
844
  conn->hostThrd = pThrd;
dengyihao's avatar
dengyihao 已提交
845
  conn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
846
  conn->broken = false;
dengyihao's avatar
dengyihao 已提交
847
  transRefCliHandle(conn);
dengyihao's avatar
dengyihao 已提交
848

dengyihao's avatar
dengyihao 已提交
849
  atomic_add_fetch_32(&pThrd->connCount, 1);
dengyihao's avatar
dengyihao 已提交
850
  allocConnRef(conn, false);
dengyihao's avatar
dengyihao 已提交
851

dengyihao's avatar
dengyihao 已提交
852 853
  return conn;
}
U
ubuntu 已提交
854
static void cliDestroyConn(SCliConn* conn, bool clear) {
dengyihao's avatar
dengyihao 已提交
855
  SCliThrd* pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
856
  tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
857

dengyihao's avatar
dengyihao 已提交
858 859
  QUEUE_REMOVE(&conn->q);
  QUEUE_INIT(&conn->q);
dengyihao's avatar
dengyihao 已提交
860 861 862 863 864 865 866 867 868 869

  if (conn->list != NULL) {
    SConnList* connList = conn->list;
    connList->list->numOfConn--;
  } else {
    SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip));
    connList->list->numOfConn--;
  }
  conn->list = NULL;

dengyihao's avatar
dengyihao 已提交
870
  transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
871
  transRemoveExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
opt rpc  
dengyihao 已提交
872
  conn->refId = -1;
dengyihao's avatar
dengyihao 已提交
873

dengyihao's avatar
dengyihao 已提交
874 875 876 877 878 879 880
  if (conn->task != NULL) {
    transDQCancel(pThrd->timeoutQueue, conn->task);
    conn->task = NULL;
  }
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
    conn->timer->data = NULL;
dengyihao's avatar
dengyihao 已提交
881
    taosArrayPush(pThrd->timerList, &conn->timer);
dengyihao's avatar
dengyihao 已提交
882 883
    conn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
884

dengyihao's avatar
dengyihao 已提交
885
  if (clear) {
dengyihao's avatar
dengyihao 已提交
886
    if (!uv_is_closing((uv_handle_t*)conn->stream)) {
dengyihao's avatar
dengyihao 已提交
887
      uv_read_stop(conn->stream);
dengyihao's avatar
dengyihao 已提交
888 889
      uv_close((uv_handle_t*)conn->stream, cliDestroy);
    }
890
  }
dengyihao's avatar
dengyihao 已提交
891
}
U
ubuntu 已提交
892
static void cliDestroy(uv_handle_t* handle) {
dengyihao's avatar
dengyihao 已提交
893 894 895
  if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
896
  SCliConn* conn = handle->data;
dengyihao's avatar
dengyihao 已提交
897
  SCliThrd* pThrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
898 899
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
dengyihao's avatar
dengyihao 已提交
900 901
    taosArrayPush(pThrd->timerList, &conn->timer);
    conn->timer->data = NULL;
dengyihao's avatar
dengyihao 已提交
902 903
    conn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
904

dengyihao's avatar
dengyihao 已提交
905 906
  atomic_sub_fetch_32(&pThrd->connCount, 1);

dengyihao's avatar
dengyihao 已提交
907
  transReleaseExHandle(transGetRefMgt(), conn->refId);
dengyihao's avatar
dengyihao 已提交
908
  transRemoveExHandle(transGetRefMgt(), conn->refId);
wafwerar's avatar
wafwerar 已提交
909 910
  taosMemoryFree(conn->ip);
  taosMemoryFree(conn->stream);
dengyihao's avatar
dengyihao 已提交
911 912 913

  cliDestroyConnMsgs(conn, true);

dengyihao's avatar
dengyihao 已提交
914
  tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
915
  transReqQueueClear(&conn->wreqQueue);
dengyihao's avatar
dengyihao 已提交
916
  transDestroyBuffer(&conn->readBuf);
dengyihao's avatar
dengyihao 已提交
917

wafwerar's avatar
wafwerar 已提交
918
  taosMemoryFree(conn);
dengyihao's avatar
dengyihao 已提交
919
}
dengyihao's avatar
dengyihao 已提交
920
static bool cliHandleNoResp(SCliConn* conn) {
dengyihao's avatar
dengyihao 已提交
921 922
  bool res = false;
  if (!transQueueEmpty(&conn->cliMsgs)) {
dengyihao's avatar
dengyihao 已提交
923
    SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, 0);
dengyihao's avatar
dengyihao 已提交
924
    if (REQUEST_NO_RESP(&pMsg->msg)) {
dengyihao's avatar
dengyihao 已提交
925
      transQueuePop(&conn->cliMsgs);
dengyihao's avatar
dengyihao 已提交
926 927 928 929 930
      destroyCmsg(pMsg);
      res = true;
    }
    if (res == true) {
      if (cliMaySendCachedMsg(conn) == false) {
dengyihao's avatar
dengyihao 已提交
931
        SCliThrd* thrd = conn->hostThrd;
dengyihao's avatar
dengyihao 已提交
932
        addConnToPool(thrd->pool, conn);
dengyihao's avatar
dengyihao 已提交
933 934 935
        res = false;
      } else {
        res = true;
dengyihao's avatar
dengyihao 已提交
936 937 938 939 940
      }
    }
  }
  return res;
}
U
ubuntu 已提交
941
static void cliSendCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
942 943
  SCliConn* pConn = transReqQueueRemove(req);
  if (pConn == NULL) return;
dengyihao's avatar
dengyihao 已提交
944

dengyihao's avatar
dengyihao 已提交
945 946 947
  SCliMsg* pMsg = !transQueueEmpty(&pConn->cliMsgs) ? transQueueGet(&pConn->cliMsgs, 0) : NULL;
  if (pMsg != NULL) {
    int64_t cost = taosGetTimestampUs() - pMsg->st;
dengyihao's avatar
dengyihao 已提交
948
    if (cost > 1000 * 20) {
dengyihao's avatar
dengyihao 已提交
949 950 951 952
      tWarn("%s conn %p send cost:%dus, send exception", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
    }
  }

dengyihao's avatar
dengyihao 已提交
953
  if (status == 0) {
dengyihao's avatar
dengyihao 已提交
954
    tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
955
  } else {
dengyihao's avatar
dengyihao 已提交
956 957 958 959
    if (!uv_is_closing((uv_handle_t*)&pConn->stream)) {
      tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
      cliHandleExcept(pConn);
    }
dengyihao's avatar
dengyihao 已提交
960 961
    return;
  }
dengyihao's avatar
dengyihao 已提交
962
  if (cliHandleNoResp(pConn) == true) {
dengyihao's avatar
dengyihao 已提交
963
    tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
964 965
    return;
  }
dengyihao's avatar
dengyihao 已提交
966
  uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
dengyihao's avatar
dengyihao 已提交
967
}
dengyihao's avatar
dengyihao 已提交
968 969 970 971
void cliSendBatch(SCliConn* pConn) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

dengyihao's avatar
dengyihao 已提交
972 973 974 975 976
  SCliBatch*     pBatch = pConn->pBatch;
  SCliBatchList* pList = pBatch->pList;
  pList->connCnt += 1;

  int32_t wLen = pBatch->wLen;
dengyihao's avatar
dengyihao 已提交
977 978 979 980

  uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
  int       i = 0;

dengyihao's avatar
dengyihao 已提交
981 982
  queue* h = NULL;
  QUEUE_FOREACH(h, &pBatch->wq) {
dengyihao's avatar
dengyihao 已提交
983 984 985 986 987 988 989 990 991
    SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q);

    STransConnCtx* pCtx = pCliMsg->ctx;

    STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
    if (pMsg->pCont == 0) {
      pMsg->pCont = (void*)rpcMallocCont(0);
      pMsg->contLen = 0;
    }
dengyihao's avatar
dengyihao 已提交
992

dengyihao's avatar
dengyihao 已提交
993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008
    int            msgLen = transMsgLenFromCont(pMsg->contLen);
    STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);

    if (pHead->comp == 0) {
      pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
      pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
      pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
      pHead->msgType = pMsg->msgType;
      pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
      pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
      memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
      pHead->traceId = pMsg->info.traceId;
      pHead->magicNum = htonl(TRANS_MAGIC_NUM);
    }
    pHead->timestamp = taosHton64(taosGetTimestampUs());

dengyihao's avatar
dengyihao 已提交
1009 1010 1011 1012 1013 1014 1015 1016
    if (pHead->comp == 0) {
      if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
        msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
        pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
      }
    } else {
      msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
    }
dengyihao's avatar
dengyihao 已提交
1017 1018 1019 1020 1021
    wb[i++] = uv_buf_init((char*)pHead, msgLen);
  }

  uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
  req->data = pConn;
dengyihao's avatar
dengyihao 已提交
1022
  tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn,
dengyihao's avatar
dengyihao 已提交
1023
         pBatch->wLen, pBatch->batchSize);
dengyihao's avatar
dengyihao 已提交
1024 1025 1026
  uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb);
  taosMemoryFree(wb);
}
U
ubuntu 已提交
1027
void cliSend(SCliConn* pConn) {
dengyihao's avatar
dengyihao 已提交
1028 1029 1030 1031 1032
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  if (transQueueEmpty(&pConn->cliMsgs)) {
    tError("%s conn %p not msg to send", pTransInst->label, pConn);
dengyihao's avatar
dengyihao 已提交
1033
    cliHandleExcept(pConn);
dengyihao's avatar
dengyihao 已提交
1034 1035
    return;
  }
dengyihao's avatar
dengyihao 已提交
1036 1037

  SCliMsg* pCliMsg = NULL;
U
ubuntu 已提交
1038
  CONN_GET_NEXT_SENDMSG(pConn);
dengyihao's avatar
dengyihao 已提交
1039 1040
  pCliMsg->sent = 1;

dengyihao's avatar
dengyihao 已提交
1041
  STransConnCtx* pCtx = pCliMsg->ctx;
dengyihao's avatar
dengyihao 已提交
1042

U
ubuntu 已提交
1043
  STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
dengyihao's avatar
dengyihao 已提交
1044 1045 1046 1047
  if (pMsg->pCont == 0) {
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
dengyihao's avatar
dengyihao 已提交
1048

dengyihao's avatar
dengyihao 已提交
1049
  int            msgLen = transMsgLenFromCont(pMsg->contLen);
dengyihao's avatar
dengyihao 已提交
1050
  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
dengyihao's avatar
dengyihao 已提交
1051

dengyihao's avatar
dengyihao 已提交
1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062
  if (pHead->comp == 0) {
    pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
    pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
    pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
    pHead->msgType = pMsg->msgType;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
    memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
    pHead->traceId = pMsg->info.traceId;
    pHead->magicNum = htonl(TRANS_MAGIC_NUM);
  }
dengyihao's avatar
dengyihao 已提交
1063
  pHead->timestamp = taosHton64(taosGetTimestampUs());
dengyihao's avatar
dengyihao 已提交
1064

dengyihao's avatar
dengyihao 已提交
1065 1066 1067
  if (pHead->persist == 1) {
    CONN_SET_PERSIST_BY_APP(pConn);
  }
dengyihao's avatar
dengyihao 已提交
1068

dengyihao's avatar
dengyihao 已提交
1069
  STraceId* trace = &pMsg->info.traceId;
dengyihao's avatar
dengyihao 已提交
1070

dengyihao's avatar
dengyihao 已提交
1071
  if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
dengyihao's avatar
dengyihao 已提交
1072
    uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
dengyihao's avatar
dengyihao 已提交
1073 1074
    if (timer == NULL) {
      timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
dengyihao's avatar
dengyihao 已提交
1075
      tDebug("no available timer, create a timer %p", timer);
dengyihao's avatar
dengyihao 已提交
1076 1077 1078 1079 1080
      uv_timer_init(pThrd->loop, timer);
    }
    timer->data = pConn;
    pConn->timer = timer;

dengyihao's avatar
dengyihao 已提交
1081 1082 1083
    tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
    uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
  }
dengyihao's avatar
dengyihao 已提交
1084

dengyihao's avatar
dengyihao 已提交
1085 1086 1087 1088 1089 1090 1091
  if (pHead->comp == 0) {
    if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
      msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
      pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    }
  } else {
    msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
dengyihao's avatar
dengyihao 已提交
1092
  }
dengyihao's avatar
dengyihao 已提交
1093

dengyihao's avatar
dengyihao 已提交
1094 1095
  tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
          TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);
dengyihao's avatar
dengyihao 已提交
1096

dengyihao's avatar
dengyihao 已提交
1097
  uv_buf_t    wb = uv_buf_init((char*)pHead, msgLen);
dengyihao's avatar
dengyihao 已提交
1098
  uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
dengyihao's avatar
dengyihao 已提交
1099 1100 1101

  int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
1102
    tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType),
dengyihao's avatar
dengyihao 已提交
1103 1104 1105
            uv_err_name(status));
    cliHandleExcept(pConn);
  }
U
ubuntu 已提交
1106 1107
  return;
_RETURN:
dengyihao's avatar
dengyihao 已提交
1108
  return;
dengyihao's avatar
dengyihao 已提交
1109
}
dengyihao's avatar
dengyihao 已提交
1110 1111

static void cliDestroyBatch(SCliBatch* pBatch) {
dengyihao's avatar
dengyihao 已提交
1112
  if (pBatch == NULL) return;
dengyihao's avatar
dengyihao 已提交
1113
  while (!QUEUE_IS_EMPTY(&pBatch->wq)) {
dengyihao's avatar
dengyihao 已提交
1114 1115
    queue* h = QUEUE_HEAD(&pBatch->wq);
    QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
1116

dengyihao's avatar
dengyihao 已提交
1117
    SCliMsg* p = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
1118 1119
    destroyCmsg(p);
  }
dengyihao's avatar
dengyihao 已提交
1120 1121
  SCliBatchList* p = pBatch->pList;
  p->sending -= 1;
dengyihao's avatar
dengyihao 已提交
1122 1123
  taosMemoryFree(pBatch);
}
dengyihao's avatar
dengyihao 已提交
1124
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1125 1126 1127 1128 1129
  if (pThrd->quit == true) {
    cliDestroyBatch(pBatch);
    return;
  }

dengyihao's avatar
dengyihao 已提交
1130
  if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
dengyihao's avatar
dengyihao 已提交
1131 1132
    return;
  }
dengyihao's avatar
dengyihao 已提交
1133 1134
  STrans*        pTransInst = pThrd->pTransInst;
  SCliBatchList* pList = pBatch->pList;
dengyihao's avatar
dengyihao 已提交
1135

dengyihao's avatar
dengyihao 已提交
1136 1137 1138
  char key[TSDB_FQDN_LEN + 64] = {0};
  CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port);

dengyihao's avatar
dengyihao 已提交
1139 1140
  bool      exceed = false;
  SCliConn* conn = getConnFromPool(pThrd, key, &exceed);
dengyihao's avatar
dengyihao 已提交
1141

dengyihao's avatar
dengyihao 已提交
1142 1143 1144
  if (conn == NULL && exceed) {
    tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pTransInst->label, pBatch->wLen,
           pBatch->batchSize, pTransInst->connLimitNum);
dengyihao's avatar
dengyihao 已提交
1145
    cliDestroyBatch(pBatch);
dengyihao's avatar
dengyihao 已提交
1146 1147
    return;
  }
dengyihao's avatar
dengyihao 已提交
1148 1149
  if (conn == NULL) {
    conn = cliCreateConn(pThrd);
dengyihao's avatar
dengyihao 已提交
1150
    conn->pBatch = pBatch;
1151
    conn->ip = taosStrdup(pList->dst);
dengyihao's avatar
dengyihao 已提交
1152

dengyihao's avatar
dengyihao 已提交
1153
    uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip);
dengyihao's avatar
dengyihao 已提交
1154 1155 1156 1157 1158 1159
    if (ipaddr == 0xffffffff) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

dengyihao's avatar
dengyihao 已提交
1160
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1161 1162 1163 1164 1165
      return;
    }
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = ipaddr;
dengyihao's avatar
dengyihao 已提交
1166
    addr.sin_port = (uint16_t)htons(pList->port);
dengyihao's avatar
dengyihao 已提交
1167

dengyihao's avatar
dengyihao 已提交
1168
    tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst);
dengyihao's avatar
dengyihao 已提交
1169 1170
    int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
    if (fd == -1) {
dengyihao's avatar
dengyihao 已提交
1171 1172 1173
      tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
             tstrerror(TAOS_SYSTEM_ERROR(errno)));
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1174 1175 1176 1177
      return;
    }
    int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
    if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
1178 1179
      tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1180 1181 1182 1183
      return;
    }
    ret = transSetConnOption((uv_tcp_t*)conn->stream);
    if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
1184 1185
      tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1186 1187 1188 1189 1190 1191 1192 1193 1194
      return;
    }

    ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
    if (ret != 0) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;
dengyihao's avatar
dengyihao 已提交
1195
      cliHandleFastFail(conn, -1);
dengyihao's avatar
dengyihao 已提交
1196 1197 1198 1199 1200 1201
      return;
    }
    uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
    return;
  }

dengyihao's avatar
dengyihao 已提交
1202
  conn->pBatch = pBatch;
dengyihao's avatar
dengyihao 已提交
1203
  cliSendBatch(conn);
dengyihao's avatar
dengyihao 已提交
1204 1205
}
static void cliSendBatchCb(uv_write_t* req, int status) {
dengyihao's avatar
dengyihao 已提交
1206 1207 1208
  SCliConn*  conn = req->data;
  SCliThrd*  thrd = conn->hostThrd;
  SCliBatch* p = conn->pBatch;
dengyihao's avatar
dengyihao 已提交
1209

dengyihao's avatar
dengyihao 已提交
1210
  SCliBatchList* pBatchList = p->pList;
dengyihao's avatar
dengyihao 已提交
1211
  SCliBatch*     nxtBatch = cliGetHeadFromList(pBatchList);
dengyihao's avatar
dengyihao 已提交
1212 1213
  pBatchList->connCnt -= 1;

dengyihao's avatar
dengyihao 已提交
1214 1215 1216
  conn->pBatch = NULL;

  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
1217
    tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
dengyihao's avatar
dengyihao 已提交
1218
           p->wLen, p->batchSize, uv_err_name(status));
dengyihao's avatar
dengyihao 已提交
1219 1220 1221

    if (!uv_is_closing((uv_handle_t*)&conn->stream)) cliHandleExcept(conn);

dengyihao's avatar
dengyihao 已提交
1222
    cliHandleBatchReq(nxtBatch, thrd);
dengyihao's avatar
dengyihao 已提交
1223
  } else {
dengyihao's avatar
dengyihao 已提交
1224
    tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
dengyihao's avatar
dengyihao 已提交
1225
           p->batchSize);
dengyihao's avatar
dengyihao 已提交
1226
    if (!uv_is_closing((uv_handle_t*)&conn->stream)) {
dengyihao's avatar
dengyihao 已提交
1227 1228 1229 1230 1231 1232
      if (nxtBatch != NULL) {
        conn->pBatch = nxtBatch;
        cliSendBatch(conn);
      } else {
        addConnToPool(thrd->pool, conn);
      }
dengyihao's avatar
dengyihao 已提交
1233
    } else {
dengyihao's avatar
dengyihao 已提交
1234 1235
      cliDestroyBatch(nxtBatch);
      // conn release by other callback
dengyihao's avatar
dengyihao 已提交
1236
    }
dengyihao's avatar
dengyihao 已提交
1237
  }
dengyihao's avatar
dengyihao 已提交
1238 1239 1240

  cliDestroyBatch(p);
  taosMemoryFree(req);
dengyihao's avatar
dengyihao 已提交
1241
}
dengyihao's avatar
dengyihao 已提交
1242
static void cliHandleFastFail(SCliConn* pConn, int status) {
dengyihao's avatar
dengyihao 已提交
1243
  SCliThrd* pThrd = pConn->hostThrd;
dengyihao's avatar
dengyihao 已提交
1244
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1245 1246 1247

  if (status == -1) status = ENETUNREACH;

dengyihao's avatar
dengyihao 已提交
1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266
  if (pConn->pBatch == NULL) {
    SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);

    STraceId* trace = &pMsg->msg.info.traceId;
    tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
            TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status));

    if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
        (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
      SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip));
      int64_t        cTimestamp = taosGetTimestampMs();
      if (item != NULL) {
        int32_t elapse = cTimestamp - item->timestamp;
        if (elapse >= 0 && elapse <= pTransInst->failFastInterval) {
          item->count++;
        } else {
          item->count = 1;
          item->timestamp = cTimestamp;
        }
dengyihao's avatar
dengyihao 已提交
1267
      } else {
dengyihao's avatar
dengyihao 已提交
1268 1269
        SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
        taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem));
dengyihao's avatar
dengyihao 已提交
1270 1271
      }
    }
dengyihao's avatar
dengyihao 已提交
1272
  } else {
dengyihao's avatar
dengyihao 已提交
1273 1274
    tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
           pConn, pConn->ip, uv_strerror(status));
dengyihao's avatar
dengyihao 已提交
1275 1276
    cliDestroyBatch(pConn->pBatch);
    pConn->pBatch = NULL;
dengyihao's avatar
dengyihao 已提交
1277 1278 1279
  }
  cliHandleExcept(pConn);
}
dengyihao's avatar
dengyihao 已提交
1280

dengyihao's avatar
dengyihao 已提交
1281 1282 1283
void cliConnCb(uv_connect_t* req, int status) {
  SCliConn* pConn = req->data;
  SCliThrd* pThrd = pConn->hostThrd;
dengyihao's avatar
dengyihao 已提交
1284 1285 1286 1287 1288 1289 1290 1291 1292 1293
  bool      timeout = false;

  if (pConn->timer == NULL) {
    timeout = true;
  } else {
    uv_timer_stop(pConn->timer);
    pConn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &pConn->timer);
    pConn->timer = NULL;
  }
dengyihao's avatar
dengyihao 已提交
1294 1295

  if (status != 0) {
dengyihao's avatar
dengyihao 已提交
1296 1297 1298 1299 1300
    if (timeout == false) {
      cliHandleFastFail(pConn, status);
    } else if (timeout == true) {
      // already deal by timeout
    }
1301
    return;
dengyihao's avatar
dengyihao 已提交
1302
  }
dengyihao's avatar
dengyihao 已提交
1303

dengyihao's avatar
dengyihao 已提交
1304 1305
  struct sockaddr peername, sockname;
  int             addrlen = sizeof(peername);
dengyihao's avatar
dengyihao 已提交
1306
  uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
dengyihao's avatar
dengyihao 已提交
1307
  transSockInfo2Str(&peername, pConn->dst);
dengyihao's avatar
dengyihao 已提交
1308

dengyihao's avatar
dengyihao 已提交
1309 1310
  addrlen = sizeof(sockname);
  uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
dengyihao's avatar
dengyihao 已提交
1311
  transSockInfo2Str(&sockname, pConn->src);
dengyihao's avatar
dengyihao 已提交
1312

dengyihao's avatar
dengyihao 已提交
1313
  tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
1314 1315 1316 1317 1318
  if (pConn->pBatch != NULL) {
    cliSendBatch(pConn);
  } else {
    cliSend(pConn);
  }
dengyihao's avatar
dengyihao 已提交
1319 1320
}

dengyihao's avatar
dengyihao 已提交
1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
  STransConnCtx* pCtx = pMsg->ctx;
  STrans*        pTransInst = pThrd->pTransInst;

  STransMsg transMsg = {0};
  transMsg.contLen = 0;
  transMsg.pCont = NULL;
  transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS;
  transMsg.msgType = pMsg->msg.msgType + 1;
  transMsg.info.ahandle = pMsg->ctx->ahandle;
  transMsg.info.traceId = pMsg->msg.info.traceId;
  transMsg.info.hasEpSet = false;
  if (pCtx->pSem != NULL) {
    if (pCtx->pRsp == NULL) {
    } else {
      memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg));
    }
  } else {
    pTransInst->cfp(pTransInst->parent, &transMsg, NULL);
  }

  destroyCmsg(pMsg);
}
dengyihao's avatar
dengyihao 已提交
1344
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1345 1346 1347 1348 1349
  if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) {
    pThrd->stopMsg = pMsg;
    return;
  }
  pThrd->stopMsg = NULL;
dengyihao's avatar
dengyihao 已提交
1350
  pThrd->quit = true;
U
ubuntu 已提交
1351
  tDebug("cli work thread %p start to quit", pThrd);
dengyihao's avatar
dengyihao 已提交
1352
  destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1353

dengyihao's avatar
dengyihao 已提交
1354
  destroyConnPool(pThrd);
dengyihao's avatar
dengyihao 已提交
1355
  uv_walk(pThrd->loop, cliWalkCb, NULL);
dengyihao's avatar
dengyihao 已提交
1356
}
dengyihao's avatar
dengyihao 已提交
1357
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1358
  int64_t    refId = (int64_t)(pMsg->msg.info.handle);
dengyihao's avatar
dengyihao 已提交
1359
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1360
  if (exh == NULL) {
dengyihao's avatar
dengyihao 已提交
1361
    tDebug("%" PRId64 " already released", refId);
dengyihao's avatar
dengyihao 已提交
1362 1363
    destroyCmsg(pMsg);
    return;
dengyihao's avatar
dengyihao 已提交
1364 1365 1366
  }

  SCliConn* conn = exh->handle;
dengyihao's avatar
dengyihao 已提交
1367
  transReleaseExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1368
  tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
1369

dengyihao's avatar
dengyihao 已提交
1370 1371
  if (T_REF_VAL_GET(conn) == 2) {
    transUnrefCliHandle(conn);
dengyihao's avatar
dengyihao 已提交
1372 1373
    if (!transQueuePush(&conn->cliMsgs, pMsg)) {
      return;
dengyihao's avatar
dengyihao 已提交
1374 1375
    }
    cliSend(conn);
dengyihao's avatar
dengyihao 已提交
1376 1377 1378
  } else {
    tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1379 1380
  }
}
dengyihao's avatar
dengyihao 已提交
1381
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1382
  STransConnCtx* pCtx = pMsg->ctx;
dengyihao's avatar
dengyihao 已提交
1383
  pThrd->cvtAddr = pCtx->cvtAddr;
dengyihao's avatar
dengyihao 已提交
1384 1385
  destroyCmsg(pMsg);
}
U
ubuntu 已提交
1386

dengyihao's avatar
dengyihao 已提交
1387 1388
SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
  STransConnCtx* pCtx = (*pMsg)->ctx;
dengyihao's avatar
dengyihao 已提交
1389 1390
  SCliConn*      conn = NULL;

dengyihao's avatar
dengyihao 已提交
1391
  int64_t refId = (int64_t)((*pMsg)->msg.info.handle);
dengyihao's avatar
dengyihao 已提交
1392
  if (refId != 0) {
dengyihao's avatar
dengyihao 已提交
1393
    SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1394
    if (exh == NULL) {
dengyihao's avatar
dengyihao 已提交
1395
      tError("failed to get conn, refId: %" PRId64 "", refId);
dengyihao's avatar
dengyihao 已提交
1396 1397
      *ignore = true;
      return NULL;
dengyihao's avatar
dengyihao 已提交
1398 1399
    } else {
      conn = exh->handle;
dengyihao's avatar
dengyihao 已提交
1400
      if (conn == NULL) {
dengyihao's avatar
dengyihao 已提交
1401
        conn = getConnFromPool2(pThrd, addr, pMsg);
dengyihao's avatar
dengyihao 已提交
1402
        if (conn != NULL) specifyConnRef(conn, true, refId);
dengyihao's avatar
dengyihao 已提交
1403
      }
dengyihao's avatar
dengyihao 已提交
1404
      transReleaseExHandle(transGetRefMgt(), refId);
dengyihao's avatar
dengyihao 已提交
1405 1406 1407
    }
    return conn;
  };
dengyihao's avatar
dengyihao 已提交
1408

dengyihao's avatar
dengyihao 已提交
1409
  conn = getConnFromPool2(pThrd, addr, pMsg);
dengyihao's avatar
dengyihao 已提交
1410
  if (conn != NULL) {
dengyihao's avatar
dengyihao 已提交
1411
    tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
dengyihao's avatar
dengyihao 已提交
1412
  } else {
dengyihao's avatar
dengyihao 已提交
1413
    tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
dengyihao's avatar
dengyihao 已提交
1414
  }
dengyihao's avatar
dengyihao 已提交
1415 1416
  return conn;
}
dengyihao's avatar
dengyihao 已提交
1417
FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
dengyihao's avatar
dengyihao 已提交
1418 1419 1420
  if (pCvtAddr->cvt == false) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1421 1422 1423
  if (pEpSet->numOfEps == 1 && strncmp(pEpSet->eps[0].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) {
    memset(pEpSet->eps[0].fqdn, 0, TSDB_FQDN_LEN);
    memcpy(pEpSet->eps[0].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN);
dengyihao's avatar
dengyihao 已提交
1424 1425
  }
}
dengyihao's avatar
dengyihao 已提交
1426

dengyihao's avatar
dengyihao 已提交
1427
FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
dengyihao's avatar
dengyihao 已提交
1428
  if (code != 0) return false;
dengyihao's avatar
dengyihao 已提交
1429
  // if (pCtx->retryCnt == 0) return false;
dengyihao's avatar
dengyihao 已提交
1430 1431 1432
  if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
  return true;
}
dengyihao's avatar
dengyihao 已提交
1433
FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
dengyihao's avatar
dengyihao 已提交
1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444
  if (pMsg == NULL) return -1;

  memset(pResp, 0, sizeof(STransMsg));

  pResp->code = TSDB_CODE_RPC_BROKEN_LINK;
  pResp->msgType = pMsg->msg.msgType + 1;
  pResp->info.ahandle = pMsg->ctx ? pMsg->ctx->ahandle : NULL;
  pResp->info.traceId = pMsg->msg.info.traceId;

  return 0;
}
dengyihao's avatar
dengyihao 已提交
1445 1446 1447 1448 1449
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
  uint32_t  addr = 0;
  uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn));
  if (v == NULL) {
    addr = taosGetIpv4FromFqdn(fqdn);
1450 1451 1452 1453
    if (addr == 0xffffffff) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr());
      return addr;
dengyihao's avatar
dengyihao 已提交
1454 1455
    }

dengyihao's avatar
dengyihao 已提交
1456 1457 1458 1459 1460 1461 1462 1463 1464 1465
    taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
  } else {
    addr = *v;
  }
  return addr;
}
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
  // impl later
  return;
}
dengyihao's avatar
dengyihao 已提交
1466

dengyihao's avatar
dengyihao 已提交
1467 1468 1469 1470 1471
static void doFreeTimeoutMsg(void* param) {
  STaskArg* arg = param;
  SCliMsg*  pMsg = arg->param1;
  SCliThrd* pThrd = arg->param2;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1472

dengyihao's avatar
dengyihao 已提交
1473 1474 1475 1476 1477 1478
  QUEUE_REMOVE(&pMsg->q);
  STraceId* trace = &pMsg->msg.info.traceId;
  tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
  doNotifyApp(pMsg, pThrd);
  taosMemoryFree(arg);
}
dengyihao's avatar
dengyihao 已提交
1479
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1480
  STrans* pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1481

dengyihao's avatar
dengyihao 已提交
1482 1483
  cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
  if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) {
dengyihao's avatar
dengyihao 已提交
1484
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1485 1486
    return;
  }
dengyihao's avatar
dengyihao 已提交
1487

dengyihao's avatar
dengyihao 已提交
1488 1489
  char*    fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
  uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
dengyihao's avatar
dengyihao 已提交
1490 1491
  char     addr[TSDB_FQDN_LEN + 64] = {0};
  CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port);
dengyihao's avatar
dengyihao 已提交
1492

dengyihao's avatar
dengyihao 已提交
1493
  bool      ignore = false;
dengyihao's avatar
dengyihao 已提交
1494
  SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr);
dengyihao's avatar
dengyihao 已提交
1495
  if (ignore == true) {
dengyihao's avatar
dengyihao 已提交
1496
    // persist conn already release by server
dengyihao's avatar
dengyihao 已提交
1497 1498
    STransMsg resp;
    cliBuildExceptResp(pMsg, &resp);
dengyihao's avatar
dengyihao 已提交
1499 1500 1501
    if (pMsg->type != Release) {
      pTransInst->cfp(pTransInst->parent, &resp, NULL);
    }
dengyihao's avatar
dengyihao 已提交
1502
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1503 1504
    return;
  }
dengyihao's avatar
dengyihao 已提交
1505
  if (conn == NULL && pMsg == NULL) {
dengyihao's avatar
dengyihao 已提交
1506 1507
    return;
  }
dengyihao's avatar
dengyihao 已提交
1508
  STraceId* trace = &pMsg->msg.info.traceId;
dengyihao's avatar
dengyihao 已提交
1509

dengyihao's avatar
dengyihao 已提交
1510
  if (conn != NULL) {
dengyihao's avatar
dengyihao 已提交
1511
    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
dengyihao's avatar
dengyihao 已提交
1512
    transQueuePush(&conn->cliMsgs, pMsg);
U
ubuntu 已提交
1513
    cliSend(conn);
dengyihao's avatar
dengyihao 已提交
1514
  } else {
U
ubuntu 已提交
1515
    conn = cliCreateConn(pThrd);
dengyihao's avatar
dengyihao 已提交
1516 1517 1518 1519

    int64_t refId = (int64_t)pMsg->msg.info.handle;
    if (refId != 0) specifyConnRef(conn, true, refId);

dengyihao's avatar
dengyihao 已提交
1520
    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
dengyihao's avatar
dengyihao 已提交
1521
    transQueuePush(&conn->cliMsgs, pMsg);
dengyihao's avatar
dengyihao 已提交
1522

dengyihao's avatar
dengyihao 已提交
1523
    conn->ip = taosStrdup(addr);
dengyihao's avatar
dengyihao 已提交
1524

dengyihao's avatar
dengyihao 已提交
1525
    uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
1526 1527 1528 1529 1530 1531 1532 1533 1534
    if (ipaddr == 0xffffffff) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

      cliHandleExcept(conn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
1535

dengyihao's avatar
dengyihao 已提交
1536
    struct sockaddr_in addr;
1537
    addr.sin_family = AF_INET;
1538
    addr.sin_addr.s_addr = ipaddr;
dengyihao's avatar
dengyihao 已提交
1539
    addr.sin_port = (uint16_t)htons(port);
dengyihao's avatar
dengyihao 已提交
1540

dengyihao's avatar
dengyihao 已提交
1541
    tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip);
1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561
    int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
    if (fd == -1) {
      tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
              tstrerror(TAOS_SYSTEM_ERROR(errno)));
      cliHandleExcept(conn);
      errno = 0;
      return;
    }
    int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
    if (ret != 0) {
      tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }
    ret = transSetConnOption((uv_tcp_t*)conn->stream);
    if (ret != 0) {
      tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }
dengyihao's avatar
dengyihao 已提交
1562

1563
    ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
dengyihao's avatar
dengyihao 已提交
1564
    if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
1565 1566 1567 1568 1569
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

dengyihao's avatar
dengyihao 已提交
1570
      cliHandleFastFail(conn, ret);
dengyihao's avatar
dengyihao 已提交
1571 1572
      return;
    }
dengyihao's avatar
dengyihao 已提交
1573
    uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
dengyihao's avatar
dengyihao 已提交
1574
  }
dengyihao's avatar
dengyihao 已提交
1575
  tGTrace("%s conn %p ready", pTransInst->label, conn);
dengyihao's avatar
dengyihao 已提交
1576
}
dengyihao's avatar
dengyihao 已提交
1577

dengyihao's avatar
dengyihao 已提交
1578
static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1579
  int count = 0;
dengyihao's avatar
dengyihao 已提交
1580

dengyihao's avatar
dengyihao 已提交
1581 1582
  while (!QUEUE_IS_EMPTY(wq)) {
    queue* h = QUEUE_HEAD(wq);
dengyihao's avatar
dengyihao 已提交
1583 1584 1585
    QUEUE_REMOVE(h);

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
1586 1587 1588 1589 1590

    if (pMsg->type == Quit) {
      pThrd->stopMsg = pMsg;
      continue;
    }
dengyihao's avatar
dengyihao 已提交
1591 1592 1593 1594 1595 1596 1597 1598
    (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);

    count++;
  }
  if (count >= 2) {
    tTrace("cli process batch size:%d", count);
  }
}
dengyihao's avatar
dengyihao 已提交
1599
SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
dengyihao's avatar
dengyihao 已提交
1600
  if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) {
dengyihao's avatar
dengyihao 已提交
1601 1602 1603 1604
    return NULL;
  }
  queue* hr = QUEUE_HEAD(&pList->wq);
  QUEUE_REMOVE(hr);
dengyihao's avatar
dengyihao 已提交
1605
  pList->sending += 1;
dengyihao's avatar
dengyihao 已提交
1606 1607 1608 1609 1610 1611

  pList->len -= 1;

  SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq);
  return batch;
}
dengyihao's avatar
dengyihao 已提交
1612

dengyihao's avatar
dengyihao 已提交
1613
static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1614 1615
  STrans* pInst = pThrd->pTransInst;

dengyihao's avatar
dengyihao 已提交
1616
  int count = 0;
dengyihao's avatar
dengyihao 已提交
1617 1618
  while (!QUEUE_IS_EMPTY(wq)) {
    queue* h = QUEUE_HEAD(wq);
dengyihao's avatar
dengyihao 已提交
1619
    QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
1620 1621

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
1622 1623 1624 1625 1626 1627

    if (pMsg->type == Quit) {
      pThrd->stopMsg = pMsg;
      continue;
    }

dengyihao's avatar
dengyihao 已提交
1628
    if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
dengyihao's avatar
dengyihao 已提交
1629 1630 1631 1632 1633 1634 1635
      STransConnCtx* pCtx = pMsg->ctx;

      char*    ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
      uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
      char     key[TSDB_FQDN_LEN + 64] = {0};
      CONN_CONSTRUCT_HASH_KEY(key, ip, port);

dengyihao's avatar
dengyihao 已提交
1636 1637 1638 1639 1640
      // SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key));
      SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key));
      if (ppBatchList == NULL || *ppBatchList == NULL) {
        SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
        QUEUE_INIT(&pBatchList->wq);
dengyihao's avatar
dengyihao 已提交
1641
        pBatchList->connMax = pInst->connLimitNum;
dengyihao's avatar
dengyihao 已提交
1642
        pBatchList->connCnt = 0;
dengyihao's avatar
dengyihao 已提交
1643
        pBatchList->batchLenLimit = pInst->batchSize;
dengyihao's avatar
dengyihao 已提交
1644
        pBatchList->len += 1;
dengyihao's avatar
dengyihao 已提交
1645

1646 1647
        pBatchList->ip = taosStrdup(ip);
        pBatchList->dst = taosStrdup(key);
dengyihao's avatar
dengyihao 已提交
1648 1649
        pBatchList->port = port;

dengyihao's avatar
dengyihao 已提交
1650 1651
        SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
        QUEUE_INIT(&pBatch->wq);
dengyihao's avatar
dengyihao 已提交
1652 1653
        QUEUE_INIT(&pBatch->listq);

dengyihao's avatar
dengyihao 已提交
1654 1655 1656
        QUEUE_PUSH(&pBatch->wq, h);
        pBatch->wLen += 1;
        pBatch->batchSize += pMsg->msg.contLen;
dengyihao's avatar
dengyihao 已提交
1657
        pBatch->pList = pBatchList;
dengyihao's avatar
dengyihao 已提交
1658

dengyihao's avatar
dengyihao 已提交
1659
        QUEUE_PUSH(&pBatchList->wq, &pBatch->listq);
dengyihao's avatar
dengyihao 已提交
1660

dengyihao's avatar
dengyihao 已提交
1661
        taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatchList, sizeof(void*));
dengyihao's avatar
dengyihao 已提交
1662
      } else {
dengyihao's avatar
dengyihao 已提交
1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683
        if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
          SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
          QUEUE_INIT(&pBatch->wq);
          QUEUE_INIT(&pBatch->listq);

          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->wLen += 1;
          pBatch->batchSize = pMsg->msg.contLen;
          pBatch->pList = *ppBatchList;

          QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
          (*ppBatchList)->len += 1;

          continue;
        }

        queue*     hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
        SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
        if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) {
          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->batchSize += pMsg->msg.contLen;
dengyihao's avatar
dengyihao 已提交
1684
          pBatch->wLen += 1;
dengyihao's avatar
dengyihao 已提交
1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697
        } else {
          SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
          QUEUE_INIT(&pBatch->wq);
          QUEUE_INIT(&pBatch->listq);

          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->wLen += 1;
          pBatch->batchSize += pMsg->msg.contLen;
          pBatch->pList = *ppBatchList;

          QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
          (*ppBatchList)->len += 1;
        }
dengyihao's avatar
dengyihao 已提交
1698
      }
dengyihao's avatar
dengyihao 已提交
1699
      continue;
dengyihao's avatar
dengyihao 已提交
1700
    }
dengyihao's avatar
dengyihao 已提交
1701
    (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
1702
    count++;
dengyihao's avatar
dengyihao 已提交
1703
  }
dengyihao's avatar
dengyihao 已提交
1704

dengyihao's avatar
dengyihao 已提交
1705
  void** pIter = taosHashIterate(pThrd->batchCache, NULL);
dengyihao's avatar
dengyihao 已提交
1706
  while (pIter != NULL) {
dengyihao's avatar
dengyihao 已提交
1707
    SCliBatchList* batchList = (SCliBatchList*)(*pIter);
dengyihao's avatar
dengyihao 已提交
1708 1709 1710
    SCliBatch*     batch = cliGetHeadFromList(batchList);
    if (batch != NULL) {
      cliHandleBatchReq(batch, pThrd);
dengyihao's avatar
dengyihao 已提交
1711
    }
dengyihao's avatar
dengyihao 已提交
1712
    pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
dengyihao's avatar
dengyihao 已提交
1713 1714
  }

dengyihao's avatar
dengyihao 已提交
1715
  if (count >= 2) {
S
Shengliang Guan 已提交
1716
    tTrace("cli process batch size:%d", count);
dengyihao's avatar
dengyihao 已提交
1717
  }
dengyihao's avatar
dengyihao 已提交
1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730
}

static void cliAsyncCb(uv_async_t* handle) {
  SAsyncItem* item = handle->data;
  SCliThrd*   pThrd = item->pThrd;
  STrans*     pTransInst = pThrd->pTransInst;

  // batch process to avoid to lock/unlock frequently
  queue wq;
  taosThreadMutexLock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  taosThreadMutexUnlock(&item->mtx);

dengyihao's avatar
dengyihao 已提交
1731
  int8_t supportBatch = pTransInst->supportBatch;
dengyihao's avatar
dengyihao 已提交
1732
  if (supportBatch == 0) {
dengyihao's avatar
dengyihao 已提交
1733
    cliNoBatchDealReq(&wq, pThrd);
dengyihao's avatar
dengyihao 已提交
1734
  } else if (supportBatch == 1) {
dengyihao's avatar
dengyihao 已提交
1735
    cliBatchDealReq(&wq, pThrd);
dengyihao's avatar
dengyihao 已提交
1736
  }
dengyihao's avatar
dengyihao 已提交
1737

dengyihao's avatar
dengyihao 已提交
1738
  if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
1739
}
dengyihao's avatar
dengyihao 已提交
1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765
static void cliPrepareCb(uv_prepare_t* handle) {
  SCliThrd* thrd = handle->data;
  tTrace("prepare work start");

  SAsyncPool* pool = thrd->asyncPool;
  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
    SAsyncItem* item = async->data;

    queue wq;
    taosThreadMutexLock(&item->mtx);
    QUEUE_MOVE(&item->qmsg, &wq);
    taosThreadMutexUnlock(&item->mtx);

    int count = 0;
    while (!QUEUE_IS_EMPTY(&wq)) {
      queue* h = QUEUE_HEAD(&wq);
      QUEUE_REMOVE(h);

      SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
      (*cliAsyncHandle[pMsg->type])(pMsg, thrd);
      count++;
    }
  }
  tTrace("prepare work end");
  if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
dengyihao's avatar
dengyihao 已提交
1766
}
dengyihao's avatar
dengyihao 已提交
1767

dengyihao's avatar
dengyihao 已提交
1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796
void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
  transCtxCleanup(&conn->ctx);
  cliReleaseUnfinishedMsg(conn);
  if (destroy == 1) {
    transQueueDestroy(&conn->cliMsgs);
  } else {
    transQueueClear(&conn->cliMsgs);
  }
}

void cliIteraConnMsgs(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    SCliMsg* cmsg = transQueueGet(&conn->cliMsgs, i);
    if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) {
      continue;
    }

    STransMsg resp = {0};
    if (-1 == cliBuildExceptResp(cmsg, &resp)) {
      continue;
    }
    pTransInst->cfp(pTransInst->parent, &resp, NULL);

    cmsg->ctx->ahandle = NULL;
  }
}
dengyihao's avatar
dengyihao 已提交
1797 1798 1799
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
  if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
    uint64_t ahandle = pHead->ahandle;
dengyihao's avatar
dengyihao 已提交
1800
    tDebug("ahandle = %" PRIu64 "", ahandle);
dengyihao's avatar
dengyihao 已提交
1801 1802
    SCliMsg* pMsg = NULL;
    CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
dengyihao's avatar
dengyihao 已提交
1803

dengyihao's avatar
dengyihao 已提交
1804 1805
    transClearBuffer(&conn->readBuf);
    transFreeMsg(transContFromHead((char*)pHead));
dengyihao's avatar
dengyihao 已提交
1806 1807 1808 1809

    for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->cliMsgs); i++) {
      SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i);
      if (cliMsg->type == Release) {
dengyihao's avatar
dengyihao 已提交
1810
        ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req");
dengyihao's avatar
dengyihao 已提交
1811 1812
        return true;
      }
dengyihao's avatar
dengyihao 已提交
1813
    }
dengyihao's avatar
dengyihao 已提交
1814 1815 1816

    cliIteraConnMsgs(conn);

dengyihao's avatar
dengyihao 已提交
1817 1818
    tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
    destroyCmsg(pMsg);
dengyihao's avatar
dengyihao 已提交
1819

dengyihao's avatar
dengyihao 已提交
1820 1821 1822 1823 1824 1825
    addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
    return true;
  }
  return false;
}

U
ubuntu 已提交
1826
static void* cliWorkThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
1827
  SCliThrd* pThrd = (SCliThrd*)arg;
dengyihao's avatar
dengyihao 已提交
1828
  pThrd->pid = taosGetSelfPthreadId();
G
gccgdb1234 已提交
1829
  setThreadName("trans-cli-work");
dengyihao's avatar
dengyihao 已提交
1830
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
dengyihao's avatar
dengyihao 已提交
1831 1832

  tDebug("thread quit-thread:%08" PRId64, pThrd->pid);
1833
  return NULL;
dengyihao's avatar
dengyihao 已提交
1834 1835
}

U
ubuntu 已提交
1836
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
1837
  SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj));
dengyihao's avatar
dengyihao 已提交
1838

dengyihao's avatar
dengyihao 已提交
1839
  STrans* pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
1840
  memcpy(cli->label, label, TSDB_LABEL_LEN);
dengyihao's avatar
dengyihao 已提交
1841
  cli->numOfThreads = numOfThreads;
dengyihao's avatar
dengyihao 已提交
1842
  cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*));
dengyihao's avatar
dengyihao 已提交
1843 1844

  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
1845
    SCliThrd* pThrd = createThrdObj(shandle);
dengyihao's avatar
dengyihao 已提交
1846 1847 1848 1849 1850
    if (pThrd == NULL) {
      return NULL;
    }

    int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
dengyihao's avatar
dengyihao 已提交
1851
    if (err == 0) {
S
Shengliang Guan 已提交
1852
      tDebug("success to create tranport-cli thread:%d", i);
dengyihao's avatar
dengyihao 已提交
1853
    }
dengyihao's avatar
dengyihao 已提交
1854
    cli->pThreadObj[i] = pThrd;
dengyihao's avatar
dengyihao 已提交
1855
  }
dengyihao's avatar
dengyihao 已提交
1856

dengyihao's avatar
dengyihao 已提交
1857 1858
  return cli;
}
dengyihao's avatar
dengyihao 已提交
1859

dengyihao's avatar
dengyihao 已提交
1860
static FORCE_INLINE void destroyUserdata(STransMsg* userdata) {
dengyihao's avatar
dengyihao 已提交
1861 1862 1863 1864 1865 1866
  if (userdata->pCont == NULL) {
    return;
  }
  transFreeMsg(userdata->pCont);
  userdata->pCont = NULL;
}
dengyihao's avatar
dengyihao 已提交
1867

dengyihao's avatar
dengyihao 已提交
1868
static FORCE_INLINE void destroyCmsg(void* arg) {
dengyihao's avatar
dengyihao 已提交
1869
  SCliMsg* pMsg = arg;
dengyihao's avatar
dengyihao 已提交
1870 1871 1872
  if (pMsg == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1873

dengyihao's avatar
dengyihao 已提交
1874 1875
  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
wafwerar's avatar
wafwerar 已提交
1876
  taosMemoryFree(pMsg);
dengyihao's avatar
dengyihao 已提交
1877
}
dengyihao's avatar
dengyihao 已提交
1878

dengyihao's avatar
dengyihao 已提交
1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
  if (param == NULL) return;

  STaskArg* arg = param;
  SCliMsg*  pMsg = arg->param1;
  SCliThrd* pThrd = arg->param2;

  tDebug("destroy Ahandle A");
  if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
    tDebug("destroy Ahandle B");
    pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
  }
  tDebug("destroy Ahandle C");

  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
  taosMemoryFree(pMsg);
}

dengyihao's avatar
dengyihao 已提交
1898 1899 1900
static SCliThrd* createThrdObj(void* trans) {
  STrans* pTransInst = trans;

dengyihao's avatar
dengyihao 已提交
1901
  SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd));
U
ubuntu 已提交
1902

dengyihao's avatar
dengyihao 已提交
1903
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
1904
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
1905

wafwerar's avatar
wafwerar 已提交
1906
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
1907 1908 1909 1910 1911 1912 1913 1914
  int err = uv_loop_init(pThrd->loop);
  if (err != 0) {
    tError("failed to init uv_loop, reason:%s", uv_err_name(err));
    taosMemoryFree(pThrd->loop);
    taosThreadMutexDestroy(&pThrd->msgMtx);
    taosMemoryFree(pThrd);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
1915 1916 1917 1918 1919
  if (pTransInst->supportBatch) {
    pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 4, pThrd, cliAsyncCb);
  } else {
    pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb);
  }
dengyihao's avatar
dengyihao 已提交
1920
  if (pThrd->asyncPool == NULL) {
dengyihao's avatar
ref log  
dengyihao 已提交
1921
    tError("failed to init async pool");
dengyihao's avatar
dengyihao 已提交
1922 1923 1924 1925 1926 1927
    uv_loop_close(pThrd->loop);
    taosMemoryFree(pThrd->loop);
    taosThreadMutexDestroy(&pThrd->msgMtx);
    taosMemoryFree(pThrd);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
1928 1929 1930 1931

  pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
  uv_prepare_init(pThrd->loop, pThrd->prepare);
  pThrd->prepare->data = pThrd;
dengyihao's avatar
dengyihao 已提交
1932
  // uv_prepare_start(pThrd->prepare, cliPrepareCb);
dengyihao's avatar
dengyihao 已提交
1933

dengyihao's avatar
dengyihao 已提交
1934
  int32_t timerSize = 64;
dengyihao's avatar
dengyihao 已提交
1935 1936 1937 1938 1939 1940 1941
  pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
  for (int i = 0; i < timerSize; i++) {
    uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
    uv_timer_init(pThrd->loop, timer);
    taosArrayPush(pThrd->timerList, &timer);
  }

dengyihao's avatar
dengyihao 已提交
1942
  pThrd->pool = createConnPool(4);
dengyihao's avatar
dengyihao 已提交
1943
  transDQCreate(pThrd->loop, &pThrd->delayQueue);
dengyihao's avatar
dengyihao 已提交
1944

dengyihao's avatar
dengyihao 已提交
1945 1946
  transDQCreate(pThrd->loop, &pThrd->timeoutQueue);

dengyihao's avatar
dengyihao 已提交
1947 1948
  transDQCreate(pThrd->loop, &pThrd->waitConnQueue);

dengyihao's avatar
dengyihao 已提交
1949 1950 1951
  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
  pThrd->pTransInst = trans;

dengyihao's avatar
dengyihao 已提交
1952
  pThrd->destroyAhandleFp = pTransInst->destroyFp;
dengyihao's avatar
dengyihao 已提交
1953
  pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
1954 1955
  pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);

dengyihao's avatar
dengyihao 已提交
1956
  pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
1957

dengyihao's avatar
dengyihao 已提交
1958
  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
1959 1960
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
1961
static void destroyThrdObj(SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1962 1963 1964
  if (pThrd == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1965

wafwerar's avatar
wafwerar 已提交
1966
  taosThreadJoin(pThrd->thread, NULL);
dengyihao's avatar
dengyihao 已提交
1967
  CLI_RELEASE_UV(pThrd->loop);
wafwerar's avatar
wafwerar 已提交
1968
  taosThreadMutexDestroy(&pThrd->msgMtx);
dengyihao's avatar
dengyihao 已提交
1969
  TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
dengyihao's avatar
dengyihao 已提交
1970
  transAsyncPoolDestroy(pThrd->asyncPool);
dengyihao's avatar
dengyihao 已提交
1971

dengyihao's avatar
dengyihao 已提交
1972
  transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
dengyihao's avatar
dengyihao 已提交
1973
  transDQDestroy(pThrd->timeoutQueue, NULL);
dengyihao's avatar
dengyihao 已提交
1974
  transDQDestroy(pThrd->waitConnQueue, NULL);
dengyihao's avatar
dengyihao 已提交
1975

dengyihao's avatar
dengyihao 已提交
1976
  tDebug("thread destroy %" PRId64, pThrd->pid);
dengyihao's avatar
dengyihao 已提交
1977 1978 1979 1980 1981
  for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
    uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i);
    taosMemoryFree(timer);
  }
  taosArrayDestroy(pThrd->timerList);
dengyihao's avatar
dengyihao 已提交
1982
  taosMemoryFree(pThrd->prepare);
wafwerar's avatar
wafwerar 已提交
1983
  taosMemoryFree(pThrd->loop);
dengyihao's avatar
dengyihao 已提交
1984
  taosHashCleanup(pThrd->fqdn2ipCache);
dengyihao's avatar
dengyihao 已提交
1985
  taosHashCleanup(pThrd->failFastCache);
dengyihao's avatar
dengyihao 已提交
1986 1987 1988

  void** pIter = taosHashIterate(pThrd->batchCache, NULL);
  while (pIter != NULL) {
dengyihao's avatar
dengyihao 已提交
1989 1990 1991 1992 1993 1994 1995 1996
    SCliBatchList* pBatchList = (SCliBatchList*)(*pIter);
    while (!QUEUE_IS_EMPTY(&pBatchList->wq)) {
      queue* h = QUEUE_HEAD(&pBatchList->wq);
      QUEUE_REMOVE(h);

      SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq);
      cliDestroyBatch(pBatch);
    }
dengyihao's avatar
dengyihao 已提交
1997 1998
    taosMemoryFree(pBatchList->ip);
    taosMemoryFree(pBatchList->dst);
dengyihao's avatar
dengyihao 已提交
1999
    taosMemoryFree(pBatchList);
dengyihao's avatar
dengyihao 已提交
2000

dengyihao's avatar
dengyihao 已提交
2001 2002
    pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
  }
dengyihao's avatar
dengyihao 已提交
2003
  taosHashCleanup(pThrd->batchCache);
wafwerar's avatar
wafwerar 已提交
2004
  taosMemoryFree(pThrd);
dengyihao's avatar
dengyihao 已提交
2005
}
dengyihao's avatar
dengyihao 已提交
2006

dengyihao's avatar
dengyihao 已提交
2007
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
2008
  //
wafwerar's avatar
wafwerar 已提交
2009
  taosMemoryFree(ctx);
dengyihao's avatar
dengyihao 已提交
2010
}
dengyihao's avatar
dengyihao 已提交
2011

dengyihao's avatar
dengyihao 已提交
2012
void cliSendQuit(SCliThrd* thrd) {
dengyihao's avatar
dengyihao 已提交
2013
  // cli can stop gracefully
wafwerar's avatar
wafwerar 已提交
2014
  SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2015
  msg->type = Quit;
dengyihao's avatar
dengyihao 已提交
2016
  transAsyncSend(thrd->asyncPool, &msg->q);
dengyihao's avatar
dengyihao 已提交
2017
  atomic_store_8(&thrd->asyncPool->stop, 1);
dengyihao's avatar
dengyihao 已提交
2018
}
dengyihao's avatar
dengyihao 已提交
2019 2020
void cliWalkCb(uv_handle_t* handle, void* arg) {
  if (!uv_is_closing(handle)) {
dengyihao's avatar
dengyihao 已提交
2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032
    if (uv_handle_get_type(handle) == UV_TIMER) {
      // SCliConn* pConn = handle->data;
      //  if (pConn != NULL && pConn->timer != NULL) {
      //    SCliThrd* pThrd = pConn->hostThrd;
      //    uv_timer_stop((uv_timer_t*)handle);
      //    handle->data = NULL;
      //    taosArrayPush(pThrd->timerList, &pConn->timer);
      //    pConn->timer = NULL;
      //  }
    } else {
      uv_read_stop((uv_stream_t*)handle);
    }
dengyihao's avatar
dengyihao 已提交
2033
    uv_close(handle, cliDestroy);
dengyihao's avatar
dengyihao 已提交
2034 2035
  }
}
dengyihao's avatar
dengyihao 已提交
2036

dengyihao's avatar
dengyihao 已提交
2037
FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) {
dengyihao's avatar
dengyihao 已提交
2038
  int32_t index = pTransInst->index;
dengyihao's avatar
dengyihao 已提交
2039 2040 2041
  if (pTransInst->numOfThreads == 0) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2042 2043 2044 2045
  /*
   * no lock, and to avoid CPU load imbalance, set limit pTransInst->numOfThreads * 2000;
   */
  if (pTransInst->index++ >= pTransInst->numOfThreads * 2000) {
U
ubuntu 已提交
2046 2047 2048 2049
    pTransInst->index = 0;
  }
  return index % pTransInst->numOfThreads;
}
dengyihao's avatar
dengyihao 已提交
2050
static FORCE_INLINE void doDelayTask(void* param) {
dengyihao's avatar
dengyihao 已提交
2051
  STaskArg* arg = param;
dengyihao's avatar
dengyihao 已提交
2052
  cliHandleReq((SCliMsg*)arg->param1, (SCliThrd*)arg->param2);
dengyihao's avatar
dengyihao 已提交
2053 2054
  taosMemoryFree(arg);
}
dengyihao's avatar
dengyihao 已提交
2055

dengyihao's avatar
dengyihao 已提交
2056 2057 2058
static void doCloseIdleConn(void* param) {
  STaskArg* arg = param;
  SCliConn* conn = arg->param1;
dengyihao's avatar
dengyihao 已提交
2059
  tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
2060
  conn->task = NULL;
dengyihao's avatar
dengyihao 已提交
2061 2062
  cliDestroyConn(conn, true);
  taosMemoryFree(arg);
dengyihao's avatar
dengyihao 已提交
2063 2064
}

dengyihao's avatar
dengyihao 已提交
2065
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
2066
  STrans*        pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
2067 2068
  STransConnCtx* pCtx = pMsg->ctx;

dengyihao's avatar
dengyihao 已提交
2069 2070 2071 2072 2073 2074 2075
  if (rpcDebugFlag & DEBUG_DEBUG) {
    STraceId* trace = &pMsg->msg.info.traceId;
    char      tbuf[256] = {0};
    EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
    tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
            pCtx->retryStep, pCtx->retryNextInterval);
  }
dengyihao's avatar
dengyihao 已提交
2076 2077 2078 2079

  STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
  arg->param1 = pMsg;
  arg->param2 = pThrd;
dengyihao's avatar
dengyihao 已提交
2080 2081

  transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval);
dengyihao's avatar
dengyihao 已提交
2082
}
dengyihao's avatar
dengyihao 已提交
2083

dengyihao's avatar
dengyihao 已提交
2084
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
dengyihao's avatar
dengyihao 已提交
2085 2086 2087 2088
  if (*val != exp) {
    *val = newVal;
  }
}
dengyihao's avatar
dengyihao 已提交
2089

dengyihao's avatar
dengyihao 已提交
2090
FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
dengyihao's avatar
dengyihao 已提交
2091 2092 2093 2094 2095 2096
  if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
    return false;
  }
  // rebuild resp msg
  SEpSet epset;
  if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epset) < 0) {
dengyihao's avatar
dengyihao 已提交
2097 2098 2099 2100
    return false;
  }
  int32_t tlen = tSerializeSEpSet(NULL, 0, dst);

dengyihao's avatar
dengyihao 已提交
2101 2102 2103 2104 2105 2106 2107
  char*   buf = NULL;
  int32_t len = pResp->contLen - tlen;
  if (len != 0) {
    buf = rpcMallocCont(len);
    memcpy(buf, (char*)pResp->pCont + tlen, len);
  }
  rpcFreeCont(pResp->pCont);
dengyihao's avatar
dengyihao 已提交
2108 2109

  pResp->pCont = buf;
dengyihao's avatar
dengyihao 已提交
2110 2111
  pResp->contLen = len;

dengyihao's avatar
dengyihao 已提交
2112
  epsetAssign(dst, &epset);
dengyihao's avatar
dengyihao 已提交
2113 2114
  return true;
}
dengyihao's avatar
dengyihao 已提交
2115
bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
dengyihao's avatar
dengyihao 已提交
2116
  bool noDelay = true;
dengyihao's avatar
dengyihao 已提交
2117 2118 2119 2120 2121 2122 2123
  if (hasEpSet == false) {
    if (pResp->contLen == 0) {
      if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
        noDelay = false;
      } else {
        EPSET_FORWARD_INUSE(&pCtx->epSet);
      }
dengyihao's avatar
dengyihao 已提交
2124 2125 2126 2127 2128 2129 2130 2131 2132 2133
    } else if (pResp->contLen != 0) {
      SEpSet  epSet;
      int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
      if (valid < 0) {
        tDebug("get invalid epset, epset equal, continue");
        if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
          noDelay = false;
        } else {
          EPSET_FORWARD_INUSE(&pCtx->epSet);
        }
dengyihao's avatar
dengyihao 已提交
2134
      } else {
dengyihao's avatar
dengyihao 已提交
2135 2136
        if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
          tDebug("epset not equal, retry new epset");
dengyihao's avatar
dengyihao 已提交
2137
          epsetAssign(&pCtx->epSet, &epSet);
dengyihao's avatar
dengyihao 已提交
2138 2139 2140 2141 2142 2143 2144 2145 2146
          noDelay = false;
        } else {
          if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
            noDelay = false;
          } else {
            tDebug("epset equal, continue");
            EPSET_FORWARD_INUSE(&pCtx->epSet);
          }
        }
dengyihao's avatar
dengyihao 已提交
2147
      }
dengyihao's avatar
dengyihao 已提交
2148 2149
    }
  } else {
dengyihao's avatar
dengyihao 已提交
2150
    SEpSet  epSet;
dengyihao's avatar
dengyihao 已提交
2151 2152
    int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
    if (valid < 0) {
dengyihao's avatar
dengyihao 已提交
2153 2154 2155 2156 2157 2158
      tDebug("get invalid epset, epset equal, continue");
      if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
        noDelay = false;
      } else {
        EPSET_FORWARD_INUSE(&pCtx->epSet);
      }
dengyihao's avatar
dengyihao 已提交
2159
    } else {
dengyihao's avatar
dengyihao 已提交
2160 2161
      if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
        tDebug("epset not equal, retry new epset");
dengyihao's avatar
dengyihao 已提交
2162
        epsetAssign(&pCtx->epSet, &epSet);
dengyihao's avatar
dengyihao 已提交
2163
        noDelay = false;
dengyihao's avatar
dengyihao 已提交
2164
      } else {
dengyihao's avatar
dengyihao 已提交
2165 2166 2167 2168 2169 2170
        if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
          noDelay = false;
        } else {
          tDebug("epset equal, continue");
          EPSET_FORWARD_INUSE(&pCtx->epSet);
        }
dengyihao's avatar
dengyihao 已提交
2171
      }
dengyihao's avatar
dengyihao 已提交
2172 2173 2174 2175 2176
    }
  }
  return noDelay;
}
bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
2177 2178
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
2179

dengyihao's avatar
dengyihao 已提交
2180 2181
  STransConnCtx* pCtx = pMsg->ctx;
  int32_t        code = pResp->code;
dengyihao's avatar
dengyihao 已提交
2182

dengyihao's avatar
dengyihao 已提交
2183
  bool retry = pTransInst->retry != NULL ? pTransInst->retry(code, pResp->msgType - 1) : false;
dengyihao's avatar
dengyihao 已提交
2184 2185 2186
  if (retry == false) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
2187 2188 2189 2190 2191 2192 2193 2194

  if (!pCtx->retryInit) {
    pCtx->retryMinInterval = pTransInst->retryMinInterval;
    pCtx->retryMaxInterval = pTransInst->retryMaxInterval;
    pCtx->retryStepFactor = pTransInst->retryStepFactor;
    pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet;
    pCtx->retryInitTimestamp = taosGetTimestampMs();
    pCtx->retryNextInterval = pCtx->retryMinInterval;
dengyihao's avatar
dengyihao 已提交
2195
    pCtx->retryStep = 0;
dengyihao's avatar
dengyihao 已提交
2196
    pCtx->retryInit = true;
dengyihao's avatar
dengyihao 已提交
2197
    pCtx->retryCode = TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
2198 2199 2200

    // already retry, not use handle specified by app;
    pMsg->msg.info.handle = 0;
dengyihao's avatar
dengyihao 已提交
2201
  }
dengyihao's avatar
dengyihao 已提交
2202

dengyihao's avatar
dengyihao 已提交
2203 2204
  if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
    return false;
dengyihao's avatar
dengyihao 已提交
2205
  }
dengyihao's avatar
dengyihao 已提交
2206

dengyihao's avatar
dengyihao 已提交
2207 2208 2209 2210 2211 2212
  // code, msgType

  // A:  epset,   leader, not self
  // B:  epset,   not know leader
  // C:  no epset, leader but not serivce

dengyihao's avatar
dengyihao 已提交
2213 2214
  bool noDelay = false;
  if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
dengyihao's avatar
dengyihao 已提交
2215
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2216
    noDelay = cliResetEpset(pCtx, pResp, false);
dengyihao's avatar
dengyihao 已提交
2217 2218
    transFreeMsg(pResp->pCont);
    transUnrefCliHandle(pConn);
dengyihao's avatar
dengyihao 已提交
2219
  } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
2220 2221
             code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED ||
             code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
dengyihao's avatar
dengyihao 已提交
2222
             code == TSDB_CODE_APP_IS_STOPPING || code == TSDB_CODE_VND_STOPPED) {
dengyihao's avatar
dengyihao 已提交
2223
    tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2224
    noDelay = cliResetEpset(pCtx, pResp, true);
dengyihao's avatar
dengyihao 已提交
2225 2226
    transFreeMsg(pResp->pCont);
    addConnToPool(pThrd->pool, pConn);
dengyihao's avatar
dengyihao 已提交
2227
  } else if (code == TSDB_CODE_SYN_RESTORING) {
dengyihao's avatar
dengyihao 已提交
2228
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2229
    noDelay = cliResetEpset(pCtx, pResp, true);
dengyihao's avatar
dengyihao 已提交
2230 2231
    addConnToPool(pThrd->pool, pConn);
    transFreeMsg(pResp->pCont);
dengyihao's avatar
dengyihao 已提交
2232
  } else {
dengyihao's avatar
dengyihao 已提交
2233
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
dengyihao's avatar
dengyihao 已提交
2234 2235 2236
    noDelay = cliResetEpset(pCtx, pResp, false);
    addConnToPool(pThrd->pool, pConn);
    transFreeMsg(pResp->pCont);
dengyihao's avatar
dengyihao 已提交
2237
  }
dengyihao's avatar
dengyihao 已提交
2238 2239 2240 2241
  if (code != TSDB_CODE_RPC_BROKEN_LINK && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_SUCCESS) {
    // save one internal code
    pCtx->retryCode = code;
  }
dengyihao's avatar
dengyihao 已提交
2242

dengyihao's avatar
dengyihao 已提交
2243 2244 2245
  if (noDelay == false) {
    pCtx->epsetRetryCnt = 1;
    pCtx->retryStep++;
dengyihao's avatar
dengyihao 已提交
2246

dengyihao's avatar
dengyihao 已提交
2247 2248 2249 2250 2251 2252 2253 2254
    int64_t factor = pow(pCtx->retryStepFactor, pCtx->retryStep - 1);
    pCtx->retryNextInterval = factor * pCtx->retryMinInterval;
    if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) {
      pCtx->retryNextInterval = pCtx->retryMaxInterval;
    }
  } else {
    pCtx->retryNextInterval = 0;
    pCtx->epsetRetryCnt++;
dengyihao's avatar
dengyihao 已提交
2255
  }
dengyihao's avatar
dengyihao 已提交
2256

dengyihao's avatar
dengyihao 已提交
2257
  pMsg->sent = 0;
dengyihao's avatar
dengyihao 已提交
2258
  cliSchedMsgToNextNode(pMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
2259
  return true;
dengyihao's avatar
dengyihao 已提交
2260
}
dengyihao's avatar
dengyihao 已提交
2261
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
dengyihao's avatar
dengyihao 已提交
2262 2263
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
2264

dengyihao's avatar
dengyihao 已提交
2265
  if (pMsg == NULL || pMsg->ctx == NULL) {
dengyihao's avatar
dengyihao 已提交
2266
    tTrace("%s conn %p handle resp", pTransInst->label, pConn);
dengyihao's avatar
dengyihao 已提交
2267 2268 2269
    pTransInst->cfp(pTransInst->parent, pResp, NULL);
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
2270

dengyihao's avatar
dengyihao 已提交
2271
  STransConnCtx* pCtx = pMsg->ctx;
dengyihao's avatar
dengyihao 已提交
2272

dengyihao's avatar
dengyihao 已提交
2273 2274 2275
  bool retry = cliGenRetryRule(pConn, pResp, pMsg);
  if (retry == true) {
    return -1;
dengyihao's avatar
dengyihao 已提交
2276
  }
dengyihao's avatar
dengyihao 已提交
2277

dengyihao's avatar
dengyihao 已提交
2278 2279 2280
  if (pCtx->retryCode != TSDB_CODE_SUCCESS) {
    int32_t code = pResp->code;
    // return internal code app
dengyihao's avatar
dengyihao 已提交
2281 2282
    if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
        code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
dengyihao's avatar
dengyihao 已提交
2283 2284 2285 2286
      pResp->code = pCtx->retryCode;
    }
  }

2287
  // check whole vnodes is offline on this vgroup
2288 2289
  if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps || pCtx->retryStep > 0) {
    if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
A
Alex Duan 已提交
2290
      pResp->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
2291
    } else if (pResp->code == TSDB_CODE_RPC_BROKEN_LINK) {
A
Alex Duan 已提交
2292
      pResp->code = TSDB_CODE_RPC_SOMENODE_BROKEN_LINK;
2293 2294 2295
    }
  }

dengyihao's avatar
dengyihao 已提交
2296 2297
  STraceId* trace = &pResp->info.traceId;
  bool      hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
dengyihao's avatar
dengyihao 已提交
2298
  if (hasEpSet) {
dengyihao's avatar
dengyihao 已提交
2299 2300 2301 2302 2303
    if (rpcDebugFlag & DEBUG_TRACE) {
      char tbuf[256] = {0};
      EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
      tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
    }
dengyihao's avatar
dengyihao 已提交
2304
  }
dengyihao's avatar
dengyihao 已提交
2305

dengyihao's avatar
dengyihao 已提交
2306
  if (pCtx->pSem != NULL) {
dengyihao's avatar
dengyihao 已提交
2307
    tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
2308
    if (pCtx->pRsp == NULL) {
dengyihao's avatar
dengyihao 已提交
2309
      tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
2310 2311 2312
    } else {
      memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
    }
dengyihao's avatar
dengyihao 已提交
2313
    tsem_post(pCtx->pSem);
2314
    pCtx->pRsp = NULL;
dengyihao's avatar
dengyihao 已提交
2315
  } else {
dengyihao's avatar
dengyihao 已提交
2316
    tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
dengyihao's avatar
dengyihao 已提交
2317
    if (retry == false && hasEpSet == true) {
dengyihao's avatar
dengyihao 已提交
2318
      pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
dengyihao's avatar
dengyihao 已提交
2319
    } else {
dengyihao's avatar
dengyihao 已提交
2320
      if (!cliIsEpsetUpdated(pResp->code, pCtx)) {
dengyihao's avatar
dengyihao 已提交
2321 2322 2323 2324
        pTransInst->cfp(pTransInst->parent, pResp, NULL);
      } else {
        pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
      }
dengyihao's avatar
dengyihao 已提交
2325
    }
dengyihao's avatar
dengyihao 已提交
2326
  }
dengyihao's avatar
dengyihao 已提交
2327
  return 0;
dengyihao's avatar
dengyihao 已提交
2328
}
U
ubuntu 已提交
2329 2330

void transCloseClient(void* arg) {
U
ubuntu 已提交
2331
  SCliObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
2332
  for (int i = 0; i < cli->numOfThreads; i++) {
U
ubuntu 已提交
2333
    cliSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
2334
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
2335
  }
wafwerar's avatar
wafwerar 已提交
2336 2337
  taosMemoryFree(cli->pThreadObj);
  taosMemoryFree(cli);
dengyihao's avatar
dengyihao 已提交
2338
}
dengyihao's avatar
dengyihao 已提交
2339 2340 2341 2342 2343
void transRefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_INC((SCliConn*)handle);
dengyihao's avatar
dengyihao 已提交
2344
  tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
dengyihao's avatar
dengyihao 已提交
2345 2346 2347 2348 2349 2350 2351
  UNUSED(ref);
}
void transUnrefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }
  int ref = T_REF_DEC((SCliConn*)handle);
dengyihao's avatar
dengyihao 已提交
2352
  tTrace("%s conn %p ref:%d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
dengyihao's avatar
dengyihao 已提交
2353
  if (ref == 0) {
U
ubuntu 已提交
2354
    cliDestroyConn((SCliConn*)handle, true);
dengyihao's avatar
dengyihao 已提交
2355 2356
  }
}
dengyihao's avatar
dengyihao 已提交
2357
static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t handle) {
dengyihao's avatar
dengyihao 已提交
2358
  SCliThrd*  pThrd = NULL;
dengyihao's avatar
dengyihao 已提交
2359
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
dengyihao's avatar
dengyihao 已提交
2360 2361 2362
  if (exh == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
2363

dengyihao's avatar
dengyihao 已提交
2364 2365 2366 2367 2368 2369
  if (exh->pThrd == NULL && trans != NULL) {
    int idx = cliRBChoseIdx(trans);
    if (idx < 0) return NULL;
    exh->pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
  }

dengyihao's avatar
dengyihao 已提交
2370
  pThrd = exh->pThrd;
dengyihao's avatar
dengyihao 已提交
2371
  transReleaseExHandle(transGetRefMgt(), handle);
dengyihao's avatar
dengyihao 已提交
2372 2373
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
2374
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
dengyihao's avatar
dengyihao 已提交
2375
  if (handle == 0) {
dengyihao's avatar
dengyihao 已提交
2376
    int idx = cliRBChoseIdx(trans);
dengyihao's avatar
dengyihao 已提交
2377
    if (idx < 0) return NULL;
dengyihao's avatar
dengyihao 已提交
2378 2379
    return ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
  }
dengyihao's avatar
dengyihao 已提交
2380
  SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle);
D
dapan1121 已提交
2381
  return pThrd;
dengyihao's avatar
dengyihao 已提交
2382
}
dengyihao's avatar
dengyihao 已提交
2383
int transReleaseCliHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
2384 2385 2386
  int  idx = -1;
  bool valid = false;

dengyihao's avatar
dengyihao 已提交
2387
  SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
dengyihao's avatar
dengyihao 已提交
2388
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2389
    return -1;
dengyihao's avatar
dengyihao 已提交
2390
  }
dengyihao's avatar
dengyihao 已提交
2391

dengyihao's avatar
dengyihao 已提交
2392
  STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527};
dengyihao's avatar
rm code  
dengyihao 已提交
2393
  TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
dengyihao's avatar
dengyihao 已提交
2394

dengyihao's avatar
dengyihao 已提交
2395 2396 2397
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  pCtx->ahandle = tmsg.info.ahandle;

dengyihao's avatar
dengyihao 已提交
2398
  SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2399
  cmsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
2400
  cmsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2401
  cmsg->type = Release;
dengyihao's avatar
dengyihao 已提交
2402
  cmsg->ctx = pCtx;
dengyihao's avatar
dengyihao 已提交
2403

dengyihao's avatar
dengyihao 已提交
2404 2405 2406
  STraceId* trace = &tmsg.info.traceId;
  tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);

dengyihao's avatar
dengyihao 已提交
2407
  if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
dengyihao's avatar
dengyihao 已提交
2408
    destroyCmsg(cmsg);
dengyihao's avatar
dengyihao 已提交
2409 2410
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2411
  return 0;
dengyihao's avatar
dengyihao 已提交
2412
}
dengyihao's avatar
dengyihao 已提交
2413

dengyihao's avatar
dengyihao 已提交
2414
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
2415 2416 2417 2418 2419
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2420

dengyihao's avatar
dengyihao 已提交
2421
  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
dengyihao's avatar
dengyihao 已提交
2422
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2423
    transFreeMsg(pReq->pCont);
dengyihao's avatar
dengyihao 已提交
2424
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2425
    return TSDB_CODE_RPC_BROKEN_LINK;
dengyihao's avatar
dengyihao 已提交
2426 2427
  }

dengyihao's avatar
dengyihao 已提交
2428
  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
wafwerar's avatar
wafwerar 已提交
2429
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2430 2431 2432
  epsetAssign(&pCtx->epSet, pEpSet);
  epsetAssign(&pCtx->origEpSet, pEpSet);

S
Shengliang Guan 已提交
2433
  pCtx->ahandle = pReq->info.ahandle;
dengyihao's avatar
dengyihao 已提交
2434
  pCtx->msgType = pReq->msgType;
dengyihao's avatar
dengyihao 已提交
2435

H
Haojun Liao 已提交
2436
  if (ctx != NULL) pCtx->appCtx = *ctx;
dengyihao's avatar
dengyihao 已提交
2437

wafwerar's avatar
wafwerar 已提交
2438
  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2439
  cliMsg->ctx = pCtx;
dengyihao's avatar
dengyihao 已提交
2440
  cliMsg->msg = *pReq;
dengyihao's avatar
dengyihao 已提交
2441
  cliMsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2442
  cliMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
2443
  cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2444

dengyihao's avatar
dengyihao 已提交
2445
  STraceId* trace = &pReq->info.traceId;
dengyihao's avatar
dengyihao 已提交
2446 2447
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
dengyihao's avatar
dengyihao 已提交
2448 2449
  if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) {
    destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2450
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2451 2452
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2453
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2454
  return 0;
dengyihao's avatar
dengyihao 已提交
2455
}
dengyihao's avatar
dengyihao 已提交
2456

dengyihao's avatar
dengyihao 已提交
2457
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
dengyihao's avatar
dengyihao 已提交
2458 2459 2460 2461 2462
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2463

dengyihao's avatar
dengyihao 已提交
2464 2465
  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2466
    transFreeMsg(pReq->pCont);
dengyihao's avatar
dengyihao 已提交
2467
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2468
    return TSDB_CODE_RPC_BROKEN_LINK;
dengyihao's avatar
dengyihao 已提交
2469
  }
dengyihao's avatar
dengyihao 已提交
2470

dengyihao's avatar
dengyihao 已提交
2471 2472
  tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
  tsem_init(sem, 0, 0);
dengyihao's avatar
dengyihao 已提交
2473

dengyihao's avatar
dengyihao 已提交
2474 2475
  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());

wafwerar's avatar
wafwerar 已提交
2476
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2477 2478
  epsetAssign(&pCtx->epSet, pEpSet);
  epsetAssign(&pCtx->origEpSet, pEpSet);
S
Shengliang Guan 已提交
2479
  pCtx->ahandle = pReq->info.ahandle;
dengyihao's avatar
dengyihao 已提交
2480
  pCtx->msgType = pReq->msgType;
dengyihao's avatar
dengyihao 已提交
2481
  pCtx->pSem = sem;
dengyihao's avatar
dengyihao 已提交
2482 2483
  pCtx->pRsp = pRsp;

wafwerar's avatar
wafwerar 已提交
2484
  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2485 2486 2487
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2488
  cliMsg->type = Normal;
dengyihao's avatar
dengyihao 已提交
2489
  cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2490

dengyihao's avatar
dengyihao 已提交
2491
  STraceId* trace = &pReq->info.traceId;
dengyihao's avatar
dengyihao 已提交
2492 2493
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
dengyihao's avatar
dengyihao 已提交
2494

dengyihao's avatar
dengyihao 已提交
2495 2496
  int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
  if (ret != 0) {
dengyihao's avatar
dengyihao 已提交
2497
    destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2498
    goto _RETURN;
dengyihao's avatar
dengyihao 已提交
2499
  }
dengyihao's avatar
dengyihao 已提交
2500
  tsem_wait(sem);
dengyihao's avatar
dengyihao 已提交
2501 2502

_RETURN:
dengyihao's avatar
dengyihao 已提交
2503 2504
  tsem_destroy(sem);
  taosMemoryFree(sem);
dengyihao's avatar
dengyihao 已提交
2505
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2506
  return ret;
dengyihao's avatar
dengyihao 已提交
2507
}
dengyihao's avatar
dengyihao 已提交
2508 2509 2510
/*
 *
 **/
dengyihao's avatar
dengyihao 已提交
2511
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
dengyihao's avatar
dengyihao 已提交
2512
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2513 2514 2515
  if (pTransInst == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2516 2517 2518

  SCvtAddr cvtAddr = {0};
  if (ip != NULL && fqdn != NULL) {
dengyihao's avatar
dengyihao 已提交
2519 2520
    tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip));
    tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn));
dengyihao's avatar
dengyihao 已提交
2521 2522
    cvtAddr.cvt = true;
  }
dengyihao's avatar
dengyihao 已提交
2523 2524
  for (int i = 0; i < pTransInst->numOfThreads; i++) {
    STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2525
    pCtx->cvtAddr = cvtAddr;
dengyihao's avatar
dengyihao 已提交
2526 2527 2528 2529

    SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
    cliMsg->ctx = pCtx;
    cliMsg->type = Update;
dengyihao's avatar
dengyihao 已提交
2530
    cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2531

dengyihao's avatar
dengyihao 已提交
2532
    SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
S
Shengliang Guan 已提交
2533
    tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid);
dengyihao's avatar
dengyihao 已提交
2534

dengyihao's avatar
dengyihao 已提交
2535 2536
    if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) {
      destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2537
      transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2538 2539
      return -1;
    }
dengyihao's avatar
dengyihao 已提交
2540
  }
dengyihao's avatar
dengyihao 已提交
2541
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2542
  return 0;
dengyihao's avatar
dengyihao 已提交
2543
}
dengyihao's avatar
dengyihao 已提交
2544 2545 2546 2547 2548

int64_t transAllocHandle() {
  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
  tDebug("pre alloc refId %" PRId64 "", exh->refId);
dengyihao's avatar
dengyihao 已提交
2549

dengyihao's avatar
dengyihao 已提交
2550 2551
  return exh->refId;
}