/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "transComm.h"

dengyihao's avatar
dengyihao 已提交
16 17 18 19 20
// Per-destination list of queued messages (linked through SCliMsg.q).
typedef struct {
  int32_t numOfConn;  // connection counter for this destination
  queue   msgQ;       // head of the queued SCliMsg list
} SMsgList;

dengyihao's avatar
dengyihao 已提交
21
typedef struct SConnList {
dengyihao's avatar
dengyihao 已提交
22 23 24
  queue     conns;
  int32_t   size;
  SMsgList* list;
dengyihao's avatar
dengyihao 已提交
25 26
} SConnList;

dengyihao's avatar
dengyihao 已提交
27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28 29 30 31 32 33
  queue   wq;
  int32_t len;

  int connMax;
  int connCnt;
  int batchLenLimit;
dengyihao's avatar
dengyihao 已提交
34
  int sending;
dengyihao's avatar
dengyihao 已提交
35

dengyihao's avatar
dengyihao 已提交
36 37 38
  char*    dst;
  char*    ip;
  uint16_t port;
dengyihao's avatar
dengyihao 已提交
39 40 41 42 43 44 45 46 47 48

} SCliBatchList;

// A single batch of messages; belongs to an SCliBatchList.
typedef struct {
  queue          wq;         // head of the message queue of this batch
  queue          listq;      // link node inside the owning list
  int32_t        wLen;       // length counter for wq
  int32_t        batchSize;  // batch size
  int32_t        batch;      // batch counter
  SCliBatchList* pList;      // owning batch list
} SCliBatch;
dengyihao's avatar
dengyihao 已提交
50

dengyihao's avatar
dengyihao 已提交
51
typedef struct SCliConn {
dengyihao's avatar
dengyihao 已提交
52
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
53 54
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
55
  queue        wreqQueue;
dengyihao's avatar
dengyihao 已提交
56 57

  uv_timer_t* timer;  // read timer, forbidden
dengyihao's avatar
dengyihao 已提交
58

dengyihao's avatar
dengyihao 已提交
59 60 61 62
  void* hostThrd;

  SConnBuffer readBuf;
  STransQueue cliMsgs;
dengyihao's avatar
dengyihao 已提交
63 64 65

  queue      q;
  SConnList* list;
dengyihao's avatar
dengyihao 已提交
66 67

  STransCtx  ctx;
dengyihao's avatar
dengyihao 已提交
68 69
  bool       broken;  // link broken or not
  ConnStatus status;  //
dengyihao's avatar
dengyihao 已提交
70

dengyihao's avatar
dengyihao 已提交
71 72
  SCliBatch* pBatch;

dengyihao's avatar
dengyihao 已提交
73 74
  int64_t refId;
  char*   ip;
dengyihao's avatar
dengyihao 已提交
75

dengyihao's avatar
dengyihao 已提交
76
  SDelayTask* task;
dengyihao's avatar
dengyihao 已提交
77

dengyihao's avatar
dengyihao 已提交
78
  // debug and log info
dengyihao's avatar
dengyihao 已提交
79 80 81
  char src[32];
  char dst[32];

dengyihao's avatar
dengyihao 已提交
82
} SCliConn;
dengyihao's avatar
dengyihao 已提交
83

dengyihao's avatar
dengyihao 已提交
84
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
85
  STransConnCtx* ctx;
dengyihao's avatar
formate  
dengyihao 已提交
86
  STransMsg      msg;
dengyihao's avatar
dengyihao 已提交
87
  queue          q;
dengyihao's avatar
dengyihao 已提交
88
  STransMsgType  type;
dengyihao's avatar
dengyihao 已提交
89

dengyihao's avatar
dengyihao 已提交
90
  int64_t  refId;
dengyihao's avatar
dengyihao 已提交
91 92
  uint64_t st;
  int      sent;  //(0: no send, 1: alread sent)
dengyihao's avatar
dengyihao 已提交
93 94
} SCliMsg;

dengyihao's avatar
dengyihao 已提交
95
typedef struct SCliThrd {
dengyihao's avatar
dengyihao 已提交
96 97 98 99 100 101
  TdThread      thread;  // tid
  int64_t       pid;     // pid
  uv_loop_t*    loop;
  SAsyncPool*   asyncPool;
  uv_prepare_t* prepare;
  void*         pool;  // conn pool
dengyihao's avatar
dengyihao 已提交
102
  // timer handles
dengyihao's avatar
dengyihao 已提交
103
  SArray* timerList;
dengyihao's avatar
dengyihao 已提交
104
  // msg queue
dengyihao's avatar
dengyihao 已提交
105
  queue         msg;
wafwerar's avatar
wafwerar 已提交
106
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
107
  SDelayQueue*  delayQueue;
dengyihao's avatar
dengyihao 已提交
108
  SDelayQueue*  timeoutQueue;
dengyihao's avatar
dengyihao 已提交
109
  SDelayQueue*  waitConnQueue;
dengyihao's avatar
dengyihao 已提交
110 111
  uint64_t      nextTimeout;  // next timeout
  void*         pTransInst;   //
dengyihao's avatar
dengyihao 已提交
112

dengyihao's avatar
dengyihao 已提交
113
  int connCount;
dengyihao's avatar
dengyihao 已提交
114
  void (*destroyAhandleFp)(void* ahandle);
dengyihao's avatar
dengyihao 已提交
115 116
  SHashObj* fqdn2ipCache;
  SCvtAddr  cvtAddr;
dengyihao's avatar
dengyihao 已提交
117

dengyihao's avatar
dengyihao 已提交
118
  SHashObj* failFastCache;
dengyihao's avatar
dengyihao 已提交
119
  SHashObj* batchCache;
dengyihao's avatar
dengyihao 已提交
120

dengyihao's avatar
dengyihao 已提交
121 122
  SCliMsg* stopMsg;

dengyihao's avatar
dengyihao 已提交
123
  bool quit;
dengyihao's avatar
dengyihao 已提交
124
} SCliThrd;
dengyihao's avatar
dengyihao 已提交
125

U
ubuntu 已提交
126
typedef struct SCliObj {
dengyihao's avatar
dengyihao 已提交
127 128 129 130
  char       label[TSDB_LABEL_LEN];
  int32_t    index;
  int        numOfThreads;
  SCliThrd** pThreadObj;
U
ubuntu 已提交
131
} SCliObj;
dengyihao's avatar
dengyihao 已提交
132

dengyihao's avatar
dengyihao 已提交
133 134 135 136 137 138 139
// Bookkeeping record for the fail-fast cache.
typedef struct {
  int32_t reinit;     // re-initialisation flag
  int64_t timestamp;  // timestamp of the record
  int32_t count;      // counter
  int32_t threshold;  // threshold for the counter
  int64_t interval;   // interval value
} SFailFastItem;
dengyihao's avatar
dengyihao 已提交
140
// conn pool
dengyihao's avatar
dengyihao 已提交
141
// add expire timeout and capacity limit
dengyihao's avatar
dengyihao 已提交
142
static void*     createConnPool(int size);
dengyihao's avatar
dengyihao 已提交
143
static void*     destroyConnPool(SCliThrd* thread);
dengyihao's avatar
dengyihao 已提交
144
static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed);
dengyihao's avatar
dengyihao 已提交
145
static void      addConnToPool(void* pool, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
146
static void      doCloseIdleConn(void* param);
dengyihao's avatar
dengyihao 已提交
147

dengyihao's avatar
dengyihao 已提交
148 149
// register conn timer
static void cliConnTimeout(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
150 151
// register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
152
// register timer in each thread to clear expire conn
dengyihao's avatar
dengyihao 已提交
153
// static void cliTimeoutCb(uv_timer_t* handle);
dengyihao's avatar
dengyihao 已提交
154
// alloc buffer for recv
dengyihao's avatar
dengyihao 已提交
155
static FORCE_INLINE void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
156
// callback after recv nbytes from socket
U
ubuntu 已提交
157
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
158
// callback after send data to socket
U
ubuntu 已提交
159
static void cliSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
160
// callback after conn to server
U
ubuntu 已提交
161 162
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
163
static void cliIdleCb(uv_idle_t* handle);
dengyihao's avatar
dengyihao 已提交
164
static void cliPrepareCb(uv_prepare_t* handle);
dengyihao's avatar
dengyihao 已提交
165

dengyihao's avatar
dengyihao 已提交
166
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd);
dengyihao's avatar
dengyihao 已提交
167
static void cliSendBatchCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
168 169

SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
dengyihao's avatar
dengyihao 已提交
170

dengyihao's avatar
dengyihao 已提交
171 172
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);

dengyihao's avatar
dengyihao 已提交
173
static int32_t allocConnRef(SCliConn* conn, bool update);
dengyihao's avatar
dengyihao 已提交
174

dengyihao's avatar
dengyihao 已提交
175
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
dengyihao's avatar
dengyihao 已提交
176

dengyihao's avatar
dengyihao 已提交
177
static SCliConn* cliCreateConn(SCliThrd* thrd);
U
ubuntu 已提交
178 179
static void      cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void      cliDestroy(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
180
static void      cliSend(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
181
static void      cliSendBatch(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
182
static void      cliDestroyConnMsgs(SCliConn* conn, bool destroy);
dengyihao's avatar
dengyihao 已提交
183

dengyihao's avatar
dengyihao 已提交
184 185
static void    doFreeTimeoutMsg(void* param);
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg);
dengyihao's avatar
dengyihao 已提交
186

dengyihao's avatar
dengyihao 已提交
187
// cli util func
dengyihao's avatar
dengyihao 已提交
188 189
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
dengyihao's avatar
dengyihao 已提交
190

dengyihao's avatar
dengyihao 已提交
191
static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
dengyihao's avatar
dengyihao 已提交
192

dengyihao's avatar
dengyihao 已提交
193 194 195
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn);
static FORCE_INLINE void     cliUpdateFqdnCache(SHashObj* cache, char* fqdn);

dengyihao's avatar
dengyihao 已提交
196
// process data read from server, add decompress etc later
U
ubuntu 已提交
197
static void cliHandleResp(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
198
// handle except about conn
U
ubuntu 已提交
199
static void cliHandleExcept(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
200
static void cliReleaseUnfinishedMsg(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
201
static void cliHandleFastFail(SCliConn* pConn, int status);
dengyihao's avatar
dengyihao 已提交
202

dengyihao's avatar
dengyihao 已提交
203
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd);
204
// handle req from app
dengyihao's avatar
dengyihao 已提交
205 206 207 208 209 210
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
                                                                   cliHandleUpdate};
dengyihao's avatar
dengyihao 已提交
211 212
/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
/// NULL,cliHandleUpdate};
dengyihao's avatar
dengyihao 已提交
213

dengyihao's avatar
dengyihao 已提交
214 215
static FORCE_INLINE void destroyUserdata(STransMsg* userdata);
static FORCE_INLINE void destroyCmsg(void* cmsg);
dengyihao's avatar
dengyihao 已提交
216
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
dengyihao's avatar
dengyihao 已提交
217
static FORCE_INLINE int  cliRBChoseIdx(STrans* pTransInst);
dengyihao's avatar
dengyihao 已提交
218
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
219

dengyihao's avatar
dengyihao 已提交
220
// thread obj
dengyihao's avatar
dengyihao 已提交
221
static SCliThrd* createThrdObj(void* trans);
dengyihao's avatar
dengyihao 已提交
222
static void      destroyThrdObj(SCliThrd* pThrd);
dengyihao's avatar
dengyihao 已提交
223

dengyihao's avatar
dengyihao 已提交
224 225 226 227 228 229 230 231 232
static void cliWalkCb(uv_handle_t* handle, void* arg);

// Shut a libuv loop down: visit every handle with cliWalkCb, run the loop in
// UV_RUN_DEFAULT mode, then close it.
// NOTE: the expansion already ends with ';', so this macro must not be used as
// the sole body of an if that has an else branch.
#define CLI_RELEASE_UV(loop)          \
  do {                                \
    uv_walk((loop), cliWalkCb, NULL); \
    uv_run((loop), UV_RUN_DEFAULT);   \
    uv_loop_close((loop));            \
  } while (0);

dengyihao's avatar
dengyihao 已提交
233
// Build the pool hash key "<ip>:<port>" into `key`.
// snprintf may cause performance problem, so the key is assembled by hand:
// the ip bytes are copied, ':' is appended and titoa() writes the decimal
// port right after it.
//   key  - destination buffer; the caller must size it for ip + ':' + port.
//          A NULL key is ignored (nothing is written).
//   ip   - NUL-terminated host string
//   port - port number
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port)    \
  do {                                            \
    char* hashKey_ = (key);                       \
    if (hashKey_ != NULL) {                       \
      int32_t ipLen_ = (int32_t)strlen(ip);       \
      memcpy(hashKey_, (ip), ipLen_);             \
      hashKey_[ipLen_] = ':';                     \
      titoa((port), 10, &hashKey_[ipLen_ + 1]);   \
    }                                             \
  } while (0)

dengyihao's avatar
dengyihao 已提交
243 244
// Clamp a persist time to a floor of 90000 (note: `para` is evaluated twice).
#define CONN_PERSIST_TIME(para) ((para) > 90000 ? (para) : 90000)
// Label of the transport instance that owns the connection's host thread.
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
dengyihao's avatar
dengyihao 已提交
245

dengyihao's avatar
dengyihao 已提交
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
// Find the message on `conn` whose ctx->ahandle equals `wanted`, remove it
// from conn->cliMsgs and leave it in the caller's `pMsg` variable; `pMsg` is
// set to NULL when nothing matches.
//   conn   - SCliConn* whose cliMsgs queue is searched
//   wanted - handle value to match (printed with PRIu64)
// Requires a variable named `pMsg` in the calling scope.
// The second parameter must not be named `ahandle`: a macro parameter with
// that name would also replace the member name in `pMsg->ctx->ahandle`.
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, wanted)                             \
  do {                                                                       \
    int idx_ = 0, cnt_ = transQueueSize(&(conn)->cliMsgs);                   \
    for (; idx_ < cnt_; idx_++) {                                            \
      pMsg = transQueueGet(&(conn)->cliMsgs, idx_);                          \
      if (pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == (wanted)) {   \
        break;                                                               \
      }                                                                      \
    }                                                                        \
    if (idx_ == cnt_) {                                                      \
      pMsg = NULL;                                                           \
      tDebug("msg not found, %" PRIu64 "", (wanted));                        \
    } else {                                                                 \
      pMsg = transQueueRm(&(conn)->cliMsgs, idx_);                           \
      tDebug("msg found, %" PRIu64 "", (wanted));                            \
    }                                                                        \
  } while (0)
dengyihao's avatar
dengyihao 已提交
263

U
ubuntu 已提交
264 265 266 267 268 269 270 271 272 273 274 275 276
// Scan conn->cliMsgs from the front and leave the first message whose `sent`
// flag is 0 in the caller's `pCliMsg`. When the scan reaches a NULL entry
// first, control jumps to the caller's `_RETURN` label.
// Requires `pCliMsg` and a `_RETURN` label in the calling function.
#define CONN_GET_NEXT_SENDMSG(conn)                        \
  do {                                                     \
    int cursor = 0;                                        \
    for (;;) {                                             \
      pCliMsg = transQueueGet(&conn->cliMsgs, cursor++);   \
      if (pCliMsg == NULL) {                               \
        goto _RETURN;                                      \
      }                                                    \
      if (0 == pCliMsg->sent) {                            \
        break;                                             \
      }                                                    \
    }                                                      \
  } while (0)
dengyihao's avatar
dengyihao 已提交
277

dengyihao's avatar
formate  
dengyihao 已提交
278 279
// Mark a connection as held by the application: a connection in ConnNormal
// state moves to ConnAcquire and gains one reference. Other states are left
// untouched.
#define CONN_SET_PERSIST_BY_APP(conn)   \
  do {                                  \
    if (ConnNormal == conn->status) {   \
      conn->status = ConnAcquire;       \
      transRefCliHandle(conn);          \
    }                                   \
  } while (0)
285

dengyihao's avatar
dengyihao 已提交
286 287 288 289
// True when the connection is in ConnNormal or ConnInPool state and its
// reference count is exactly 1.
#define CONN_NO_PERSIST_BY_APP(conn) \
  ((ConnNormal == (conn)->status || ConnInPool == (conn)->status) && 1 == T_REF_VAL_GET(conn))
// True when the connection is in ConnRelease or ConnInPool state and its
// reference count is exactly 1.
#define CONN_RELEASE_BY_SERVER(conn) \
  ((ConnRelease == (conn)->status || ConnInPool == (conn)->status) && 1 == T_REF_VAL_GET(conn))
290

S
Shengliang Guan 已提交
291 292
// Predicates on a request. `msg` points at a message with an `info` member,
// `cmsg` at an object with a `type` member.
#define REQUEST_NO_RESP(msg) (1 == (msg)->info.noResp)
#define REQUEST_PERSIS_HANDLE(msg) (1 == (msg)->info.persistHandle)
#define REQUEST_RELEASE_HANDLE(cmsg) (Release == (cmsg)->type)
dengyihao's avatar
dengyihao 已提交
294

dengyihao's avatar
dengyihao 已提交
295
// Endpoint-set helpers. `epSet` is a pointer to an object with `numOfEps`,
// `inUse` and an `eps[]` array whose elements carry `fqdn` and `port`.

// Non-NULL with non-negative numOfEps and inUse. An empty set (numOfEps == 0)
// passes, and inUse is not checked against numOfEps.
#define EPSET_IS_VALID(epSet) (!((epSet) == NULL || (epSet)->numOfEps < 0 || (epSet)->inUse < 0))
#define EPSET_GET_SIZE(epSet) ((epSet)->numOfEps)
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
// Advance inUse to the next endpoint, wrapping around; no-op on an empty set.
#define EPSET_FORWARD_INUSE(epSet)             \
  do {                                         \
    if (0 != (epSet)->numOfEps) {              \
      (epSet)->inUse += 1;                     \
      (epSet)->inUse %= (epSet)->numOfEps;     \
    }                                          \
  } while (0)
dengyihao's avatar
dengyihao 已提交
306

dengyihao's avatar
dengyihao 已提交
307 308 309 310 311 312 313 314 315 316 317
// Render an endpoint set into `tbuf` as
//   epset:{0. host:port, 1. host:port}, inUse:N
//   epSet - pointer to the endpoint set
//   tbuf  - destination; must be a char ARRAY, because its capacity is taken
//           with sizeof(tbuf)
// Output that does not fit is truncated and stays NUL-terminated. snprintf
// returns the length it wanted to write, so the running offset is clamped to
// the capacity; without the clamp the next call would start past the buffer
// with an underflowed size.
// NOTE: the expansion already ends with ';' (kept for existing callers), so
// this macro must not be used as the sole body of an if that has an else.
#define EPSET_DEBUG_STR(epSet, tbuf)                                                              \
  do {                                                                                            \
    const int cap_ = (int)sizeof(tbuf);                                                           \
    int       len_ = snprintf((tbuf), cap_, "epset:{");                                           \
    if (len_ < 0 || len_ >= cap_) len_ = cap_;                                                    \
    for (int i_ = 0; i_ < (epSet)->numOfEps && len_ < cap_; i_++) {                               \
      int n_;                                                                                     \
      if (i_ == (epSet)->numOfEps - 1) {                                                          \
        n_ = snprintf((tbuf) + len_, cap_ - len_, "%d. %s:%d", i_, (epSet)->eps[i_].fqdn,         \
                      (epSet)->eps[i_].port);                                                     \
      } else {                                                                                    \
        n_ = snprintf((tbuf) + len_, cap_ - len_, "%d. %s:%d, ", i_, (epSet)->eps[i_].fqdn,       \
                      (epSet)->eps[i_].port);                                                     \
      }                                                                                           \
      len_ = (n_ < 0 || n_ >= cap_ - len_) ? cap_ : len_ + n_;                                    \
    }                                                                                             \
    if (len_ < cap_) {                                                                            \
      (void)snprintf((tbuf) + len_, cap_ - len_, "}, inUse:%d", (epSet)->inUse);                  \
    }                                                                                             \
  } while (0);
dengyihao's avatar
dengyihao 已提交
319

U
ubuntu 已提交
320
// Thread-style entry point (void* in, void* out) of a client worker.
static void* cliWorkThread(void* arg);
dengyihao's avatar
dengyihao 已提交
321

dengyihao's avatar
dengyihao 已提交
322 323 324 325 326 327 328 329
// Drop every message still attached to `conn`.
//
// For each queued message the application handle (ctx->ahandle) is released
// first, unless it is NULL or the sentinel value 0x9527: the connection's own
// ctx.freeFunc is preferred, otherwise the thread's destroyAhandleFp is used.
// The message itself is then destroyed. Finally the queue is cleared and the
// connection context is zeroed.
static void cliReleaseUnfinishedMsg(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;

  for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
    SCliMsg* pending = transQueueGet(&conn->cliMsgs, i);

    void* ah = NULL;
    if (pending != NULL && pending->ctx != NULL) {
      ah = pending->ctx->ahandle;
    }

    if (ah != NULL && ah != (void*)0x9527) {
      if (conn->ctx.freeFunc != NULL) {
        conn->ctx.freeFunc(ah);
      } else if (pThrd->destroyAhandleFp != NULL) {
        tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, ah);
        pThrd->destroyAhandleFp(ah);
      }
    }

    destroyCmsg(pending);
  }

  transQueueClear(&conn->cliMsgs);
  memset(&conn->ctx, 0, sizeof(conn->ctx));
}
dengyihao's avatar
dengyihao 已提交
340
// Send the next not-yet-sent message cached on `conn`, if there is one.
// Returns true when cliSend() was invoked, false when the queue is empty or
// holds no unsent message.
bool cliMaySendCachedMsg(SCliConn* conn) {
  if (transQueueEmpty(&conn->cliMsgs)) {
    return false;
  }

  // The macro assigns pCliMsg and jumps to _RETURN when nothing is left to send.
  SCliMsg* pCliMsg = NULL;
  CONN_GET_NEXT_SENDMSG(conn);
  cliSend(conn);
  return true;

_RETURN:
  return false;
}
dengyihao's avatar
formate  
dengyihao 已提交
351
// Process one complete response packet buffered on `conn`.
//
// Steps:
//   1. Stop the connection's read timer (if any) and hand it back to the
//      thread's timerList.
//   2. Dump the packet out of readBuf, decompress it and convert the code and
//      msgLen header fields from network byte order.
//   3. Let cliRecvReleaseReq() consume the packet if it wants to.
//   4. Find the request this response belongs to: the head of cliMsgs when
//      the connection is not persisted by the app, otherwise the queued
//      message whose ahandle matches the one in the header (falling back to
//      the handles stored in conn->ctx).
//   5. Deliver the response through cliAppCb(), then send a cached message,
//      return the connection to the pool, or restart reading.
void cliHandleResp(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  if (conn->timer) {
    if (uv_is_active((uv_handle_t*)conn->timer)) {
      tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
      uv_timer_stop(conn->timer);
    }
    // recycle the timer handle for later reuse
    taosArrayPush(pThrd->timerList, &conn->timer);
    conn->timer->data = NULL;
    conn->timer = NULL;
  }

  STransMsgHead* pHead = NULL;

  int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
  if (msgLen <= 0) {
    tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
    return;
  }

  // NOTE(review): a decompression failure is only logged; processing continues
  // with whatever pHead points at afterwards. Confirm that this is intended.
  if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
    tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
  }
  pHead->code = htonl(pHead->code);
  pHead->msgLen = htonl(pHead->msgLen);
  if (cliRecvReleaseReq(conn, pHead)) {
    return;
  }

  STransMsg transMsg = {0};
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = transContFromHead((char*)pHead);
  transMsg.code = pHead->code;
  transMsg.msgType = pHead->msgType;
  transMsg.info.ahandle = NULL;
  transMsg.info.traceId = pHead->traceId;
  transMsg.info.hasEpSet = pHead->hasEpSet;

  SCliMsg*       pMsg = NULL;
  STransConnCtx* pCtx = NULL;
  if (CONN_NO_PERSIST_BY_APP(conn)) {
    // responses are matched to requests in queue order
    pMsg = transQueuePop(&conn->cliMsgs);

    pCtx = pMsg ? pMsg->ctx : NULL;
    transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
    tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
  } else {
    // responses are matched by the ahandle carried in the header; the macro
    // assigns pMsg (NULL when no queued message matches)
    uint64_t ahandle = (uint64_t)pHead->ahandle;
    CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
    if (pMsg == NULL) {
      transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
      tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
             transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
      if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
        transMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
        tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
               transMsg.info.ahandle);
      }
    } else {
      pCtx = pMsg->ctx;
      transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
      tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
    }
  }
  // buf's memory has already been handed over to transMsg.pCont
  if (!CONN_NO_PERSIST_BY_APP(conn)) {
    transMsg.info.handle = (void*)conn->refId;
    tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
  }

  // NOTE(review): `trace` has no explicit use below; presumably the tGDebug
  // macro picks it up by name - confirm before removing.
  STraceId* trace = &transMsg.info.traceId;
  tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
          TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));

  // nobody is waiting for this packet: free its content and drop it
  if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
    transFreeMsg(transMsg.pCont);
    return;
  }
  if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
    tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
    transFreeMsg(transMsg.pCont);
    return;
  }

  // Release messages are not reported to the application
  if (pMsg == NULL || pMsg->type != Release) {
    if (cliAppCb(conn, &transMsg, pMsg) != 0) {
      return;
    }
  }
  destroyCmsg(pMsg);

  if (cliMaySendCachedMsg(conn)) {
    return;
  }

  if (CONN_NO_PERSIST_BY_APP(conn)) {
    addConnToPool(pThrd->pool, conn);
    return;
  }

  uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
U
ubuntu 已提交
456

dengyihao's avatar
dengyihao 已提交
457
// Fail every request pending on `pConn` and drop one reference on the
// connection.
//
//   pConn - connection that hit an error
//   code  - error code reported to the application; -1 means "derive it from
//           the connection": TSDB_CODE_RPC_BROKEN_LINK when pConn->broken is
//           set, TSDB_CODE_RPC_NETWORK_UNAVAIL otherwise
//
// Returns early, without dropping the reference, when cliAppCb() reports a
// non-zero result.
void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
  // nothing queued on a broken connection that the app does not hold
  if (transQueueEmpty(&pConn->cliMsgs) && pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
    tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
    transUnrefCliHandle(pConn);
    return;
  }

  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;
  bool      once = false;
  do {
    SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);

    if (pMsg == NULL && once) {
      break;
    }

    // a request that expects no response ends the loop
    if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg)) {
      destroyCmsg(pMsg);
      break;
    }

    STransConnCtx* pCtx = NULL;
    if (pMsg != NULL) {
      pCtx = pMsg->ctx;
    }

    STransMsg transMsg = {0};
    if (code == -1) {
      transMsg.code = pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL;
    } else {
      transMsg.code = code;
    }
    // NOTE(review): "+ 1" presumably maps a request type to its response type - confirm
    transMsg.msgType = 0;
    if (pMsg != NULL) {
      transMsg.msgType = pMsg->msg.msgType + 1;
    }
    transMsg.info.ahandle = NULL;

    if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
      // no queued request: fall back to the handles stored in the connection context
      transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
      tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
             TMSG_INFO(transMsg.msgType));
      if (transMsg.info.ahandle == NULL) {
        int32_t msgType = 0;
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, &msgType);
        transMsg.msgType = msgType;
        tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
               transMsg.info.ahandle);
      }
    } else if (pMsg != NULL && pMsg->type != Release && pCtx) {
      transMsg.info.ahandle = pCtx->ahandle;
    } else {
      transMsg.info.ahandle = NULL;
    }

    // nothing to report for this entry: discard it and re-check the loop condition
    if ((pCtx == NULL || pCtx->pSem == NULL) && transMsg.info.ahandle == NULL &&
        (pMsg == NULL || REQUEST_NO_RESP(&pMsg->msg) || pMsg->type == Release)) {
      destroyCmsg(pMsg);
      once = true;
      continue;
    }

    // Release messages are not reported to the application
    if (pMsg == NULL || pMsg->type != Release) {
      if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
        return;
      }
    }
    destroyCmsg(pMsg);
    tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));
  } while (!transQueueEmpty(&pConn->cliMsgs));
  transUnrefCliHandle(pConn);
}
dengyihao's avatar
dengyihao 已提交
522
// Handle an error on `conn`. Passing -1 lets cliHandleExceptImpl() derive the
// error code from the connection's `broken` flag.
void cliHandleExcept(SCliConn* conn) {
  tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));

  const int32_t deriveFromConn = -1;
  cliHandleExceptImpl(conn, deriveFromConn);
}
dengyihao's avatar
dengyihao 已提交
526

dengyihao's avatar
dengyihao 已提交
527 528
// Timer callback for a connect attempt that took too long.
// Stops the timer, hands the connection's timer back to the thread's
// timerList and reports the attempt as cancelled (UV_ECANCELED) to the
// fast-fail handling.
//   handle - the fired timer; handle->data is the SCliConn it guards
void cliConnTimeout(uv_timer_t* handle) {
  SCliConn* pConn = handle->data;
  SCliThrd* pHost = pConn->hostThrd;

  tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));

  uv_timer_stop(handle);
  handle->data = NULL;

  // recycle the timer handle for later reuse
  taosArrayPush(pHost->timerList, &pConn->timer);
  pConn->timer = NULL;

  cliHandleFastFail(pConn, UV_ECANCELED);
}
dengyihao's avatar
dengyihao 已提交
540 541 542 543
// Timer callback armed by cliSend() with TRANS_READ_TIMEOUT: stops reading from the connection's
// stream and reports TSDB_CODE_RPC_TIMEOUT through cliHandleExceptImpl().
void cliReadTimeoutCb(uv_timer_t* handle) {
  SCliConn* pConn = handle->data;

  tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));

  uv_read_stop(pConn->stream);
  cliHandleExceptImpl(pConn, TSDB_CODE_RPC_TIMEOUT);
}
U
ubuntu 已提交
547 548

void* createConnPool(int size) {
549 550
  // thread local, no lock
  return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
551
}
dengyihao's avatar
dengyihao 已提交
552 553
// Tears down the per-thread connection pool.
// For every entry in the hash table:
//   1. destroys each pooled connection (cliDestroyConn unlinks it from the list),
//   2. drains the messages still waiting for a connection: cancels their wait task and notifies
//      the application through doNotifyApp(),
//   3. frees the entry's SMsgList.
// Finally releases the hash table itself. Always returns NULL.
void* destroyConnPool(SCliThrd* pThrd) {
  SHashObj* connPool = (SHashObj*)pThrd->pool;

  for (SConnList* pEntry = taosHashIterate(connPool, NULL); pEntry != NULL;
       pEntry = taosHashIterate(connPool, pEntry)) {
    // idle connections
    while (!QUEUE_IS_EMPTY(&pEntry->conns)) {
      queue*    node = QUEUE_HEAD(&pEntry->conns);
      SCliConn* pIdle = QUEUE_DATA(node, SCliConn, q);
      cliDestroyConn(pIdle, true);
    }

    // messages parked while waiting for a connection
    SMsgList* pWaiters = pEntry->list;
    while (!QUEUE_IS_EMPTY(&pWaiters->msgQ)) {
      queue* node = QUEUE_HEAD(&pWaiters->msgQ);
      QUEUE_REMOVE(node);

      SCliMsg* pParked = QUEUE_DATA(node, SCliMsg, q);

      transDQCancel(pThrd->waitConnQueue, pParked->ctx->task);
      pParked->ctx->task = NULL;

      doNotifyApp(pParked, pThrd);
    }
    taosMemoryFree(pWaiters);
  }

  taosHashCleanup(connPool);
  return NULL;
}

dengyihao's avatar
dengyihao 已提交
582
// Takes an idle connection for `key` out of the pool of `pThrd`.
//
// If the pool has no entry for `key` yet, an empty one is inserted and initialised (its SMsgList
// starts with numOfConn == 1).
//
// Returns the connection at the tail of the entry's idle list, marked ConnNormal and with any
// pending conn->task cancelled. Returns NULL when the idle list is empty; in that case *exceed is
// set to true if the entry's numOfConn has reached pTransInst->connLimitNum (it is not written
// otherwise).
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
  SHashObj* connPool = (SHashObj*)pThrd->pool;
  STrans*   pInst = pThrd->pTransInst;
  size_t    keyLen = strlen(key);

  SConnList* pEntry = taosHashGet(connPool, key, keyLen);
  if (pEntry == NULL) {
    // the table stores a copy, so insert a blank entry and initialise the stored one
    SConnList blank = {0};
    taosHashPut(connPool, key, keyLen, (void*)&blank, sizeof(blank));
    pEntry = taosHashGet(connPool, key, keyLen);

    SMsgList* pWaiters = taosMemoryCalloc(1, sizeof(SMsgList));
    QUEUE_INIT(&pWaiters->msgQ);
    pWaiters->numOfConn++;

    QUEUE_INIT(&pEntry->conns);
    pEntry->list = pWaiters;
  }

  if (QUEUE_IS_EMPTY(&pEntry->conns)) {
    if (pEntry->list->numOfConn >= pInst->connLimitNum) {
      *exceed = true;
    }
    return NULL;
  }

  queue* node = QUEUE_TAIL(&pEntry->conns);
  QUEUE_REMOVE(node);
  pEntry->size -= 1;

  SCliConn* pIdle = QUEUE_DATA(node, SCliConn, q);
  pIdle->status = ConnNormal;
  QUEUE_INIT(&pIdle->q);

  // a pooled connection may carry a scheduled task on the timeout queue; drop it on reuse
  if (pIdle->task != NULL) {
    SCliThrd* pHost = (SCliThrd*)pIdle->hostThrd;
    transDQCancel(pHost->timeoutQueue, pIdle->task);
    pIdle->task = NULL;
  }
  return pIdle;
}

// Variant of getConnFromPool() that also manages the queue of messages waiting for a connection.
//
// When an idle connection exists for `key`, the head of the idle list is returned (marked
// ConnNormal, pending conn->task cancelled) and *pMsg is left untouched.
//
// When no idle connection exists, NULL is returned and:
//   - if the entry's numOfConn has reached connLimitNum, *pMsg is parked on the wait queue with a
//     doFreeTimeoutMsg task scheduled after timeToGetConn, and *pMsg is set to NULL;
//   - otherwise numOfConn is incremented; if other messages were already waiting, *pMsg is parked
//     behind them and replaced by the message at the head of the wait queue, whose wait task is
//     cancelled.
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
  SHashObj* connPool = (SHashObj*)pThrd->pool;
  STrans*   pTransInst = pThrd->pTransInst;
  size_t    keyLen = strlen(key);

  SConnList* pEntry = taosHashGet(connPool, key, keyLen);
  if (pEntry == NULL) {
    // the table stores a copy, so insert a blank entry and initialise the stored one
    SConnList blank = {0};
    taosHashPut(connPool, key, keyLen, (void*)&blank, sizeof(blank));
    pEntry = taosHashGet(connPool, key, keyLen);

    SMsgList* pFresh = taosMemoryCalloc(1, sizeof(SMsgList));
    QUEUE_INIT(&pFresh->msgQ);
    pFresh->numOfConn++;

    QUEUE_INIT(&pEntry->conns);
    pEntry->list = pFresh;
  }

  // `trace` is referenced by name inside the tGTrace() macro
  STraceId* trace = &(*pMsg)->msg.info.traceId;

  if (!QUEUE_IS_EMPTY(&pEntry->conns)) {
    // an idle connection is available: hand out the oldest one
    queue* node = QUEUE_HEAD(&pEntry->conns);
    pEntry->size -= 1;
    QUEUE_REMOVE(node);

    SCliConn* pIdle = QUEUE_DATA(node, SCliConn, q);
    pIdle->status = ConnNormal;
    QUEUE_INIT(&pIdle->q);

    if (pIdle->task != NULL) {
      SCliThrd* pHost = (SCliThrd*)pIdle->hostThrd;
      transDQCancel(pHost->timeoutQueue, pIdle->task);
      pIdle->task = NULL;
    }
    return pIdle;
  }

  // nothing idle in the pool
  SMsgList* pWaiters = pEntry->list;
  bool      limited = pWaiters->numOfConn >= pTransInst->connLimitNum;

  if (!limited && QUEUE_IS_EMPTY(&pWaiters->msgQ)) {
    // below the limit and nobody is waiting: caller may open a new connection for *pMsg
    pWaiters->numOfConn++;
    return NULL;
  }

  // park the incoming message at the back of the wait queue, guarded by a timeout task
  STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
  arg->param1 = *pMsg;
  arg->param2 = pThrd;
  (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);

  tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));

  QUEUE_PUSH(&pWaiters->msgQ, &(*pMsg)->q);

  if (limited) {
    *pMsg = NULL;
    return NULL;
  }

  // below the limit: serve the longest-waiting message instead of the incoming one
  queue* front = QUEUE_HEAD(&pWaiters->msgQ);
  QUEUE_REMOVE(front);
  SCliMsg* ans = QUEUE_DATA(front, SCliMsg, q);

  *pMsg = ans;

  trace = &(*pMsg)->msg.info.traceId;
  tGTrace("%s msg %s pop from delay queue, start to send", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));
  transDQCancel(pThrd->waitConnQueue, ans->ctx->task);

  pWaiters->numOfConn++;
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
693 694 695 696 697 698
// Returns `conn` to the pool `pool`, unless it is already marked ConnInPool.
//
// Steps: the connection gets a fresh ref id, its timer (if any) is stopped and pushed onto the
// thread's timerList, one reference is dropped when more than one is held, and its message queue
// is cleaned with cliDestroyConnMsgs(conn, false).
// If a message is waiting for a connection to this destination, the connection is not pooled:
// the waiting message is attached to it and sent immediately via cliSend().
// Otherwise the connection is appended to the idle list; once that list holds 250 or more
// entries, a doCloseIdleConn task is scheduled for the connection.
static void addConnToPool(void* pool, SCliConn* conn) {
  if (conn->status == ConnInPool) {
    return;
  }
  allocConnRef(conn, true);

  SCliThrd* pOwner = conn->hostThrd;

  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
    taosArrayPush(pOwner->timerList, &conn->timer);
    conn->timer->data = NULL;
    conn->timer = NULL;
  }

  if (T_REF_VAL_GET(conn) > 1) {
    transUnrefCliHandle(conn);
  }

  cliDestroyConnMsgs(conn, false);

  // look the pool entry up lazily, keyed by conn->ip
  if (conn->list == NULL) {
    conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
  }

  SConnList* pEntry = conn->list;
  SMsgList*  pWaiters = pEntry->list;

  if (!QUEUE_IS_EMPTY(&pWaiters->msgQ)) {
    // somebody is waiting for a connection: reuse this one right away
    queue* front = QUEUE_HEAD(&pWaiters->msgQ);
    QUEUE_REMOVE(front);

    SCliMsg* pParked = QUEUE_DATA(front, SCliMsg, q);

    transDQCancel(pOwner->waitConnQueue, pParked->ctx->task);
    pParked->ctx->task = NULL;

    transCtxMerge(&conn->ctx, &pParked->ctx->appCtx);
    transQueuePush(&conn->cliMsgs, pParked);

    conn->status = ConnNormal;
    cliSend(conn);
    return;
  }

  conn->status = ConnInPool;
  QUEUE_PUSH(&pEntry->conns, &conn->q);
  pEntry->size += 1;

  if (pEntry->size >= 250) {
    STrans*   pInst = pOwner->pTransInst;
    STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
    arg->param1 = conn;
    arg->param2 = pOwner;

    conn->task = transDQSched(pOwner->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pInst->idleTime));
  }
}
dengyihao's avatar
dengyihao 已提交
748
// Registers a new SExHandle for `conn` with the ref manager and stores its id in conn->refId.
//
// update: when true, the handle currently identified by conn->refId is released and removed
//         first, and conn->refId is reset to -1.
//
// Returns 0 on the normal path. Returns -1 when the SExHandle cannot be allocated; conn->refId is
// then set to -1. If transAddExHandle() yields -1 the handle is freed, conn->refId stays -1 and,
// as before, 0 is returned.
static int32_t allocConnRef(SCliConn* conn, bool update) {
  if (update) {
    transReleaseExHandle(transGetRefMgt(), conn->refId);
    transRemoveExHandle(transGetRefMgt(), conn->refId);
    conn->refId = -1;
  }

  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  if (exh == NULL) {
    // previously dereferenced unconditionally; leave the connection without a registered handle
    conn->refId = -1;
    return -1;
  }

  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
  conn->refId = exh->refId;

  if (conn->refId == -1) {
    // registration failed: nobody else owns the handle, so release it here
    taosMemoryFree(exh);
  }
  return 0;
}

// Binds `conn` to the already registered ex-handle identified by `handle`.
//
// update: when true, the handle currently identified by conn->refId is released and removed
//         first, and conn->refId is reset to -1.
//
// Returns -1 if `handle` cannot be acquired, 0 otherwise. On success the ex-handle points at
// `conn` and its host thread, conn->refId is taken from it, and the acquisition is released.
static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
  if (update) {
    transReleaseExHandle(transGetRefMgt(), conn->refId);
    transRemoveExHandle(transGetRefMgt(), conn->refId);
    conn->refId = -1;
  }

  SExHandle* pTarget = transAcquireExHandle(transGetRefMgt(), handle);
  if (pTarget == NULL) {
    return -1;
  }

  pTarget->handle = conn;
  pTarget->pThrd = conn->hostThrd;
  conn->refId = pTarget->refId;

  transReleaseExHandle(transGetRefMgt(), handle);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
783

dengyihao's avatar
dengyihao 已提交
784
// libuv allocation callback passed to uv_read_start(): fills `buf` from the connection's own read
// buffer via transAllocBuffer(). `suggested_size` is not used.
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SCliConn* pConn = handle->data;

  tTrace("%s conn %p alloc read buf", CONN_GET_INST_LABEL(pConn), pConn);
  transAllocBuffer(&pConn->readBuf, buf);
}
U
ubuntu 已提交
790
// libuv read callback for a client connection.
//   nread == 0 : nothing to do (per the libuv docs this is neither an error nor EOF).
//   nread  < 0 : the connection is flagged broken and handed to cliHandleExcept().
//   nread  > 0 : the bytes are accounted in the read buffer and every complete message found
//                there is dispatched to cliHandleResp(); an invalid buffer aborts the loop through
//                cliHandleExcept().
// A handle without attached connection data is ignored.
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
  if (handle->data == NULL) {
    return;
  }
  SCliConn*    pConn = handle->data;
  SConnBuffer* pRead = &pConn->readBuf;

  if (nread == 0) {
    // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
    // equivalent to EAGAIN or EWOULDBLOCK under read(2)
    tTrace("%s conn %p read empty", CONN_GET_INST_LABEL(pConn), pConn);
    return;
  }

  if (nread < 0) {
    tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(nread),
          T_REF_VAL_GET(pConn));
    pConn->broken = true;
    cliHandleExcept(pConn);
    return;
  }

  pRead->len += nread;
  while (transReadComplete(pRead)) {
    tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(pConn), pConn);
    if (pRead->invalid) {
      cliHandleExcept(pConn);
      break;
    }
    cliHandleResp(pConn);
  }
}
dengyihao's avatar
dengyihao 已提交
824

dengyihao's avatar
dengyihao 已提交
825
// Allocates and initialises a client connection owned by `pThrd`.
// The connection gets its own TCP stream handle on the thread's loop, a timer (taken from the
// thread's timerList when one is available, otherwise newly created), empty request/message
// queues and read buffer, one reference, and a registered ref id. The thread's connCount is
// incremented. The returned connection is in state ConnNormal.
static SCliConn* cliCreateConn(SCliThrd* pThrd) {
  SCliConn* pNew = taosMemoryCalloc(1, sizeof(SCliConn));

  // read/write stream handle
  uv_tcp_t* tcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
  pNew->stream = (uv_stream_t*)tcp;
  uv_tcp_init(pThrd->loop, tcp);
  pNew->stream->data = pNew;

  // prefer a timer from the thread's list over creating a new one
  uv_timer_t* timer = NULL;
  if (taosArrayGetSize(pThrd->timerList) > 0) {
    timer = *(uv_timer_t**)taosArrayPop(pThrd->timerList);
  }
  if (timer == NULL) {
    timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
    tDebug("no available timer, create a timer %p", timer);
    uv_timer_init(pThrd->loop, timer);
  }
  timer->data = pNew;
  pNew->timer = timer;

  pNew->connReq.data = pNew;
  transReqQueueInit(&pNew->wreqQueue);
  transQueueInit(&pNew->cliMsgs, NULL);
  transInitBuffer(&pNew->readBuf);
  QUEUE_INIT(&pNew->q);

  pNew->hostThrd = pThrd;
  pNew->status = ConnNormal;
  pNew->broken = false;
  transRefCliHandle(pNew);

  atomic_add_fetch_32(&pThrd->connCount, 1);
  allocConnRef(pNew, false);

  return pNew;
}
U
ubuntu 已提交
858
// Detaches `conn` from all bookkeeping and optionally closes it.
//
// - unlinks the connection from whatever queue it is on;
// - decrements the per-destination connection counter (and the idle-list size when the
//   connection already knows its pool entry);
// - releases and removes its ref id, cancels a pending task on the timeout queue, and pushes its
//   timer onto the thread's timerList;
// - when `clear` is true and the stream is not already closing, stops reading and closes the
//   stream; cliDestroy() then runs as the close callback.
static void cliDestroyConn(SCliConn* conn, bool clear) {
  SCliThrd* pThrd = conn->hostThrd;
  tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);

  QUEUE_REMOVE(&conn->q);
  QUEUE_INIT(&conn->q);

  if (conn->list != NULL) {
    SConnList* connList = conn->list;
    connList->list->numOfConn--;
    connList->size--;
  } else if (conn->ip != NULL) {
    // The connection never cached its pool entry, so look it up by key. The lookup may miss
    // (and conn->ip may not have been set yet); both cases used to dereference NULL.
    SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip));
    if (connList != NULL) {
      connList->list->numOfConn--;
    }
  }
  conn->list = NULL;

  transReleaseExHandle(transGetRefMgt(), conn->refId);
  transRemoveExHandle(transGetRefMgt(), conn->refId);
  conn->refId = -1;

  if (conn->task != NULL) {
    transDQCancel(pThrd->timeoutQueue, conn->task);
    conn->task = NULL;
  }
  if (conn->timer != NULL) {
    uv_timer_stop(conn->timer);
    conn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &conn->timer);
    conn->timer = NULL;
  }

  if (clear) {
    if (!uv_is_closing((uv_handle_t*)conn->stream)) {
      uv_read_stop(conn->stream);
      uv_close((uv_handle_t*)conn->stream, cliDestroy);
    }
  }
}
U
ubuntu 已提交
897
// uv_close() callback that releases everything owned by a client connection.
// Ignores handles that are not TCP handles or that carry no connection in `data`.
// The connection's timer goes back onto the thread's timerList, the thread's connCount is
// decremented, the ref id is released and removed, and ip string, stream handle, queued
// messages, write-request queue, read buffer and finally the connection itself are freed.
static void cliDestroy(uv_handle_t* handle) {
  if (uv_handle_get_type(handle) != UV_TCP) {
    return;
  }
  if (handle->data == NULL) {
    return;
  }

  SCliConn* pConn = handle->data;
  SCliThrd* pOwner = pConn->hostThrd;

  if (pConn->timer != NULL) {
    uv_timer_stop(pConn->timer);
    taosArrayPush(pOwner->timerList, &pConn->timer);
    pConn->timer->data = NULL;
    pConn->timer = NULL;
  }

  atomic_sub_fetch_32(&pOwner->connCount, 1);

  transReleaseExHandle(transGetRefMgt(), pConn->refId);
  transRemoveExHandle(transGetRefMgt(), pConn->refId);

  taosMemoryFree(pConn->ip);
  taosMemoryFree(pConn->stream);

  cliDestroyConnMsgs(pConn, true);

  // the label is read through pConn->hostThrd, so this must run before pConn is freed
  tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(pConn), pConn);

  transReqQueueClear(&pConn->wreqQueue);
  transDestroyBuffer(&pConn->readBuf);

  taosMemoryFree(pConn);
}
dengyihao's avatar
dengyihao 已提交
925
// Called after a successful write: if the message at the head of the connection's queue does not
// expect a response, it is popped and destroyed.
//
// Returns true only when such a message was removed AND cliMaySendCachedMsg() reports true.
// When a no-response message was removed but cliMaySendCachedMsg() reports false, the connection
// is handed to addConnToPool() and false is returned. In every other case (empty queue, or head
// expects a response) nothing changes and false is returned.
static bool cliHandleNoResp(SCliConn* conn) {
  if (transQueueEmpty(&conn->cliMsgs)) {
    return false;
  }

  SCliMsg* pHeadMsg = transQueueGet(&conn->cliMsgs, 0);
  bool     oneWay = REQUEST_NO_RESP(&pHeadMsg->msg);
  if (!oneWay) {
    return false;
  }

  transQueuePop(&conn->cliMsgs);
  destroyCmsg(pHeadMsg);

  if (cliMaySendCachedMsg(conn) == false) {
    SCliThrd* pOwner = conn->hostThrd;
    addConnToPool(pOwner->pool, conn);
    return false;
  }
  return true;
}
U
ubuntu 已提交
946
// uv_write() completion callback for single-message sends issued by cliSend().
//
// req:    the write request; it is removed from the connection's request queue, which also
//         yields the owning connection (nothing is done if that lookup returns NULL).
// status: libuv status, 0 on success.
//
// A warning is logged when more than 20 ms elapsed since the head message's start timestamp.
// On failure the connection is passed to cliHandleExcept() unless its stream is already closing.
// On success, reading is started on the stream unless cliHandleNoResp() reports that no response
// is expected.
static void cliSendCb(uv_write_t* req, int status) {
  SCliConn* pConn = transReqQueueRemove(req);
  if (pConn == NULL) return;

  SCliMsg* pMsg = !transQueueEmpty(&pConn->cliMsgs) ? transQueueGet(&pConn->cliMsgs, 0) : NULL;
  if (pMsg != NULL) {
    int64_t cost = taosGetTimestampUs() - pMsg->st;
    if (cost > 1000 * 20) {
      tWarn("%s conn %p send cost:%dus, send exception", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
    }
  }

  if (status != 0) {
    // pConn->stream is itself a pointer to the handle; the previous code passed the address of
    // that pointer field, so uv_is_closing() inspected unrelated memory.
    if (!uv_is_closing((uv_handle_t*)pConn->stream)) {
      tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
      cliHandleExcept(pConn);
    }
    return;
  }

  tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);

  if (cliHandleNoResp(pConn) == true) {
    tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
    return;
  }
  uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
dengyihao's avatar
dengyihao 已提交
973 974 975 976
// Writes every message of pConn->pBatch to the connection's stream with a single uv_write().
//
// For each queued message the transport header in front of its content is prepared (fields are
// only filled in while the header's `comp` flag is 0; such messages are also compressed when the
// content exceeds the instance's compressSize), stamped with the current time, and referenced by
// one uv_buf_t. The batch list's connCnt is incremented up front. Completion is reported to
// cliSendBatchCb(). The return value of uv_write() is not inspected here.
void cliSendBatch(SCliConn* pConn) {
  SCliThrd* pOwner = pConn->hostThrd;
  STrans*   pInst = pOwner->pTransInst;

  SCliBatch* pBatch = pConn->pBatch;
  pBatch->pList->connCnt += 1;

  int32_t   nBufs = pBatch->wLen;
  uv_buf_t* bufs = taosMemoryCalloc(nBufs, sizeof(uv_buf_t));
  int       filled = 0;

  queue* node = NULL;
  QUEUE_FOREACH(node, &pBatch->wq) {
    SCliMsg*       pItem = QUEUE_DATA(node, SCliMsg, q);
    STransConnCtx* pCtx = pItem->ctx;
    STransMsg*     pReq = (STransMsg*)(&pItem->msg);

    // a message without content still needs room for the header
    if (pReq->pCont == 0) {
      pReq->pCont = (void*)rpcMallocCont(0);
      pReq->contLen = 0;
    }

    int            total = transMsgLenFromCont(pReq->contLen);
    STransMsgHead* pHdr = transHeadFromCont(pReq->pCont);
    const bool     plain = (pHdr->comp == 0);

    if (plain) {
      pHdr->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
      pHdr->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0;
      pHdr->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0;
      pHdr->msgType = pReq->msgType;
      pHdr->msgLen = (int32_t)htonl((uint32_t)total);
      pHdr->release = REQUEST_RELEASE_HANDLE(pItem) ? 1 : 0;
      memcpy(pHdr->user, pInst->user, strlen(pInst->user));
      pHdr->traceId = pReq->info.traceId;
      pHdr->magicNum = htonl(TRANS_MAGIC_NUM);
    }
    pHdr->timestamp = taosHton64(taosGetTimestampUs());

    if (!plain) {
      // header was prepared earlier: take the length it already carries
      total = (int32_t)ntohl((uint32_t)(pHdr->msgLen));
    } else if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) {
      total = transCompressMsg(pReq->pCont, pReq->contLen) + sizeof(STransMsgHead);
      pHdr->msgLen = (int32_t)htonl((uint32_t)total);
    }

    bufs[filled++] = uv_buf_init((char*)pHdr, total);
  }

  uv_write_t* wreq = taosMemoryCalloc(1, sizeof(uv_write_t));
  wreq->data = pConn;
  tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn,
         pBatch->wLen, pBatch->batchSize);
  uv_write(wreq, (uv_stream_t*)pConn->stream, bufs, nBufs, cliSendBatchCb);
  taosMemoryFree(bufs);
}
U
ubuntu 已提交
1032
// Sends the next unsent message queued on `pConn`.
//
// An empty queue is treated as an error and routed to cliHandleExcept(). Otherwise the message
// selected by CONN_GET_NEXT_SENDMSG is marked as sent, its transport header is prepared (fields
// are only filled in while the header's `comp` flag is 0; such messages are also compressed when
// the content exceeds the instance's compressSize) and stamped with the current time, a read
// timeout timer is armed when the instance's startTimer hook asks for one, and the message is
// written with uv_write(); cliSendCb() receives the completion. A failing uv_write() is routed to
// cliHandleExcept().
void cliSend(SCliConn* pConn) {
  SCliThrd* pOwner = pConn->hostThrd;
  STrans*   pInst = pOwner->pTransInst;

  if (transQueueEmpty(&pConn->cliMsgs)) {
    tError("%s conn %p not msg to send", pInst->label, pConn);
    cliHandleExcept(pConn);
    return;
  }

  // pCliMsg is assigned by the macro below; it may also jump to _RETURN
  SCliMsg* pCliMsg = NULL;
  CONN_GET_NEXT_SENDMSG(pConn);
  pCliMsg->sent = 1;

  STransConnCtx* pCtx = pCliMsg->ctx;
  STransMsg*     pReq = (STransMsg*)(&pCliMsg->msg);

  // a message without content still needs room for the header
  if (pReq->pCont == 0) {
    pReq->pCont = (void*)rpcMallocCont(0);
    pReq->contLen = 0;
  }

  int            total = transMsgLenFromCont(pReq->contLen);
  STransMsgHead* pHdr = transHeadFromCont(pReq->pCont);
  const bool     plain = (pHdr->comp == 0);

  if (plain) {
    pHdr->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
    pHdr->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0;
    pHdr->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0;
    pHdr->msgType = pReq->msgType;
    pHdr->msgLen = (int32_t)htonl((uint32_t)total);
    pHdr->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
    memcpy(pHdr->user, pInst->user, strlen(pInst->user));
    pHdr->traceId = pReq->info.traceId;
    pHdr->magicNum = htonl(TRANS_MAGIC_NUM);
  }
  pHdr->timestamp = taosHton64(taosGetTimestampUs());

  if (pHdr->persist == 1) {
    CONN_SET_PERSIST_BY_APP(pConn);
  }

  // `trace` is referenced by name inside the tG*() logging macros
  STraceId* trace = &pReq->info.traceId;

  if (pInst->startTimer != NULL && pInst->startTimer(0, pReq->msgType)) {
    // prefer a timer from the thread's list over creating a new one
    uv_timer_t* pTimer = NULL;
    if (taosArrayGetSize(pOwner->timerList) > 0) {
      pTimer = *(uv_timer_t**)taosArrayPop(pOwner->timerList);
    }
    if (pTimer == NULL) {
      pTimer = taosMemoryCalloc(1, sizeof(uv_timer_t));
      tDebug("no available timer, create a timer %p", pTimer);
      uv_timer_init(pOwner->loop, pTimer);
    }
    pTimer->data = pConn;
    pConn->timer = pTimer;

    tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType));
    uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
  }

  if (!plain) {
    // header was prepared earlier: take the length it already carries
    total = (int32_t)ntohl((uint32_t)(pHdr->msgLen));
  } else if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) {
    total = transCompressMsg(pReq->pCont, pReq->contLen) + sizeof(STransMsgHead);
    pHdr->msgLen = (int32_t)htonl((uint32_t)total);
  }

  tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
          TMSG_INFO(pHdr->msgType), pConn->dst, pConn->src, total);

  uv_buf_t    out = uv_buf_init((char*)pHdr, total);
  uv_write_t* wreq = transReqQueuePush(&pConn->wreqQueue);

  int rc = uv_write(wreq, (uv_stream_t*)pConn->stream, &out, 1, cliSendCb);
  if (rc != 0) {
    tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType),
            uv_err_name(rc));
    cliHandleExcept(pConn);
  }

_RETURN:
  return;
}
dengyihao's avatar
dengyihao 已提交
1115 1116

// Releases a batch: destroys every message still queued on it, decrements the `sending` counter
// of the batch list it belongs to, and frees the batch itself. A NULL batch is ignored.
static void cliDestroyBatch(SCliBatch* pBatch) {
  if (pBatch == NULL) {
    return;
  }

  for (;;) {
    if (QUEUE_IS_EMPTY(&pBatch->wq)) {
      break;
    }
    queue* node = QUEUE_HEAD(&pBatch->wq);
    QUEUE_REMOVE(node);

    SCliMsg* pItem = QUEUE_DATA(node, SCliMsg, q);
    destroyCmsg(pItem);
  }

  SCliBatchList* pOwnerList = pBatch->pList;
  pOwnerList->sending -= 1;

  taosMemoryFree(pBatch);
}
dengyihao's avatar
dengyihao 已提交
1129
// Dispatches one batch of messages to its destination.
//
// - If the thread is quitting, the batch is destroyed.
// - A NULL or empty batch is ignored.
// - If an idle connection for the destination exists in the pool, the batch is sent on it
//   immediately.
// - If none exists and the destination's connection limit is reached, the batch is destroyed and
//   an error is logged.
// - Otherwise a new connection is created and a non-blocking connect is started; a timer running
//   cliConnTimeout() guards the attempt. Any failure on the way is reported through
//   cliHandleFastFail(conn, -1).
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
  if (pThrd->quit == true) {
    cliDestroyBatch(pBatch);
    return;
  }

  if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
    return;
  }

  STrans*        pInst = pThrd->pTransInst;
  SCliBatchList* pDest = pBatch->pList;

  char key[TSDB_FQDN_LEN + 64] = {0};
  CONN_CONSTRUCT_HASH_KEY(key, pDest->ip, pDest->port);

  bool      exceed = false;
  SCliConn* pConn = getConnFromPool(pThrd, key, &exceed);

  // fast path: reuse a pooled connection
  if (pConn != NULL) {
    pConn->pBatch = pBatch;
    cliSendBatch(pConn);
    return;
  }

  if (exceed) {
    tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pInst->label, pBatch->wLen,
           pBatch->batchSize, pInst->connLimitNum);
    cliDestroyBatch(pBatch);
    return;
  }

  // slow path: open a new connection; the batch is attached to it up front
  pConn = cliCreateConn(pThrd);
  pConn->pBatch = pBatch;
  pConn->ip = taosStrdup(pDest->dst);

  uint32_t resolved = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pDest->ip);
  if (resolved == 0xffffffff) {
    // hand the timer back before failing the connection
    uv_timer_stop(pConn->timer);
    pConn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &pConn->timer);
    pConn->timer = NULL;

    cliHandleFastFail(pConn, -1);
    return;
  }

  struct sockaddr_in addr;
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = resolved;
  addr.sin_port = (uint16_t)htons(pDest->port);

  tTrace("%s conn %p try to connect to %s", pInst->label, pConn, pDest->dst);

  int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
  if (fd == -1) {
    tError("%s conn %p failed to create socket, reason:%s", transLabel(pInst), pConn,
           tstrerror(TAOS_SYSTEM_ERROR(errno)));
    cliHandleFastFail(pConn, -1);
    return;
  }

  int rc = uv_tcp_open((uv_tcp_t*)pConn->stream, fd);
  if (rc != 0) {
    tError("%s conn %p failed to set stream, reason:%s", transLabel(pInst), pConn, uv_err_name(rc));
    cliHandleFastFail(pConn, -1);
    return;
  }

  rc = transSetConnOption((uv_tcp_t*)pConn->stream);
  if (rc != 0) {
    tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pInst), pConn, uv_err_name(rc));
    cliHandleFastFail(pConn, -1);
    return;
  }

  rc = uv_tcp_connect(&pConn->connReq, (uv_tcp_t*)(pConn->stream), (const struct sockaddr*)&addr, cliConnCb);
  if (rc != 0) {
    // hand the timer back before failing the connection
    uv_timer_stop(pConn->timer);
    pConn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &pConn->timer);
    pConn->timer = NULL;

    cliHandleFastFail(pConn, -1);
    return;
  }

  uv_timer_start(pConn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
}
// libuv write-completion callback for a batch written on a connection.
//
// req->data is the SCliConn the batch was written on and conn->pBatch is the
// batch that just completed. The callback pops the next queued batch from the
// same batch list and then:
//   - on failure: runs the exception path for the connection (unless its
//     stream is already closing) and hands the next batch to cliHandleBatchReq;
//   - on success: keeps sending the next batch on this connection, or returns
//     the connection to the thread's pool when nothing is queued.
// The completed batch and the write request are always released here.
static void cliSendBatchCb(uv_write_t* req, int status) {
  SCliConn*  conn = req->data;
  SCliThrd*  thrd = conn->hostThrd;
  SCliBatch* p = conn->pBatch;

  SCliBatchList* pBatchList = p->pList;
  SCliBatch*     nxtBatch = cliGetHeadFromList(pBatchList);
  pBatchList->connCnt -= 1;

  conn->pBatch = NULL;

  // conn->stream is already a pointer to the libuv handle. Passing its address
  // would make uv_is_closing() inspect the pointer field instead of the handle.
  if (status != 0) {
    tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
           p->wLen, p->batchSize, uv_err_name(status));

    if (!uv_is_closing((uv_handle_t*)conn->stream)) cliHandleExcept(conn);

    cliHandleBatchReq(nxtBatch, thrd);
  } else {
    tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
           p->batchSize);
    if (!uv_is_closing((uv_handle_t*)conn->stream)) {
      if (nxtBatch != NULL) {
        conn->pBatch = nxtBatch;
        cliSendBatch(conn);
      } else {
        addConnToPool(thrd->pool, conn);
      }
    } else {
      // the connection itself is released by another callback
      cliDestroyBatch(nxtBatch);
    }
  }

  cliDestroyBatch(p);
  taosMemoryFree(req);
}
dengyihao's avatar
dengyihao 已提交
1247
// Handles a connection that failed before anything could be sent.
//
// status is a libuv error code; callers pass -1 when they have no specific
// code, which is reported as "network unreachable".
//
// Non-batch connection (pConn->pBatch == NULL): the first queued message is
// logged and, when it is a no-response request accepted by failFastFp, the
// failure record for pConn->ip in failFastCache is created or updated.
// Batch connection: the pending batch is destroyed.
// In every case the connection then goes through cliHandleExcept.
static void cliHandleFastFail(SCliConn* pConn, int status) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  // uv_strerror() understands libuv's UV_* codes, not raw positive errno
  // values, so the generic -1 is mapped to the UV_ constant.
  if (status == -1) status = UV_ENETUNREACH;

  if (pConn->pBatch == NULL) {
    SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);

    if (pMsg == NULL) {
      // NOTE(review): presumably transQueueGet yields NULL for an empty queue;
      // there is no message to attribute the failure to.
      tError("%s conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), pConn, pConn->ip,
             uv_strerror(status));
    } else {
      STraceId* trace = &pMsg->msg.info.traceId;  // consumed by the tG* log macros
      tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
              TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status));

      if (REQUEST_NO_RESP(&pMsg->msg) &&
          (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
        SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip));
        int64_t        cTimestamp = taosGetTimestampMs();
        if (item != NULL) {
          // keep the difference in 64 bits so a long gap cannot wrap around
          int64_t elapse = cTimestamp - item->timestamp;
          if (elapse >= 0 && elapse <= pTransInst->failFastInterval) {
            item->count++;
          } else {
            // outside the interval: start a new counting window
            item->count = 1;
            item->timestamp = cTimestamp;
          }
        } else {
          SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
          taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem));
        }
      }
    }
  } else {
    tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
           pConn, pConn->ip, uv_strerror(status));
    cliDestroyBatch(pConn->pBatch);
    pConn->pBatch = NULL;
  }
  cliHandleExcept(pConn);
}
dengyihao's avatar
dengyihao 已提交
1285

dengyihao's avatar
dengyihao 已提交
1286 1287 1288
// libuv connect callback for a client connection (req->data is the SCliConn).
//
// Releases the connect timer back to the thread's timerList if it is still
// attached. On connect failure the fast-fail path runs, unless the timer was
// already gone, in which case the failure has been dealt with by the timeout
// path. On success the peer/local addresses are recorded in pConn->dst and
// pConn->src and the pending batch or queued message is sent.
void cliConnCb(uv_connect_t* req, int status) {
  SCliConn* pConn = req->data;
  SCliThrd* pThrd = pConn->hostThrd;

  bool timeout = (pConn->timer == NULL);
  if (!timeout) {
    uv_timer_stop(pConn->timer);
    pConn->timer->data = NULL;
    taosArrayPush(pThrd->timerList, &pConn->timer);
    pConn->timer = NULL;
  }

  if (status != 0) {
    // when the timer is already gone the timeout path has handled the failure
    if (!timeout) {
      cliHandleFastFail(pConn, status);
    }
    return;
  }

  // Zero-initialised so that a failing getpeername/getsockname never causes
  // uninitialised stack bytes to be formatted into dst/src.
  struct sockaddr peername = {0};
  struct sockaddr sockname = {0};

  int addrlen = sizeof(peername);
  uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
  transSockInfo2Str(&peername, pConn->dst);

  addrlen = sizeof(sockname);
  uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
  transSockInfo2Str(&sockname, pConn->src);

  tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
  if (pConn->pBatch != NULL) {
    cliSendBatch(pConn);
  } else {
    cliSend(pConn);
  }
}

dengyihao's avatar
dengyihao 已提交
1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348
// Reports TSDB_CODE_RPC_MAX_SESSIONS for pMsg back to the application and then
// destroys pMsg.
//
// When the message context carries a semaphore (pSem) the response is copied
// into pCtx->pRsp if that buffer is set; otherwise it is delivered through
// the instance callback cfp.
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
  STrans*        pTransInst = pThrd->pTransInst;
  STransConnCtx* pCtx = pMsg->ctx;

  // contLen and pCont stay at their zero-initialised values
  STransMsg rsp = {0};
  rsp.msgType = pMsg->msg.msgType + 1;
  rsp.code = TSDB_CODE_RPC_MAX_SESSIONS;
  rsp.info.traceId = pMsg->msg.info.traceId;
  rsp.info.ahandle = pCtx->ahandle;
  rsp.info.hasEpSet = false;

  if (pCtx->pSem == NULL) {
    pTransInst->cfp(pTransInst->parent, &rsp, NULL);
  } else if (pCtx->pRsp != NULL) {
    memcpy((char*)pCtx->pRsp, (char*)&rsp, sizeof(rsp));
  }

  destroyCmsg(pMsg);
}
dengyihao's avatar
dengyihao 已提交
1349
// Handles a Quit message for a client worker thread.
//
// While transAsyncPoolIsEmpty() reports the async pool as non-empty, the
// message is only parked in pThrd->stopMsg. Otherwise the thread is flagged as
// quitting, the message is freed, the connection pool is destroyed and every
// handle on the loop is visited with cliWalkCb.
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
  const bool poolHasWork = !transAsyncPoolIsEmpty(pThrd->asyncPool);
  if (poolHasWork) {
    pThrd->stopMsg = pMsg;
    return;
  }

  pThrd->quit = true;
  pThrd->stopMsg = NULL;
  tDebug("cli work thread %p start to quit", pThrd);

  destroyCmsg(pMsg);
  destroyConnPool(pThrd);
  uv_walk(pThrd->loop, cliWalkCb, NULL);
}
dengyihao's avatar
dengyihao 已提交
1362
// Handles a Release message for the handle whose ref id is stored in
// pMsg->msg.info.handle.
//
// If the handle can no longer be acquired, or no connection is bound to it,
// the message is simply destroyed. Otherwise, when the connection's reference
// value is 2, one reference is dropped and the release message is queued on
// the connection and sent; any other value is logged as "already released".
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
  int64_t    refId = (int64_t)(pMsg->msg.info.handle);
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
  if (exh == NULL) {
    tDebug("%" PRId64 " already released", refId);
    destroyCmsg(pMsg);
    return;
  }

  SCliConn* conn = exh->handle;
  transReleaseExHandle(transGetRefMgt(), refId);

  // exh->handle may be unset (cliGetConn checks for the same condition);
  // without a connection there is nothing to release.
  if (conn == NULL) {
    tDebug("%" PRId64 " has no conn bound, nothing to release", refId);
    destroyCmsg(pMsg);
    return;
  }

  tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);

  // NOTE(review): a ref value of 2 presumably means the connection is still
  // held through the application handle - confirm against T_REF usage.
  if (T_REF_VAL_GET(conn) == 2) {
    transUnrefCliHandle(conn);
    if (!transQueuePush(&conn->cliMsgs, pMsg)) {
      return;
    }
    cliSend(conn);
  } else {
    tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
    destroyCmsg(pMsg);
  }
}
dengyihao's avatar
dengyihao 已提交
1386
// Handles an Update message: copies the address-conversion settings carried in
// the message context into the thread, then frees the message.
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
  pThrd->cvtAddr = pMsg->ctx->cvtAddr;
  destroyCmsg(pMsg);
}
U
ubuntu 已提交
1391

dengyihao's avatar
dengyihao 已提交
1392 1393
// Picks the connection a request should be sent on.
//
// pMsg   in/out pointer to the request; it is forwarded by reference to
//        getConnFromPool2 (NOTE(review): the caller checks *pMsg for NULL
//        afterwards, so the pool lookup presumably may take the message over).
// pThrd  owning client thread.
// ignore set to true when the request names a handle (refId != 0) that can no
//        longer be acquired; NULL is returned in that case.
// addr   destination key used for the pool lookup.
//
// Returns the connection bound to the handle, or one taken from the pool (and
// bound to the handle when refId != 0), or NULL when none is available.
SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
  int64_t refId = (int64_t)((*pMsg)->msg.info.handle);

  if (refId == 0) {
    // no explicit handle: any pooled connection to addr will do
    SCliConn* pooled = getConnFromPool2(pThrd, addr, pMsg);
    if (pooled != NULL) {
      tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(pooled), pooled, pThrd->pool);
    } else {
      tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
    }
    return pooled;
  }

  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
  if (exh == NULL) {
    tError("failed to get conn, refId: %" PRId64 "", refId);
    *ignore = true;
    return NULL;
  }

  SCliConn* conn = exh->handle;
  if (conn == NULL) {
    // handle exists but has no connection yet: take one from the pool and bind it
    conn = getConnFromPool2(pThrd, addr, pMsg);
    if (conn != NULL) specifyConnRef(conn, true, refId);
  }
  transReleaseExHandle(transGetRefMgt(), refId);

  return conn;
}
dengyihao's avatar
dengyihao 已提交
1422
// Replaces the fqdn of a single-endpoint epset with pCvtAddr->ip when
// conversion is enabled (pCvtAddr->cvt) and the endpoint's fqdn equals
// pCvtAddr->fqdn. Leaves the epset untouched otherwise.
FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
  if (!pCvtAddr->cvt) return;
  if (pEpSet->numOfEps != 1) return;

  char* epFqdn = pEpSet->eps[0].fqdn;
  if (strncmp(epFqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) != 0) return;

  memset(epFqdn, 0, TSDB_FQDN_LEN);
  memcpy(epFqdn, pCvtAddr->ip, TSDB_FQDN_LEN);
}
dengyihao's avatar
dengyihao 已提交
1431

dengyihao's avatar
dengyihao 已提交
1432
// Returns true only when code is 0 and the context's current epSet differs
// from its original epSet (as judged by transEpSetIsEqual).
FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
  if (code != 0) {
    return false;
  }
  return transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet) ? false : true;
}
dengyihao's avatar
dengyihao 已提交
1438
// Fills *pResp with a TSDB_CODE_RPC_BROKEN_LINK response for pMsg.
//
// The response type is the request type + 1 and it carries the request's
// trace id and, when the message has a context, its ahandle.
// Returns 0 on success, or -1 (leaving *pResp untouched) when pMsg is NULL.
FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
  if (pMsg == NULL) {
    return -1;
  }

  memset(pResp, 0, sizeof(STransMsg));

  pResp->msgType = pMsg->msg.msgType + 1;
  pResp->code = TSDB_CODE_RPC_BROKEN_LINK;
  pResp->info.traceId = pMsg->msg.info.traceId;
  if (pMsg->ctx) {
    pResp->info.ahandle = pMsg->ctx->ahandle;
  } else {
    pResp->info.ahandle = NULL;
  }

  return 0;
}
dengyihao's avatar
dengyihao 已提交
1450 1451 1452 1453 1454
// Looks up the IPv4 address for fqdn, consulting the cache first.
//
// On a cache miss the name is resolved with taosGetIpv4FromFqdn and the result
// is stored in the cache. When resolution yields 0xffffffff, terrno is set
// from errno, the failure is logged and 0xffffffff is returned without
// caching.
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
  size_t    keyLen = strlen(fqdn);
  uint32_t* cached = taosHashGet(cache, fqdn, keyLen);
  if (cached != NULL) {
    return *cached;
  }

  uint32_t resolved = taosGetIpv4FromFqdn(fqdn);
  if (resolved == 0xffffffff) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr());
    return resolved;
  }

  taosHashPut(cache, fqdn, keyLen, &resolved, sizeof(resolved));
  return resolved;
}
// Placeholder: refreshing a cached fqdn entry is not implemented yet, so this
// is a no-op.
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
  (void)cache;
  (void)fqdn;
}
dengyihao's avatar
dengyihao 已提交
1471

dengyihao's avatar
dengyihao 已提交
1472 1473 1474 1475 1476
static void doFreeTimeoutMsg(void* param) {
  STaskArg* arg = param;
  SCliMsg*  pMsg = arg->param1;
  SCliThrd* pThrd = arg->param2;
  STrans*   pTransInst = pThrd->pTransInst;
dengyihao's avatar
dengyihao 已提交
1477

dengyihao's avatar
dengyihao 已提交
1478 1479 1480 1481 1482 1483
  QUEUE_REMOVE(&pMsg->q);
  STraceId* trace = &pMsg->msg.info.traceId;
  tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
  doNotifyApp(pMsg, pThrd);
  taosMemoryFree(arg);
}
dengyihao's avatar
dengyihao 已提交
1484
// Handles a normal request: finds or creates a connection to the endpoint
// currently in use and sends (or starts connecting for) the message.
//
// Steps:
//   1. Optionally convert the endpoint fqdn (cliMayCvtFqdnToIp); a message
//      with an invalid epset is destroyed.
//   2. Build the "fqdn:port" style key and ask cliGetConn for a connection.
//      If the request's handle is gone (ignore == true) a broken-link
//      response is delivered (except for Release messages) and the message is
//      destroyed.
//   3. With an existing connection: merge the app context, queue the message
//      and send.
//   4. Otherwise create a connection, queue the message on it, resolve the
//      address, open and configure the socket and start an asynchronous
//      connect guarded by the connect timer. Each failure goes through the
//      connection's exception / fast-fail path.
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
  STrans* pTransInst = pThrd->pTransInst;

  cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
  if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) {
    destroyCmsg(pMsg);
    return;
  }

  char*    fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
  uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
  char     addr[TSDB_FQDN_LEN + 64] = {0};
  CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port);

  bool      ignore = false;
  SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr);
  if (ignore == true) {
    // the handle named by the request could not be acquired any more
    STransMsg resp = {0};
    cliBuildExceptResp(pMsg, &resp);
    if (pMsg->type != Release) {
      pTransInst->cfp(pTransInst->parent, &resp, NULL);
    }
    destroyCmsg(pMsg);
    return;
  }
  if (conn == NULL && pMsg == NULL) {
    // cliGetConn cleared the message pointer; nothing left to do here
    return;
  }
  STraceId* trace = &pMsg->msg.info.traceId;  // consumed by the tG* log macros

  if (conn != NULL) {
    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
    transQueuePush(&conn->cliMsgs, pMsg);
    cliSend(conn);
  } else {
    conn = cliCreateConn(pThrd);

    int64_t refId = (int64_t)pMsg->msg.info.handle;
    if (refId != 0) specifyConnRef(conn, true, refId);

    transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
    transQueuePush(&conn->cliMsgs, pMsg);

    conn->ip = taosStrdup(addr);

    uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
    if (ipaddr == 0xffffffff) {
      // resolution failed: give the timer back before tearing the conn down
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

      cliHandleExcept(conn);
      return;
    }

    // Named differently from the `addr` key above so neither shadows the
    // other, and zero-initialised so no field is left indeterminate.
    struct sockaddr_in dest = {0};
    dest.sin_family = AF_INET;
    dest.sin_addr.s_addr = ipaddr;
    dest.sin_port = (uint16_t)htons(port);

    tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip);
    int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
    if (fd == -1) {
      tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
              tstrerror(TAOS_SYSTEM_ERROR(errno)));
      cliHandleExcept(conn);
      errno = 0;
      return;
    }
    int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
    if (ret != 0) {
      tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }
    ret = transSetConnOption((uv_tcp_t*)conn->stream);
    if (ret != 0) {
      tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
      cliHandleExcept(conn);
      return;
    }

    ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&dest, cliConnCb);
    if (ret != 0) {
      uv_timer_stop(conn->timer);
      conn->timer->data = NULL;
      taosArrayPush(pThrd->timerList, &conn->timer);
      conn->timer = NULL;

      cliHandleFastFail(conn, ret);
      return;
    }
    // cliConnCb releases the timer when the connect attempt completes
    uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
  }
  tGTrace("%s conn %p ready", pTransInst->label, conn);
}
dengyihao's avatar
dengyihao 已提交
1582

dengyihao's avatar
dengyihao 已提交
1583
// Drains wq, dispatching every message to the handler registered for its type
// in cliAsyncHandle. Quit messages are not dispatched; they are parked in
// pThrd->stopMsg for the caller. Logs the number of dispatched messages when
// there were at least two.
static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
  int nHandled = 0;

  while (!QUEUE_IS_EMPTY(wq)) {
    queue* node = QUEUE_HEAD(wq);
    QUEUE_REMOVE(node);

    SCliMsg* pMsg = QUEUE_DATA(node, SCliMsg, q);
    if (pMsg->type != Quit) {
      (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
      nHandled++;
    } else {
      pThrd->stopMsg = pMsg;
    }
  }

  if (nHandled > 1) {
    tTrace("cli process batch size:%d", nHandled);
  }
}
dengyihao's avatar
dengyihao 已提交
1604
// Removes and returns the batch at the head of pList's queue.
//
// Returns NULL, leaving the list untouched, when the queue is empty or when
// either connCnt or sending exceeds connMax. On success the list's sending
// counter is incremented and its len decremented.
SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
  if (QUEUE_IS_EMPTY(&pList->wq)) return NULL;
  if (pList->connCnt > pList->connMax) return NULL;
  if (pList->sending > pList->connMax) return NULL;

  queue* node = QUEUE_HEAD(&pList->wq);
  QUEUE_REMOVE(node);

  pList->len -= 1;
  pList->sending += 1;

  return QUEUE_DATA(node, SCliBatch, listq);
}
dengyihao's avatar
dengyihao 已提交
1617

dengyihao's avatar
dengyihao 已提交
1618
static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
dengyihao's avatar
dengyihao 已提交
1619 1620
  STrans* pInst = pThrd->pTransInst;

dengyihao's avatar
dengyihao 已提交
1621
  int count = 0;
dengyihao's avatar
dengyihao 已提交
1622 1623
  while (!QUEUE_IS_EMPTY(wq)) {
    queue* h = QUEUE_HEAD(wq);
dengyihao's avatar
dengyihao 已提交
1624
    QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
1625 1626

    SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
dengyihao's avatar
dengyihao 已提交
1627 1628 1629 1630 1631 1632

    if (pMsg->type == Quit) {
      pThrd->stopMsg = pMsg;
      continue;
    }

dengyihao's avatar
dengyihao 已提交
1633
    if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
dengyihao's avatar
dengyihao 已提交
1634 1635 1636 1637 1638 1639 1640
      STransConnCtx* pCtx = pMsg->ctx;

      char*    ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
      uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
      char     key[TSDB_FQDN_LEN + 64] = {0};
      CONN_CONSTRUCT_HASH_KEY(key, ip, port);

dengyihao's avatar
dengyihao 已提交
1641 1642 1643 1644 1645
      // SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key));
      SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key));
      if (ppBatchList == NULL || *ppBatchList == NULL) {
        SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
        QUEUE_INIT(&pBatchList->wq);
dengyihao's avatar
dengyihao 已提交
1646
        pBatchList->connMax = pInst->connLimitNum;
dengyihao's avatar
dengyihao 已提交
1647
        pBatchList->connCnt = 0;
dengyihao's avatar
dengyihao 已提交
1648
        pBatchList->batchLenLimit = pInst->batchSize;
dengyihao's avatar
dengyihao 已提交
1649
        pBatchList->len += 1;
dengyihao's avatar
dengyihao 已提交
1650

1651 1652
        pBatchList->ip = taosStrdup(ip);
        pBatchList->dst = taosStrdup(key);
dengyihao's avatar
dengyihao 已提交
1653 1654
        pBatchList->port = port;

dengyihao's avatar
dengyihao 已提交
1655 1656
        SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
        QUEUE_INIT(&pBatch->wq);
dengyihao's avatar
dengyihao 已提交
1657 1658
        QUEUE_INIT(&pBatch->listq);

dengyihao's avatar
dengyihao 已提交
1659 1660 1661
        QUEUE_PUSH(&pBatch->wq, h);
        pBatch->wLen += 1;
        pBatch->batchSize += pMsg->msg.contLen;
dengyihao's avatar
dengyihao 已提交
1662
        pBatch->pList = pBatchList;
dengyihao's avatar
dengyihao 已提交
1663

dengyihao's avatar
dengyihao 已提交
1664
        QUEUE_PUSH(&pBatchList->wq, &pBatch->listq);
dengyihao's avatar
dengyihao 已提交
1665

dengyihao's avatar
dengyihao 已提交
1666
        taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatchList, sizeof(void*));
dengyihao's avatar
dengyihao 已提交
1667
      } else {
dengyihao's avatar
dengyihao 已提交
1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688
        if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
          SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
          QUEUE_INIT(&pBatch->wq);
          QUEUE_INIT(&pBatch->listq);

          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->wLen += 1;
          pBatch->batchSize = pMsg->msg.contLen;
          pBatch->pList = *ppBatchList;

          QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
          (*ppBatchList)->len += 1;

          continue;
        }

        queue*     hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
        SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
        if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) {
          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->batchSize += pMsg->msg.contLen;
dengyihao's avatar
dengyihao 已提交
1689
          pBatch->wLen += 1;
dengyihao's avatar
dengyihao 已提交
1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702
        } else {
          SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
          QUEUE_INIT(&pBatch->wq);
          QUEUE_INIT(&pBatch->listq);

          QUEUE_PUSH(&pBatch->wq, h);
          pBatch->wLen += 1;
          pBatch->batchSize += pMsg->msg.contLen;
          pBatch->pList = *ppBatchList;

          QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
          (*ppBatchList)->len += 1;
        }
dengyihao's avatar
dengyihao 已提交
1703
      }
dengyihao's avatar
dengyihao 已提交
1704
      continue;
dengyihao's avatar
dengyihao 已提交
1705
    }
dengyihao's avatar
dengyihao 已提交
1706
    (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
dengyihao's avatar
dengyihao 已提交
1707
    count++;
dengyihao's avatar
dengyihao 已提交
1708
  }
dengyihao's avatar
dengyihao 已提交
1709

dengyihao's avatar
dengyihao 已提交
1710
  void** pIter = taosHashIterate(pThrd->batchCache, NULL);
dengyihao's avatar
dengyihao 已提交
1711
  while (pIter != NULL) {
dengyihao's avatar
dengyihao 已提交
1712
    SCliBatchList* batchList = (SCliBatchList*)(*pIter);
dengyihao's avatar
dengyihao 已提交
1713 1714 1715
    SCliBatch*     batch = cliGetHeadFromList(batchList);
    if (batch != NULL) {
      cliHandleBatchReq(batch, pThrd);
dengyihao's avatar
dengyihao 已提交
1716
    }
dengyihao's avatar
dengyihao 已提交
1717
    pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
dengyihao's avatar
dengyihao 已提交
1718 1719
  }

dengyihao's avatar
dengyihao 已提交
1720
  if (count >= 2) {
S
Shengliang Guan 已提交
1721
    tTrace("cli process batch size:%d", count);
dengyihao's avatar
dengyihao 已提交
1722
  }
dengyihao's avatar
dengyihao 已提交
1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735
}

// libuv async callback: takes every message queued on the async item and
// processes it on the client thread.
//
// The item's queue is moved into a local queue under the item's mutex so the
// lock is held only briefly. Depending on the instance's supportBatch setting
// (0 or 1) the messages are handled without or with batching. If a Quit
// message is pending afterwards, cliHandleQuit is invoked.
static void cliAsyncCb(uv_async_t* handle) {
  SAsyncItem* item = handle->data;
  SCliThrd*   pThrd = item->pThrd;
  STrans*     pTransInst = pThrd->pTransInst;

  queue pending;
  taosThreadMutexLock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &pending);
  taosThreadMutexUnlock(&item->mtx);

  int8_t supportBatch = pTransInst->supportBatch;
  switch (supportBatch) {
    case 0:
      cliNoBatchDealReq(&pending, pThrd);
      break;
    case 1:
      cliBatchDealReq(&pending, pThrd);
      break;
    default:
      break;
  }

  if (pThrd->stopMsg != NULL) {
    cliHandleQuit(pThrd->stopMsg, pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770
// libuv prepare callback (handle->data is the SCliThrd): drains the message
// queue of every async item in the thread's async pool and dispatches each
// message to the handler registered for its type. If a Quit message is pending
// afterwards, cliHandleQuit is invoked.
static void cliPrepareCb(uv_prepare_t* handle) {
  SCliThrd* thrd = handle->data;
  tTrace("prepare work start");

  SAsyncPool* pool = thrd->asyncPool;
  for (int i = 0; i < pool->nAsync; i++) {
    uv_async_t* async = &(pool->asyncs[i]);
    SAsyncItem* item = async->data;

    // move the queue out under the mutex so the lock is held only briefly
    queue wq;
    taosThreadMutexLock(&item->mtx);
    QUEUE_MOVE(&item->qmsg, &wq);
    taosThreadMutexUnlock(&item->mtx);

    while (!QUEUE_IS_EMPTY(&wq)) {
      queue* h = QUEUE_HEAD(&wq);
      QUEUE_REMOVE(h);

      SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
      (*cliAsyncHandle[pMsg->type])(pMsg, thrd);
    }
  }
  tTrace("prepare work end");
  if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
}
dengyihao's avatar
dengyihao 已提交
1772

dengyihao's avatar
dengyihao 已提交
1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801
// Cleans up a connection's context and unfinished messages, then either
// destroys the message queue (destroy == true) or just clears it.
void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
  transCtxCleanup(&conn->ctx);
  cliReleaseUnfinishedMsg(conn);

  if (!destroy) {
    transQueueClear(&conn->cliMsgs);
    return;
  }
  transQueueDestroy(&conn->cliMsgs);
}

// Walks the messages queued on conn and delivers a broken-link response
// (built by cliBuildExceptResp) through the instance callback for each one
// that expects a reply. Release messages, no-response requests and
// TDMT_SCH_DROP_TASK messages are skipped. After a response is delivered the
// message's ahandle is cleared.
void cliIteraConnMsgs(SCliConn* conn) {
  SCliThrd* pThrd = conn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  for (int idx = 0; idx < transQueueSize(&conn->cliMsgs); idx++) {
    SCliMsg* cmsg = transQueueGet(&conn->cliMsgs, idx);

    if (cmsg->type == Release) continue;
    if (REQUEST_NO_RESP(&cmsg->msg)) continue;
    if (cmsg->msg.msgType == TDMT_SCH_DROP_TASK) continue;

    STransMsg resp = {0};
    if (cliBuildExceptResp(cmsg, &resp) != -1) {
      pTransInst->cfp(pTransInst->parent, &resp, NULL);
      cmsg->ctx->ahandle = NULL;
    }
  }
}
dengyihao's avatar
dengyihao 已提交
1802 1803 1804
// Checks whether pHead is a release request (release flag set and the message
// consists of the header only) and, if so, processes it.
//
// Returns false, doing nothing, for any other message. For a release request
// the read buffer is cleared and the received message freed; then:
//   - if the request's ahandle is 0 and a Release message is already queued on
//     the connection, the function returns true right away;
//   - otherwise pending messages get their exception responses
//     (cliIteraConnMsgs), the message matched by ahandle is destroyed and the
//     connection goes back to the thread's pool.
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
  bool isReleaseReq = (pHead->release == 1) && ((pHead->msgLen) == sizeof(*pHead));
  if (!isReleaseReq) {
    return false;
  }

  uint64_t ahandle = pHead->ahandle;
  tDebug("ahandle = %" PRIu64 "", ahandle);

  // the macro below stores its result in the local named pMsg
  SCliMsg* pMsg = NULL;
  CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);

  transClearBuffer(&conn->readBuf);
  transFreeMsg(transContFromHead((char*)pHead));

  if (ahandle == 0) {
    for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
      SCliMsg* queued = transQueueGet(&conn->cliMsgs, i);
      if (queued->type == Release) {
        ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req");
        return true;
      }
    }
  }

  cliIteraConnMsgs(conn);

  tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
  destroyCmsg(pMsg);

  addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
  return true;
}

U
ubuntu 已提交
1831
static void* cliWorkThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
1832
  SCliThrd* pThrd = (SCliThrd*)arg;
dengyihao's avatar
dengyihao 已提交
1833
  pThrd->pid = taosGetSelfPthreadId();
G
gccgdb1234 已提交
1834
  setThreadName("trans-cli-work");
dengyihao's avatar
dengyihao 已提交
1835
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
dengyihao's avatar
dengyihao 已提交
1836 1837

  tDebug("thread quit-thread:%08" PRId64, pThrd->pid);
1838
  return NULL;
dengyihao's avatar
dengyihao 已提交
1839 1840
}

U
ubuntu 已提交
1841
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
1842
  SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj));
dengyihao's avatar
dengyihao 已提交
1843

dengyihao's avatar
dengyihao 已提交
1844
  STrans* pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
1845
  memcpy(cli->label, label, TSDB_LABEL_LEN);
dengyihao's avatar
dengyihao 已提交
1846
  cli->numOfThreads = numOfThreads;
dengyihao's avatar
dengyihao 已提交
1847
  cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*));
dengyihao's avatar
dengyihao 已提交
1848 1849

  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
1850
    SCliThrd* pThrd = createThrdObj(shandle);
dengyihao's avatar
dengyihao 已提交
1851 1852 1853 1854 1855
    if (pThrd == NULL) {
      return NULL;
    }

    int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
dengyihao's avatar
dengyihao 已提交
1856
    if (err == 0) {
S
Shengliang Guan 已提交
1857
      tDebug("success to create tranport-cli thread:%d", i);
dengyihao's avatar
dengyihao 已提交
1858
    }
dengyihao's avatar
dengyihao 已提交
1859
    cli->pThreadObj[i] = pThrd;
dengyihao's avatar
dengyihao 已提交
1860
  }
dengyihao's avatar
dengyihao 已提交
1861

dengyihao's avatar
dengyihao 已提交
1862 1863
  return cli;
}
dengyihao's avatar
dengyihao 已提交
1864

dengyihao's avatar
dengyihao 已提交
1865
// Frees the payload buffer attached to `userdata`, if there is one, and
// clears the pointer so the message no longer references freed memory.
static FORCE_INLINE void destroyUserdata(STransMsg* userdata) {
  if (userdata->pCont != NULL) {
    transFreeMsg(userdata->pCont);
    userdata->pCont = NULL;
  }
}
dengyihao's avatar
dengyihao 已提交
1872

dengyihao's avatar
dengyihao 已提交
1873
static FORCE_INLINE void destroyCmsg(void* arg) {
dengyihao's avatar
dengyihao 已提交
1874
  SCliMsg* pMsg = arg;
dengyihao's avatar
dengyihao 已提交
1875 1876 1877
  if (pMsg == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1878

dengyihao's avatar
dengyihao 已提交
1879 1880
  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
wafwerar's avatar
wafwerar 已提交
1881
  taosMemoryFree(pMsg);
dengyihao's avatar
dengyihao 已提交
1882
}
dengyihao's avatar
dengyihao 已提交
1883

dengyihao's avatar
dengyihao 已提交
1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902
// Destroys a delayed client message together with the application handle
// stored in its context.
//
// `param` is an STaskArg* whose param1 is the SCliMsg and param2 the owning
// SCliThrd. When the thread has a destroyAhandleFp callback it is invoked on
// the message's ahandle before the message is released. The STaskArg itself
// is not freed here. NULL `param`, a NULL message or a NULL context are
// tolerated.
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
  if (param == NULL) return;

  STaskArg* arg = param;
  SCliMsg*  pMsg = arg->param1;
  SCliThrd* pThrd = arg->param2;
  if (pMsg == NULL) return;

  tDebug("destroy Ahandle A");
  if (pThrd != NULL && pThrd->destroyAhandleFp != NULL && pMsg->ctx != NULL) {
    tDebug("destroy Ahandle B");
    pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
  }
  tDebug("destroy Ahandle C");

  transDestroyConnCtx(pMsg->ctx);
  destroyUserdata(&pMsg->msg);
  taosMemoryFree(pMsg);
}

dengyihao's avatar
dengyihao 已提交
1903 1904 1905
static SCliThrd* createThrdObj(void* trans) {
  STrans* pTransInst = trans;

dengyihao's avatar
dengyihao 已提交
1906
  SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd));
U
ubuntu 已提交
1907

dengyihao's avatar
dengyihao 已提交
1908
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
1909
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
1910

wafwerar's avatar
wafwerar 已提交
1911
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
1912 1913 1914 1915 1916 1917 1918 1919
  int err = uv_loop_init(pThrd->loop);
  if (err != 0) {
    tError("failed to init uv_loop, reason:%s", uv_err_name(err));
    taosMemoryFree(pThrd->loop);
    taosThreadMutexDestroy(&pThrd->msgMtx);
    taosMemoryFree(pThrd);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
1920 1921 1922 1923 1924
  if (pTransInst->supportBatch) {
    pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 4, pThrd, cliAsyncCb);
  } else {
    pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb);
  }
dengyihao's avatar
dengyihao 已提交
1925
  if (pThrd->asyncPool == NULL) {
dengyihao's avatar
ref log  
dengyihao 已提交
1926
    tError("failed to init async pool");
dengyihao's avatar
dengyihao 已提交
1927 1928 1929 1930 1931 1932
    uv_loop_close(pThrd->loop);
    taosMemoryFree(pThrd->loop);
    taosThreadMutexDestroy(&pThrd->msgMtx);
    taosMemoryFree(pThrd);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
1933 1934 1935 1936

  pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
  uv_prepare_init(pThrd->loop, pThrd->prepare);
  pThrd->prepare->data = pThrd;
dengyihao's avatar
dengyihao 已提交
1937
  // uv_prepare_start(pThrd->prepare, cliPrepareCb);
dengyihao's avatar
dengyihao 已提交
1938

dengyihao's avatar
dengyihao 已提交
1939
  int32_t timerSize = 64;
dengyihao's avatar
dengyihao 已提交
1940 1941 1942 1943 1944 1945 1946
  pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
  for (int i = 0; i < timerSize; i++) {
    uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
    uv_timer_init(pThrd->loop, timer);
    taosArrayPush(pThrd->timerList, &timer);
  }

dengyihao's avatar
dengyihao 已提交
1947
  pThrd->pool = createConnPool(4);
dengyihao's avatar
dengyihao 已提交
1948
  transDQCreate(pThrd->loop, &pThrd->delayQueue);
dengyihao's avatar
dengyihao 已提交
1949

dengyihao's avatar
dengyihao 已提交
1950 1951
  transDQCreate(pThrd->loop, &pThrd->timeoutQueue);

dengyihao's avatar
dengyihao 已提交
1952 1953
  transDQCreate(pThrd->loop, &pThrd->waitConnQueue);

dengyihao's avatar
dengyihao 已提交
1954 1955 1956
  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
  pThrd->pTransInst = trans;

dengyihao's avatar
dengyihao 已提交
1957
  pThrd->destroyAhandleFp = pTransInst->destroyFp;
dengyihao's avatar
dengyihao 已提交
1958
  pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
1959 1960
  pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);

dengyihao's avatar
dengyihao 已提交
1961
  pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
1962

dengyihao's avatar
dengyihao 已提交
1963
  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
1964 1965
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
1966
// Tears down a worker created by createThrdObj. The worker thread is joined
// first; afterwards the libuv resources, pending async/delayed messages,
// timers, caches and finally the object itself are released.
// NULL is accepted and ignored.
static void destroyThrdObj(SCliThrd* pThrd) {
  if (pThrd == NULL) return;

  taosThreadJoin(pThrd->thread, NULL);
  CLI_RELEASE_UV(pThrd->loop);
  taosThreadMutexDestroy(&pThrd->msgMtx);
  TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
  transAsyncPoolDestroy(pThrd->asyncPool);

  // Only the delay queue carries messages that still own an app handle.
  transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
  transDQDestroy(pThrd->timeoutQueue, NULL);
  transDQDestroy(pThrd->waitConnQueue, NULL);

  tDebug("thread destroy %" PRId64, pThrd->pid);

  for (int idx = 0; idx < taosArrayGetSize(pThrd->timerList); idx++) {
    uv_timer_t* spare = taosArrayGetP(pThrd->timerList, idx);
    taosMemoryFree(spare);
  }
  taosArrayDestroy(pThrd->timerList);

  taosMemoryFree(pThrd->prepare);
  taosMemoryFree(pThrd->loop);
  taosHashCleanup(pThrd->fqdn2ipCache);
  taosHashCleanup(pThrd->failFastCache);

  // Each cache entry points at a batch list: drain its batches, then free it.
  for (void** it = taosHashIterate(pThrd->batchCache, NULL); it != NULL;
       it = (void**)taosHashIterate(pThrd->batchCache, it)) {
    SCliBatchList* batches = (SCliBatchList*)(*it);

    while (!QUEUE_IS_EMPTY(&batches->wq)) {
      queue* head = QUEUE_HEAD(&batches->wq);
      QUEUE_REMOVE(head);

      SCliBatch* batch = QUEUE_DATA(head, SCliBatch, listq);
      cliDestroyBatch(batch);
    }

    taosMemoryFree(batches->ip);
    taosMemoryFree(batches->dst);
    taosMemoryFree(batches);
  }
  taosHashCleanup(pThrd->batchCache);

  taosMemoryFree(pThrd);
}
dengyihao's avatar
dengyihao 已提交
2011

dengyihao's avatar
dengyihao 已提交
2012
// Releases a connection context. Only the context struct itself is freed.
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx) {
  taosMemoryFree(ctx);
}
dengyihao's avatar
dengyihao 已提交
2016

dengyihao's avatar
dengyihao 已提交
2017
// Asks the worker `thrd` to stop gracefully: a Quit message is posted to its
// async pool and the pool is then flagged as stopped.
//
// If the message cannot be allocated the function logs and returns without
// touching the pool. If posting fails the message is destroyed instead of
// being leaked; the pool is still flagged as stopped.
void cliSendQuit(SCliThrd* thrd) {
  // cli can stop gracefully
  SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
  if (msg == NULL) {
    tError("failed to alloc quit msg for thread:%08" PRId64, thrd->pid);
    return;
  }

  msg->type = Quit;
  if (0 != transAsyncSend(thrd->asyncPool, &msg->q)) {
    // Same cleanup the other senders in this file perform on a failed post.
    destroyCmsg(msg);
  }
  atomic_store_8(&thrd->asyncPool->stop, 1);
}
dengyihao's avatar
dengyihao 已提交
2024 2025
// uv_walk() callback used while shutting a loop down: every handle that is
// not already closing is closed with cliDestroy as its close callback.
// Non-timer handles have reading stopped first; timers are closed directly.
// `arg` is unused.
void cliWalkCb(uv_handle_t* handle, void* arg) {
  if (uv_is_closing(handle)) {
    return;
  }

  if (uv_handle_get_type(handle) != UV_TIMER) {
    uv_read_stop((uv_stream_t*)handle);
  }
  uv_close(handle, cliDestroy);
}
dengyihao's avatar
dengyihao 已提交
2041

dengyihao's avatar
dengyihao 已提交
2042
// Round-robin selection of a worker index for `pTransInst`.
// Returns -1 when the instance has no threads, otherwise the counter value
// read on entry modulo numOfThreads. The shared counter is advanced without
// locking and wrapped to 0 once it reaches numOfThreads * 2000.
FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) {
  int32_t ticket = pTransInst->index;

  if (pTransInst->numOfThreads == 0) return -1;

  // no lock, and to avoid CPU load imbalance, set limit pTransInst->numOfThreads * 2000;
  if (pTransInst->index++ >= pTransInst->numOfThreads * 2000) {
    pTransInst->index = 0;
  }

  return ticket % pTransInst->numOfThreads;
}
dengyihao's avatar
dengyihao 已提交
2055
static FORCE_INLINE void doDelayTask(void* param) {
dengyihao's avatar
dengyihao 已提交
2056
  STaskArg* arg = param;
dengyihao's avatar
dengyihao 已提交
2057
  cliHandleReq((SCliMsg*)arg->param1, (SCliThrd*)arg->param2);
dengyihao's avatar
dengyihao 已提交
2058 2059
  taosMemoryFree(arg);
}
dengyihao's avatar
dengyihao 已提交
2060

dengyihao's avatar
dengyihao 已提交
2061 2062 2063
static void doCloseIdleConn(void* param) {
  STaskArg* arg = param;
  SCliConn* conn = arg->param1;
dengyihao's avatar
dengyihao 已提交
2064
  tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
dengyihao's avatar
dengyihao 已提交
2065
  conn->task = NULL;
dengyihao's avatar
dengyihao 已提交
2066 2067
  cliDestroyConn(conn, true);
  taosMemoryFree(arg);
dengyihao's avatar
dengyihao 已提交
2068 2069
}

dengyihao's avatar
dengyihao 已提交
2070
// Schedules `pMsg` to be retried on thread `pThrd` after the interval stored
// in its context (retryNextInterval). The retry runs through doDelayTask on
// the thread's delay queue.
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
  STransConnCtx* pCtx = pMsg->ctx;

  if (rpcDebugFlag & DEBUG_DEBUG) {
    char      tbuf[256] = {0};
    STraceId* trace = &pMsg->msg.info.traceId;  // kept under this name for the tGDebug macro
    EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
    tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
            pCtx->retryStep, pCtx->retryNextInterval);
  }

  STaskArg* task = taosMemoryMalloc(sizeof(STaskArg));
  task->param1 = pMsg;
  task->param2 = pThrd;

  transDQSched(pThrd->delayQueue, doDelayTask, task, pCtx->retryNextInterval);
}
dengyihao's avatar
dengyihao 已提交
2088

dengyihao's avatar
dengyihao 已提交
2089
// Stores `newVal` into `*val` unless `*val` already equals `exp`.
// Not atomic: plain load and store.
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
  if (*val == exp) {
    return;
  }
  *val = newVal;
}
dengyihao's avatar
dengyihao 已提交
2094

dengyihao's avatar
dengyihao 已提交
2095
// If the response is flagged as carrying an endpoint set, deserialises it
// from the front of the payload, strips those bytes from the response (the
// remainder is copied into a fresh buffer, or the payload becomes NULL when
// nothing is left) and stores the endpoint set in `dst`.
// Returns true when an endpoint set was extracted; false when `pResp` is
// NULL, has no endpoint set, or the payload does not deserialise.
FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
  if (pResp == NULL) return false;
  if (pResp->info.hasEpSet == 0) return false;

  // rebuild resp msg
  SEpSet parsed;
  if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &parsed) < 0) {
    return false;
  }

  int32_t epsetLen = tSerializeSEpSet(NULL, 0, dst);
  int32_t restLen = pResp->contLen - epsetLen;

  char* rest = NULL;
  if (restLen != 0) {
    rest = rpcMallocCont(restLen);
    memcpy(rest, (char*)pResp->pCont + epsetLen, restLen);
  }
  rpcFreeCont(pResp->pCont);

  pResp->pCont = rest;
  pResp->contLen = restLen;

  epsetAssign(dst, &parsed);
  return true;
}
dengyihao's avatar
dengyihao 已提交
2120
// Decides which endpoint the next retry should use and whether that retry
// may happen immediately.
//
// - Without an expected endpoint set and with an empty payload, or when the
//   payload does not deserialise, or when it deserialises to the endpoint set
//   already in use: move to the next endpoint and return true, unless every
//   endpoint has been tried (epsetRetryCnt >= numOfEps), in which case the
//   endpoint set is left alone and false is returned.
// - When the payload holds a different endpoint set: adopt it and return
//   false.
//
// Returns true for "retry without delay", false for "retry after a delay".
bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
  // Evaluated up front; the only branch that changes the endpoint set beforehand does not consult it.
  const bool exhausted = pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps;

  if (!hasEpSet && pResp->contLen == 0) {
    if (exhausted) return false;
    EPSET_FORWARD_INUSE(&pCtx->epSet);
    return true;
  }

  SEpSet candidate;
  if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &candidate) < 0) {
    tDebug("get invalid epset, epset equal, continue");
    if (exhausted) return false;
    EPSET_FORWARD_INUSE(&pCtx->epSet);
    return true;
  }

  if (!transEpSetIsEqual(&pCtx->epSet, &candidate)) {
    tDebug("epset not equal, retry new epset");
    epsetAssign(&pCtx->epSet, &candidate);
    return false;
  }

  if (exhausted) return false;

  tDebug("epset equal, continue");
  EPSET_FORWARD_INUSE(&pCtx->epSet);
  return true;
}
// Decides whether the request behind `pResp` should be retried and, if so,
// schedules the retry.
//
// Returns false (nothing consumed) when the instance has no retry predicate,
// the predicate rejects the code/message type, or the overall retry deadline
// has passed. Otherwise the response payload is freed, the connection is
// either dereferenced (link-level failures) or returned to the pool, the
// back-off state in the message context is updated, the message is queued
// for the next node and true is returned.
bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
  SCliThrd*      pThrd = pConn->hostThrd;
  STrans*        pTransInst = pThrd->pTransInst;
  STransConnCtx* pCtx = pMsg->ctx;
  int32_t        code = pResp->code;

  if (pTransInst->retry == NULL || !pTransInst->retry(code, pResp->msgType - 1)) {
    return false;
  }

  // First retry of this message: snapshot the instance's retry policy.
  if (!pCtx->retryInit) {
    pCtx->retryMinInterval = pTransInst->retryMinInterval;
    pCtx->retryMaxInterval = pTransInst->retryMaxInterval;
    pCtx->retryStepFactor = pTransInst->retryStepFactor;
    pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet;
    pCtx->retryInitTimestamp = taosGetTimestampMs();
    pCtx->retryNextInterval = pCtx->retryMinInterval;
    pCtx->retryStep = 0;
    pCtx->retryInit = true;
    pCtx->retryCode = TSDB_CODE_SUCCESS;

    // already retry, not use handle specified by app;
    pMsg->msg.info.handle = 0;
  }

  // A max timeout of -1 disables the deadline.
  if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
    return false;
  }

  // code, msgType

  // A:  epset,   leader, not self
  // B:  epset,   not know leader
  // C:  no epset, leader but not serivce

  const bool linkDown = (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL);
  const bool notServing =
      (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
       code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED ||
       code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING);

  bool noDelay = false;
  if (linkDown) {
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
    noDelay = cliResetEpset(pCtx, pResp, false);
    transFreeMsg(pResp->pCont);
    transUnrefCliHandle(pConn);
  } else if (notServing) {
    tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
    noDelay = cliResetEpset(pCtx, pResp, true);
    transFreeMsg(pResp->pCont);
    addConnToPool(pThrd->pool, pConn);
  } else if (code == TSDB_CODE_SYN_RESTORING) {
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
    noDelay = cliResetEpset(pCtx, pResp, true);
    addConnToPool(pThrd->pool, pConn);
    transFreeMsg(pResp->pCont);
  } else {
    tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
    noDelay = cliResetEpset(pCtx, pResp, false);
    addConnToPool(pThrd->pool, pConn);
    transFreeMsg(pResp->pCont);
  }

  if (!linkDown && code != TSDB_CODE_SUCCESS) {
    // save one internal code
    pCtx->retryCode = code;
  }

  if (noDelay) {
    pCtx->retryNextInterval = 0;
    pCtx->epsetRetryCnt++;
  } else {
    // Delayed retry: restart the per-epset counter and grow the interval
    // geometrically, capped at retryMaxInterval.
    pCtx->epsetRetryCnt = 1;
    pCtx->retryStep++;

    int64_t factor = pow(pCtx->retryStepFactor, pCtx->retryStep - 1);
    pCtx->retryNextInterval = factor * pCtx->retryMinInterval;
    if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) {
      pCtx->retryNextInterval = pCtx->retryMaxInterval;
    }
  }

  pMsg->sent = 0;
  cliSchedMsgToNextNode(pMsg, pThrd);
  return true;
}
dengyihao's avatar
dengyihao 已提交
2266
// Delivers a response to the application, unless the request is going to be
// retried.
//
// Returns -1 when cliGenRetryRule took the message over for a retry (the
// application is not notified), 0 once the response has been handed over:
// either copied into the waiting synchronous caller's buffer and signalled
// through its semaphore, or passed to the instance callback `cfp`, together
// with the endpoint set when one was extracted from the response or
// cliIsEpsetUpdated reports a change.
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
  SCliThrd* pThrd = pConn->hostThrd;
  STrans*   pTransInst = pThrd->pTransInst;

  if (pMsg == NULL || pMsg->ctx == NULL) {
    tTrace("%s conn %p handle resp", pTransInst->label, pConn);
    pTransInst->cfp(pTransInst->parent, pResp, NULL);
    return 0;
  }

  STransConnCtx* pCtx = pMsg->ctx;

  if (cliGenRetryRule(pConn, pResp, pMsg)) {
    return -1;
  }

  if (pCtx->retryCode != TSDB_CODE_SUCCESS) {
    // return internal code app
    const int32_t transportCode = pResp->code;
    if (transportCode == TSDB_CODE_RPC_NETWORK_UNAVAIL || transportCode == TSDB_CODE_RPC_BROKEN_LINK ||
        transportCode == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
      pResp->code = pCtx->retryCode;
    }
  }

  // check whole vnodes is offline on this vgroup
  const bool triedAll = pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps || pCtx->retryStep > 0;
  if (triedAll) {
    if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
      pResp->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
    } else if (pResp->code == TSDB_CODE_RPC_BROKEN_LINK) {
      pResp->code = TSDB_CODE_RPC_SOMENODE_BROKEN_LINK;
    }
  }

  STraceId* trace = &pResp->info.traceId;  // kept under this name for the tGTrace macro
  bool      hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
  if (hasEpSet && (rpcDebugFlag & DEBUG_TRACE)) {
    char tbuf[256] = {0};
    EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
    tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
  }

  if (pCtx->pSem != NULL) {
    // Synchronous caller: copy the response out (if it still wants one) and wake it.
    tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
    if (pCtx->pRsp != NULL) {
      memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
    } else {
      tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
    }
    tsem_post(pCtx->pSem);
    pCtx->pRsp = NULL;
    return 0;
  }

  tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
  // cliIsEpsetUpdated is only consulted when no endpoint set came with the response.
  SEpSet* pUpdated = (hasEpSet || cliIsEpsetUpdated(pResp->code, pCtx)) ? &pCtx->epSet : NULL;
  pTransInst->cfp(pTransInst->parent, pResp, pUpdated);
  return 0;
}
U
ubuntu 已提交
2334 2335

void transCloseClient(void* arg) {
U
ubuntu 已提交
2336
  SCliObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
2337
  for (int i = 0; i < cli->numOfThreads; i++) {
U
ubuntu 已提交
2338
    cliSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
2339
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
2340
  }
wafwerar's avatar
wafwerar 已提交
2341 2342
  taosMemoryFree(cli->pThreadObj);
  taosMemoryFree(cli);
dengyihao's avatar
dengyihao 已提交
2343
}
dengyihao's avatar
dengyihao 已提交
2344 2345 2346 2347 2348
// Takes one reference on the client connection `handle` (an SCliConn*).
// NULL is accepted and ignored.
void transRefCliHandle(void* handle) {
  if (handle != NULL) {
    int ref = T_REF_INC((SCliConn*)handle);
    tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
    UNUSED(ref);
  }
}
// Drops one reference on the client connection `handle` (an SCliConn*) and
// destroys the connection when the count reaches zero.
// NULL is accepted and ignored.
void transUnrefCliHandle(void* handle) {
  if (handle != NULL) {
    int ref = T_REF_DEC((SCliConn*)handle);
    tTrace("%s conn %p ref:%d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
    if (ref == 0) {
      cliDestroyConn((SCliConn*)handle, true);
    }
  }
}
dengyihao's avatar
dengyihao 已提交
2362
// Resolves the worker thread bound to the external handle `handle`.
//
// When the handle has no thread yet and `trans` is given, one is picked
// round-robin from the instance's workers and stored on the handle.
// Returns the thread, or NULL when the handle cannot be acquired, no worker
// is available, or the handle is unbound and `trans` is NULL.
// The reference taken on the handle is released on every path.
static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t handle) {
  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
  if (exh == NULL) {
    return NULL;
  }

  if (exh->pThrd == NULL && trans != NULL) {
    int idx = cliRBChoseIdx(trans);
    if (idx < 0) {
      // Give the reference back before bailing out; otherwise it is leaked.
      transReleaseExHandle(transGetRefMgt(), handle);
      return NULL;
    }
    exh->pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
  }

  SCliThrd* pThrd = exh->pThrd;
  transReleaseExHandle(transGetRefMgt(), handle);
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
2379
// Picks the worker thread for a request: the thread bound to `handle` when
// one is given, otherwise the next worker in round-robin order.
// Returns NULL when no worker can be determined.
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
  if (handle != 0) {
    return transGetWorkThrdFromHandle(trans, handle);
  }

  int idx = cliRBChoseIdx(trans);
  if (idx < 0) {
    return NULL;
  }
  return ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
}
dengyihao's avatar
dengyihao 已提交
2388
int transReleaseCliHandle(void* handle) {
dengyihao's avatar
dengyihao 已提交
2389 2390 2391
  int  idx = -1;
  bool valid = false;

dengyihao's avatar
dengyihao 已提交
2392
  SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
dengyihao's avatar
dengyihao 已提交
2393
  if (pThrd == NULL) {
dengyihao's avatar
dengyihao 已提交
2394
    return -1;
dengyihao's avatar
dengyihao 已提交
2395
  }
dengyihao's avatar
dengyihao 已提交
2396

dengyihao's avatar
dengyihao 已提交
2397
  STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527};
dengyihao's avatar
rm code  
dengyihao 已提交
2398
  TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
dengyihao's avatar
dengyihao 已提交
2399

dengyihao's avatar
dengyihao 已提交
2400 2401 2402
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  pCtx->ahandle = tmsg.info.ahandle;

dengyihao's avatar
dengyihao 已提交
2403
  SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
2404
  cmsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
2405
  cmsg->st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
2406
  cmsg->type = Release;
dengyihao's avatar
dengyihao 已提交
2407
  cmsg->ctx = pCtx;
dengyihao's avatar
dengyihao 已提交
2408

dengyihao's avatar
dengyihao 已提交
2409 2410 2411
  STraceId* trace = &tmsg.info.traceId;
  tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);

dengyihao's avatar
dengyihao 已提交
2412
  if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
dengyihao's avatar
dengyihao 已提交
2413
    destroyCmsg(cmsg);
dengyihao's avatar
dengyihao 已提交
2414 2415
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2416
  return 0;
dengyihao's avatar
dengyihao 已提交
2417
}
dengyihao's avatar
dengyihao 已提交
2418

dengyihao's avatar
dengyihao 已提交
2419
// Queues an asynchronous request on a worker thread of the transport
// instance identified by `shandle`.
//
// pEpSet : endpoint set to send to; copied into the request context.
// pReq   : request message; copied by value, so the queued message takes
//          over pReq->pCont.
// ctx    : optional application context, copied when non-NULL.
//
// Returns 0 when queued. On failure the request payload is freed and the
// instance reference released: -1 when the instance cannot be acquired, an
// allocation fails or the message cannot be posted;
// TSDB_CODE_RPC_BROKEN_LINK when no worker thread is available.
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return -1;
  }

  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
  if (pThrd == NULL) {
    transFreeMsg(pReq->pCont);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return TSDB_CODE_RPC_BROKEN_LINK;
  }

  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());

  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  if (pCtx == NULL) {
    transFreeMsg(pReq->pCont);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return -1;
  }
  epsetAssign(&pCtx->epSet, pEpSet);
  epsetAssign(&pCtx->origEpSet, pEpSet);

  pCtx->ahandle = pReq->info.ahandle;
  pCtx->msgType = pReq->msgType;

  if (ctx != NULL) pCtx->appCtx = *ctx;

  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
  if (cliMsg == NULL) {
    taosMemoryFree(pCtx);
    transFreeMsg(pReq->pCont);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return -1;
  }
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();
  cliMsg->type = Normal;
  cliMsg->refId = (int64_t)shandle;

  STraceId* trace = &pReq->info.traceId;  // kept under this name for the tGDebug macro
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
  if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) {
    // destroyCmsg releases the context and the payload now owned by cliMsg.
    destroyCmsg(cliMsg);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return -1;
  }
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
2461

dengyihao's avatar
dengyihao 已提交
2462
/*
 * Send a request and wait synchronously until it has been completed.
 *
 * shandle: reference id of the transport instance, carried in a pointer-sized value.
 * pEpSet:  endpoint set; copied into the connection context.
 * pReq:    request to send; copied by value into the queued message. pReq->pCont is
 *          released here on every failure that happens before the message is handed
 *          to transAsyncSend().
 * pRsp:    stored in the connection context (pCtx->pRsp).
 *          NOTE(review): presumably filled in by the worker before it posts the
 *          semaphore - confirm against the response path.
 *
 * Returns 0 after the wait on the semaphore finishes, -1 if the instance cannot be
 * acquired or memory cannot be allocated, TSDB_CODE_RPC_BROKEN_LINK if no worker
 * thread could be selected, or the non-zero result of transAsyncSend().
 */
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return -1;
  }

  SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
  if (pThrd == NULL) {
    transFreeMsg(pReq->pCont);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return TSDB_CODE_RPC_BROKEN_LINK;
  }

  // Allocate everything up front so that an out-of-memory condition is detected
  // before any of the objects is dereferenced.
  tsem_t*        sem = taosMemoryCalloc(1, sizeof(tsem_t));
  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  SCliMsg*       cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
  if (sem == NULL || pCtx == NULL || cliMsg == NULL) {
    if (cliMsg != NULL) {
      taosMemoryFree(cliMsg);
    }
    if (pCtx != NULL) {
      taosMemoryFree(pCtx);
    }
    if (sem != NULL) {
      taosMemoryFree(sem);
    }
    // Same contract as the earlier failure paths: the request content is released here.
    transFreeMsg(pReq->pCont);
    transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return -1;
  }

  tsem_init(sem, 0, 0);

  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());

  epsetAssign(&pCtx->epSet, pEpSet);
  epsetAssign(&pCtx->origEpSet, pEpSet);
  pCtx->ahandle = pReq->info.ahandle;
  pCtx->msgType = pReq->msgType;
  pCtx->pSem = sem;
  pCtx->pRsp = pRsp;

  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();
  cliMsg->type = Normal;
  cliMsg->refId = (int64_t)shandle;

  // The local must be named `trace`: it is kept exactly as in the original code
  // next to the tGDebug() call.
  STraceId* trace = &pReq->info.traceId;
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);

  int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
  if (ret != 0) {
    // The message was never queued, so it is destroyed here and nobody will post the semaphore.
    destroyCmsg(cliMsg);
  } else {
    tsem_wait(sem);
  }

  tsem_destroy(sem);
  taosMemoryFree(sem);
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
  return ret;
}
dengyihao's avatar
dengyihao 已提交
2513 2514 2515
/*
 *
 **/
dengyihao's avatar
dengyihao 已提交
2516
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
dengyihao's avatar
dengyihao 已提交
2517
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2518 2519 2520
  if (pTransInst == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
2521 2522 2523

  SCvtAddr cvtAddr = {0};
  if (ip != NULL && fqdn != NULL) {
dengyihao's avatar
dengyihao 已提交
2524 2525
    tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip));
    tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn));
dengyihao's avatar
dengyihao 已提交
2526 2527
    cvtAddr.cvt = true;
  }
dengyihao's avatar
dengyihao 已提交
2528 2529
  for (int i = 0; i < pTransInst->numOfThreads; i++) {
    STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
dengyihao's avatar
dengyihao 已提交
2530
    pCtx->cvtAddr = cvtAddr;
dengyihao's avatar
dengyihao 已提交
2531 2532 2533 2534

    SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
    cliMsg->ctx = pCtx;
    cliMsg->type = Update;
dengyihao's avatar
dengyihao 已提交
2535
    cliMsg->refId = (int64_t)shandle;
dengyihao's avatar
dengyihao 已提交
2536

dengyihao's avatar
dengyihao 已提交
2537
    SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
S
Shengliang Guan 已提交
2538
    tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid);
dengyihao's avatar
dengyihao 已提交
2539

dengyihao's avatar
dengyihao 已提交
2540 2541
    if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) {
      destroyCmsg(cliMsg);
dengyihao's avatar
dengyihao 已提交
2542
      transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2543 2544
      return -1;
    }
dengyihao's avatar
dengyihao 已提交
2545
  }
dengyihao's avatar
dengyihao 已提交
2546
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
dengyihao's avatar
dengyihao 已提交
2547
  return 0;
dengyihao's avatar
dengyihao 已提交
2548
}
dengyihao's avatar
dengyihao 已提交
2549 2550 2551 2552 2553

int64_t transAllocHandle() {
  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  exh->refId = transAddExHandle(transGetRefMgt(), exh);
  tDebug("pre alloc refId %" PRId64 "", exh->refId);
dengyihao's avatar
dengyihao 已提交
2554

dengyihao's avatar
dengyihao 已提交
2555 2556
  return exh->refId;
}