/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifdef USE_UV
#include "transComm.h"

typedef struct SCliConn {
dengyihao's avatar
dengyihao 已提交
19
  T_REF_DECLARE()
dengyihao's avatar
dengyihao 已提交
20 21
  uv_connect_t connReq;
  uv_stream_t* stream;
dengyihao's avatar
dengyihao 已提交
22
  uv_write_t   writeReq;
dengyihao's avatar
dengyihao 已提交
23

dengyihao's avatar
dengyihao 已提交
24 25 26 27 28 29 30 31 32
  void* hostThrd;
  int   hThrdIdx;

  SConnBuffer readBuf;
  STransQueue cliMsgs;
  queue       conn;
  uint64_t    expireTime;

  STransCtx  ctx;
dengyihao's avatar
dengyihao 已提交
33 34
  bool       broken;  // link broken or not
  ConnStatus status;  //
dengyihao's avatar
dengyihao 已提交
35 36 37 38

  char*    ip;
  uint32_t port;

dengyihao's avatar
dengyihao 已提交
39
  // debug and log info
dengyihao's avatar
dengyihao 已提交
40
  struct sockaddr_in addr;
dengyihao's avatar
dengyihao 已提交
41
  struct sockaddr_in locaddr;
dengyihao's avatar
dengyihao 已提交
42
} SCliConn;
dengyihao's avatar
dengyihao 已提交
43

dengyihao's avatar
dengyihao 已提交
44
typedef struct SCliMsg {
dengyihao's avatar
dengyihao 已提交
45
  STransConnCtx* ctx;
dengyihao's avatar
formate  
dengyihao 已提交
46
  STransMsg      msg;
dengyihao's avatar
dengyihao 已提交
47 48
  queue          q;
  uint64_t       st;
dengyihao's avatar
dengyihao 已提交
49
  STransMsgType  type;
dengyihao's avatar
dengyihao 已提交
50
  int            sent;  //(0: no send, 1: alread sent)
dengyihao's avatar
dengyihao 已提交
51 52 53
} SCliMsg;

typedef struct SCliThrdObj {
dengyihao's avatar
dengyihao 已提交
54
  TdThread    thread;
dengyihao's avatar
dengyihao 已提交
55
  uv_loop_t*  loop;
dengyihao's avatar
dengyihao 已提交
56
  SAsyncPool* asyncPool;
dengyihao's avatar
dengyihao 已提交
57
  uv_timer_t  timer;
dengyihao's avatar
dengyihao 已提交
58 59 60
  void*       pool;  // conn pool

  // msg queue
dengyihao's avatar
dengyihao 已提交
61
  queue         msg;
wafwerar's avatar
wafwerar 已提交
62
  TdThreadMutex msgMtx;
dengyihao's avatar
dengyihao 已提交
63 64 65
  SDelayQueue*  delayQueue;
  uint64_t      nextTimeout;  // next timeout
  void*         pTransInst;   //
dengyihao's avatar
dengyihao 已提交
66 67 68 69 70

  bool   useDefaultEpSet;
  SEpSet defaultEpSet;

  bool quit;
dengyihao's avatar
dengyihao 已提交
71 72
} SCliThrdObj;

U
ubuntu 已提交
73
typedef struct SCliObj {
dengyihao's avatar
dengyihao 已提交
74 75 76 77
  char          label[TSDB_LABEL_LEN];
  int32_t       index;
  int           numOfThreads;
  SCliThrdObj** pThreadObj;
U
ubuntu 已提交
78
} SCliObj;
dengyihao's avatar
dengyihao 已提交
79

dengyihao's avatar
dengyihao 已提交
80 81 82 83
// Value type stored in the conn pool hash: the list of idle conns for one "ip:port" key.
typedef struct SConnList {
  queue conn;  // list head; SCliConn entries are linked through their own `conn` field
} SConnList;

dengyihao's avatar
dengyihao 已提交
84
// conn pool
dengyihao's avatar
dengyihao 已提交
85
// add expire timeout and capacity limit
dengyihao's avatar
dengyihao 已提交
86
static void*     createConnPool(int size);
87
static void*     destroyConnPool(void* pool);
dengyihao's avatar
dengyihao 已提交
88
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
dengyihao's avatar
dengyihao 已提交
89
static void      addConnToPool(void* pool, SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
90

dengyihao's avatar
dengyihao 已提交
91
// register timer in each thread to clear expire conn
U
ubuntu 已提交
92 93
static void cliTimeoutCb(uv_timer_t* handle);
// alloc buf for recv
dengyihao's avatar
dengyihao 已提交
94
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
U
ubuntu 已提交
95 96
// callback after  read nbytes from socket
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
dengyihao's avatar
dengyihao 已提交
97
// callback after write data to socket
U
ubuntu 已提交
98
static void cliSendCb(uv_write_t* req, int status);
dengyihao's avatar
dengyihao 已提交
99
// callback after conn  to server
U
ubuntu 已提交
100 101
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
dengyihao's avatar
dengyihao 已提交
102

dengyihao's avatar
dengyihao 已提交
103
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
dengyihao's avatar
dengyihao 已提交
104

U
ubuntu 已提交
105 106 107
static SCliConn* cliCreateConn(SCliThrdObj* thrd);
static void      cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void      cliDestroy(uv_handle_t* handle);
dengyihao's avatar
dengyihao 已提交
108
static void      cliSend(SCliConn* pConn);
dengyihao's avatar
dengyihao 已提交
109

dengyihao's avatar
dengyihao 已提交
110 111 112 113
/*
 * set TCP connection timeout per-socket level
 */
static int cliCreateSocket();
dengyihao's avatar
dengyihao 已提交
114
// process data read from server, add decompress etc later
U
ubuntu 已提交
115
static void cliHandleResp(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
116
// handle except about conn
U
ubuntu 已提交
117
static void cliHandleExcept(SCliConn* conn);
dengyihao's avatar
dengyihao 已提交
118

119
// handle req from app
U
ubuntu 已提交
120 121
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
dengyihao's avatar
dengyihao 已提交
122
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd);
dengyihao's avatar
dengyihao 已提交
123 124 125
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
                                                                      cliHandleUpdate};
dengyihao's avatar
dengyihao 已提交
126

U
ubuntu 已提交
127
static void cliSendQuit(SCliThrdObj* thrd);
U
ubuntu 已提交
128
static void destroyUserdata(STransMsg* userdata);
dengyihao's avatar
dengyihao 已提交
129

U
ubuntu 已提交
130
static int cliRBChoseIdx(STrans* pTransInst);
dengyihao's avatar
dengyihao 已提交
131

dengyihao's avatar
dengyihao 已提交
132 133
static void destroyCmsg(SCliMsg* cmsg);
static void transDestroyConnCtx(STransConnCtx* ctx);
dengyihao's avatar
dengyihao 已提交
134 135 136
// thread obj
static SCliThrdObj* createThrdObj();
static void         destroyThrdObj(SCliThrdObj* pThrd);
dengyihao's avatar
dengyihao 已提交
137

dengyihao's avatar
dengyihao 已提交
138 139 140 141 142 143 144 145 146
static void cliWalkCb(uv_handle_t* handle, void* arg);

// Tear down a uv loop: visit every handle with cliWalkCb, run the loop once more
// in default mode, then close it.
// No trailing ';' after while (0), so the macro behaves as a single statement
// (safe inside if/else); call sites supply the semicolon.
#define CLI_RELEASE_UV(loop)          \
  do {                                \
    uv_walk((loop), cliWalkCb, NULL); \
    uv_run((loop), UV_RUN_DEFAULT);   \
    uv_loop_close(loop);              \
  } while (0)

dengyihao's avatar
dengyihao 已提交
147 148 149 150 151 152
// Build the pool hash key "ip:port" into `key`.
// `key` must be a char ARRAY (not a pointer): its capacity is taken with sizeof.
// snprintf may cause performance problem
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port)                \
  do {                                                        \
    snprintf((key), sizeof(key), "%s:%d", (ip), (int)(port)); \
  } while (0)

dengyihao's avatar
dengyihao 已提交
153
// Index of the thread hosting `conn`, or -1 when conn is NULL.
#define CONN_HOST_THREAD_INDEX(conn) ((conn) ? ((SCliConn*)(conn))->hThrdIdx : -1)
// Scale `para` by 10000; callers add the result to a millisecond timestamp.
#define CONN_PERSIST_TIME(para)      ((para) * 1000 * 10)
// Thread object hosting `conn`, or NULL when conn is NULL.
#define CONN_GET_HOST_THREAD(conn)   ((conn) ? ((SCliConn*)(conn))->hostThrd : NULL)
// Label of the transport instance owning `conn` (conn must be non-NULL).
#define CONN_GET_INST_LABEL(conn)    (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
dengyihao's avatar
dengyihao 已提交
157 158 159
/*
 * Handle a header-only packet whose `release` flag is set.
 *
 * When `head` matches, the macro removes the queued message with the same
 * ahandle, marks the conn ConnRelease, drops the read buffer and the packet
 * body, releases one reference if more than one is held, puts the conn back
 * into its thread's pool, and RETURNS from the enclosing function.
 *
 * Requirements on the call site: a variable `SCliMsg* pMsg` must be in scope
 * and the enclosing function must return void.
 */
#define CONN_SHOULD_RELEASE(conn, head)                                                      \
  do {                                                                                       \
    if ((head)->release == 1 && (head)->msgLen == sizeof(*(head))) {                         \
      uint64_t ahandle = (head)->ahandle;                                                    \
      CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);                                             \
      (conn)->status = ConnRelease;                                                          \
      transClearBuffer(&(conn)->readBuf);                                                    \
      transFreeMsg(transContFromHead((char*)(head)));                                        \
      tDebug("cli conn %p receive release request, ref: %d", (conn), T_REF_VAL_GET(conn));   \
      if (T_REF_VAL_GET(conn) > 1) {                                                         \
        transUnrefCliHandle(conn);                                                           \
      }                                                                                      \
      destroyCmsg(pMsg);                                                                     \
      addConnToPool(((SCliThrdObj*)(conn)->hostThrd)->pool, (conn));                         \
      return;                                                                                \
    }                                                                                        \
  } while (0)

dengyihao's avatar
dengyihao 已提交
175 176 177 178 179
/*
 * Find the queued message whose ctx->ahandle equals `handleVal`, remove it from
 * conn->cliMsgs and leave it in `pMsg`; `pMsg` is set to NULL when none matches.
 *
 * Requires a variable `SCliMsg* pMsg` in the enclosing scope.
 * The second parameter is deliberately NOT named `ahandle`: a parameter with
 * that name would also be substituted into the member access `ctx->ahandle`.
 */
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, handleVal)                                           \
  do {                                                                                        \
    int i = 0, sz = transQueueSize(&(conn)->cliMsgs);                                         \
    for (; i < sz; i++) {                                                                     \
      pMsg = transQueueGet(&(conn)->cliMsgs, i);                                              \
      if (pMsg != NULL && pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == (handleVal)) { \
        break;                                                                                \
      }                                                                                       \
    }                                                                                         \
    if (i == sz) {                                                                            \
      pMsg = NULL;                                                                            \
    } else {                                                                                  \
      pMsg = transQueueRm(&(conn)->cliMsgs, i);                                               \
    }                                                                                         \
  } while (0)
U
ubuntu 已提交
190 191 192 193 194 195 196 197 198 199 200 201 202
/*
 * Scan conn->cliMsgs from the front and leave the first message whose `sent`
 * flag is 0 in `pCliMsg`. Jumps to the label `_RETURN` when the scan runs off
 * the end of the queue without finding one.
 *
 * Requires `SCliMsg* pCliMsg` and a `_RETURN:` label in the enclosing function.
 */
#define CONN_GET_NEXT_SENDMSG(conn)                     \
  do {                                                  \
    int idx = 0;                                        \
    for (;;) {                                          \
      pCliMsg = transQueueGet(&conn->cliMsgs, idx++);   \
      if (pCliMsg == NULL) {                            \
        goto _RETURN;                                   \
      }                                                 \
      if (0 == pCliMsg->sent) {                         \
        break;                                          \
      }                                                 \
    }                                                   \
  } while (0)
dengyihao's avatar
dengyihao 已提交
203

dengyihao's avatar
dengyihao 已提交
204 205 206 207 208
// Return from the enclosing (void) function when the thread's quit flag is set.
#define CONN_HANDLE_THREAD_QUIT(thrd) \
  do {                                \
    if ((thrd)->quit) {               \
      return;                         \
    }                                 \
  } while (0)

// If the conn is flagged broken, run the exception path and return from the
// enclosing (void) function.
#define CONN_HANDLE_BROKEN(conn) \
  do {                           \
    if ((conn)->broken) {        \
      cliHandleExcept(conn);     \
      return;                    \
    }                            \
  } while (0)
dengyihao's avatar
dengyihao 已提交
218

dengyihao's avatar
formate  
dengyihao 已提交
219 220
// Move a ConnNormal conn to ConnAcquire and take one extra reference on it.
// Conns in any other state are left untouched.
#define CONN_SET_PERSIST_BY_APP(conn)   \
  do {                                  \
    if ((conn)->status == ConnNormal) { \
      (conn)->status = ConnAcquire;     \
      transRefCliHandle(conn);          \
    }                                   \
  } while (0)
226

dengyihao's avatar
dengyihao 已提交
227 228 229 230
// True when the conn is ConnNormal or ConnInPool and exactly one reference is held.
#define CONN_NO_PERSIST_BY_APP(conn) \
  (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
// True when the conn is ConnRelease or ConnInPool and exactly one reference is held.
#define CONN_RELEASE_BY_SERVER(conn) \
  (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
231

S
Shengliang Guan 已提交
232 233
// Predicates on a request: `msg` points to a message with an `info` member,
// `cmsg` points to an SCliMsg.
#define REQUEST_NO_RESP(msg)         ((msg)->info.noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg)   ((msg)->info.persistHandle == 1)
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
dengyihao's avatar
dengyihao 已提交
235

dengyihao's avatar
dengyihao 已提交
236
// fqdn / port of the endpoint currently selected by epSet->inUse.
#define EPSET_GET_INUSE_IP(epSet)   ((epSet)->eps[(epSet)->inUse].fqdn)
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)

U
ubuntu 已提交
239
// Forward declaration; the definition is outside this part of the file.
// NOTE(review): signature has the shape of a thread start routine -- confirm at the definition.
static void* cliWorkThread(void* arg);
dengyihao's avatar
dengyihao 已提交
240

dengyihao's avatar
dengyihao 已提交
241
/*
 * Send the next queued-but-unsent message on `conn`, if there is one.
 *
 * Returns true when cliSend() was invoked for a pending message, false when
 * the queue is empty or every queued message has already been sent.
 *
 * Callers use the result to decide whether the conn is idle. Reporting false
 * after a send would let them return the conn to the pool, and addConnToPool()
 * clears conn->cliMsgs, discarding the message that was just sent.
 */
bool cliMaySendCachedMsg(SCliConn* conn) {
  if (transQueueEmpty(&conn->cliMsgs)) {
    return false;
  }

  SCliMsg* pCliMsg = NULL;
  // jumps to _RETURN when no unsent message is left in the queue
  CONN_GET_NEXT_SENDMSG(conn);
  cliSend(conn);
  return true;

_RETURN:
  return false;
}
dengyihao's avatar
formate  
dengyihao 已提交
251
/*
 * Process one complete packet sitting in conn->readBuf.
 *
 * Steps:
 *   1. convert the header's code/msgLen from network to host byte order;
 *   2. let CONN_SHOULD_RELEASE consume a release packet (it returns from here);
 *   3. pair the response with a queued request: by queue order when the conn is
 *      not held by the app, by the header's ahandle otherwise;
 *   4. hand the response to cliAppCb(); a non-zero result means the request is
 *      being retried elsewhere, so nothing more is done here;
 *   5. send the next cached message, or return the idle conn to the pool and
 *      keep reading.
 */
void cliHandleResp(SCliConn* conn) {
  SCliThrdObj* pThrd = conn->hostThrd;
  STrans*      pTransInst = pThrd->pTransInst;

  STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
  // network -> host byte order
  pHead->code = ntohl(pHead->code);
  pHead->msgLen = ntohl(pHead->msgLen);

  STransMsg transMsg = {0};
  transMsg.contLen = transContLenFromMsg(pHead->msgLen);
  transMsg.pCont = transContFromHead((char*)pHead);
  transMsg.code = pHead->code;
  transMsg.msgType = pHead->msgType;
  transMsg.info.ahandle = NULL;

  SCliMsg*       pMsg = NULL;
  STransConnCtx* pCtx = NULL;
  // may return from this function (release packet)
  CONN_SHOULD_RELEASE(conn, pHead);

  if (CONN_NO_PERSIST_BY_APP(conn)) {
    pMsg = transQueuePop(&conn->cliMsgs);
    pCtx = pMsg ? pMsg->ctx : NULL;
    // NOTE(review): this condition contradicts the enclosing one unless transQueuePop()
    // changes the conn's status or reference count -- confirm whether it is reachable.
    if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
      transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
      if (transMsg.info.ahandle == NULL) {
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
      }
      tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.info.ahandle);
    } else {
      transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
      tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.info.ahandle);
    }
  } else {
    uint64_t ahandle = (uint64_t)pHead->ahandle;
    // sets pMsg to the matching queued request (removed from the queue) or NULL
    CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
    if (pMsg == NULL) {
      transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
      tDebug("cli conn %p construct ahandle %p by %s, persist: 1", conn, transMsg.info.ahandle,
             TMSG_INFO(transMsg.msgType));
      if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
        transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
        tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.info.ahandle);
      }
    } else {
      pCtx = pMsg ? pMsg->ctx : NULL;
      transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
      tDebug("cli conn %p get ahandle %p, persist: 1", conn, transMsg.info.ahandle);
    }
  }
  // the buffer's memory is now owned by transMsg.pCont
  transClearBuffer(&conn->readBuf);

  if (!CONN_NO_PERSIST_BY_APP(conn)) {
    transMsg.info.handle = conn;
    tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
  }

  tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
         TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
         taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);

  if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
    tTrace("except, server continue send while cli ignore it");
    // transUnrefCliHandle(conn);
    return;
  }
  if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
    tTrace("except, server continue send while cli ignore it");
    // transUnrefCliHandle(conn);
    return;
  }

  int ret = cliAppCb(conn, &transMsg, pMsg);
  if (ret != 0) {
    tTrace("try to send req to next node");
    return;
  }
  destroyCmsg(pMsg);

  if (cliMaySendCachedMsg(conn) == true) {
    return;
  }

  if (CONN_NO_PERSIST_BY_APP(conn)) {
    addConnToPool(pThrd->pool, conn);
  }

  uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
  // start thread's timer of conn pool if not active
  if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
    // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
  }
}
U
ubuntu 已提交
345 346

/*
 * Fail every request queued on `pConn` with TSDB_CODE_RPC_NETWORK_UNAVAIL and
 * then drop one reference on the conn.
 *
 * - Empty queue on a broken conn that the app does not hold: just unref.
 * - Otherwise pop requests one by one and report each through cliAppCb().
 *   When the queue is empty but the app holds the conn, an ahandle is taken
 *   from the conn's ctx so the app still gets notified.
 * - A non-zero result from cliAppCb() stops the processing immediately, without
 *   the final unref.
 */
void cliHandleExcept(SCliConn* pConn) {
  if (transQueueEmpty(&pConn->cliMsgs)) {
    if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
      tTrace("%s cli conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
      transUnrefCliHandle(pConn);
      return;
    }
  }

  bool once = false;
  do {
    SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);
    if (pMsg == NULL && once) {
      break;
    }
    STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;

    STransMsg transMsg = {0};
    transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
    transMsg.info.ahandle = NULL;
    transMsg.info.handle = pConn;

    if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
      transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
      tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
             TMSG_INFO(transMsg.msgType));
      if (transMsg.info.ahandle == NULL) {
        transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
        tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
               transMsg.info.ahandle);
      }
    } else {
      transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
    }

    if (pCtx == NULL || pCtx->pSem == NULL) {
      if (transMsg.info.ahandle == NULL) {
        // nobody to notify for this entry; `continue` re-evaluates the loop condition
        once = true;
        continue;
      }
    }
    int ret = cliAppCb(pConn, &transMsg, pMsg);
    if (ret != 0) {
      tTrace("try to send req to next node");
      return;
    }
    destroyCmsg(pMsg);
    tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
  } while (!transQueueEmpty(&pConn->cliMsgs));

  transUnrefCliHandle(pConn);
}
dengyihao's avatar
dengyihao 已提交
400

U
ubuntu 已提交
401
/*
 * Timer callback: drop pooled conns whose expireTime is older than the thread's
 * stored nextTimeout, then compute a new nextTimeout and re-arm the timer.
 *
 * Each per-key list is scanned from the head and the scan of that list stops at
 * the first conn that has not expired.
 */
void cliTimeoutCb(uv_timer_t* handle) {
  SCliThrdObj* pThrd = handle->data;
  STrans*      pTransInst = pThrd->pTransInst;
  // same unsigned type as SCliConn.expireTime, so the comparison below has no sign conversion
  uint64_t cutoff = pThrd->nextTimeout;
  tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pTransInst->label);

  SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
  while (p != NULL) {
    while (!QUEUE_IS_EMPTY(&p->conn)) {
      queue*    h = QUEUE_HEAD(&p->conn);
      SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
      if (c->expireTime >= cutoff) {
        break;
      }
      QUEUE_REMOVE(h);
      transUnrefCliHandle(c);
    }
    p = taosHashIterate((SHashObj*)pThrd->pool, p);
  }

  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
  // one-shot timer (repeat = 0); this callback re-arms it every time it fires
  uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0);
}
U
ubuntu 已提交
425 426

void* createConnPool(int size) {
427 428
  // thread local, no lock
  return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
dengyihao's avatar
dengyihao 已提交
429
}
U
ubuntu 已提交
430
void* destroyConnPool(void* pool) {
dengyihao's avatar
dengyihao 已提交
431
  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
dengyihao's avatar
dengyihao 已提交
432 433 434 435 436
  while (connList != NULL) {
    while (!QUEUE_IS_EMPTY(&connList->conn)) {
      queue* h = QUEUE_HEAD(&connList->conn);
      QUEUE_REMOVE(h);
      SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
U
ubuntu 已提交
437
      cliDestroyConn(c, true);
dengyihao's avatar
dengyihao 已提交
438
    }
dengyihao's avatar
dengyihao 已提交
439
    connList = taosHashIterate((SHashObj*)pool, connList);
dengyihao's avatar
dengyihao 已提交
440
  }
dengyihao's avatar
dengyihao 已提交
441
  taosHashCleanup(pool);
442
  return NULL;
dengyihao's avatar
dengyihao 已提交
443 444
}

dengyihao's avatar
dengyihao 已提交
445
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
dengyihao's avatar
dengyihao 已提交
446
  char key[128] = {0};
dengyihao's avatar
dengyihao 已提交
447
  CONN_CONSTRUCT_HASH_KEY(key, ip, port);
dengyihao's avatar
dengyihao 已提交
448

dengyihao's avatar
dengyihao 已提交
449 450
  SHashObj*  pPool = pool;
  SConnList* plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
451 452
  if (plist == NULL) {
    SConnList list;
dengyihao's avatar
dengyihao 已提交
453 454
    taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
    plist = taosHashGet(pPool, key, strlen(key));
dengyihao's avatar
dengyihao 已提交
455
    QUEUE_INIT(&plist->conn);
dengyihao's avatar
dengyihao 已提交
456 457 458 459 460 461 462
  }

  if (QUEUE_IS_EMPTY(&plist->conn)) {
    return NULL;
  }
  queue* h = QUEUE_HEAD(&plist->conn);
  QUEUE_REMOVE(h);
dengyihao's avatar
dengyihao 已提交
463
  SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
U
ubuntu 已提交
464
  conn->status = ConnNormal;
dengyihao's avatar
dengyihao 已提交
465 466
  QUEUE_INIT(&conn->conn);
  return conn;
dengyihao's avatar
dengyihao 已提交
467
}
dengyihao's avatar
dengyihao 已提交
468
/*
 * Return an idle conn to the pool.
 *
 * Does nothing when the hosting thread is quitting. Otherwise refreshes the
 * conn's expiry time, clears its ctx and message queue, marks it ConnInPool and
 * appends it to the list stored under its "ip:port" key. That list must already
 * exist (getConnFromPool() creates it on first lookup).
 */
static void addConnToPool(void* pool, SCliConn* conn) {
  SCliThrdObj* thrd = conn->hostThrd;
  CONN_HANDLE_THREAD_QUIT(thrd);

  STrans* pTransInst = thrd->pTransInst;
  conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
  transCtxCleanup(&conn->ctx);
  transQueueClear(&conn->cliMsgs);
  conn->status = ConnInPool;

  char key[128] = {0};
  CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
  tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);

  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
  // list already create before
  assert(plist != NULL);
  QUEUE_PUSH(&plist->conn, &conn->conn);
  assert(!QUEUE_IS_EMPTY(&plist->conn));
}
dengyihao's avatar
dengyihao 已提交
488
// libuv alloc callback: hands out space from the conn's own read buffer.
// `suggested_size` is not used.
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SCliConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;
  transAllocBuffer(pBuf, buf);
}
U
ubuntu 已提交
493
/*
 * libuv read callback.
 *
 *   nread > 0  : account the bytes; process the packet once it is complete.
 *   nread == 0 : nothing to read right now (not an error, not EOF) -- ignore.
 *   nread < 0  : read error or EOF -- mark the conn broken and fail its requests.
 */
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
  if (handle->data == NULL) {
    return;
  }
  SCliConn*    conn = handle->data;
  SConnBuffer* pBuf = &conn->readBuf;

  if (nread > 0) {
    pBuf->len += nread;
    if (transReadComplete(pBuf)) {
      tTrace("%s cli conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
      cliHandleResp(conn);
    } else {
      tTrace("%s cli conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn);
    }
    return;
  }

  if (nread == 0) {
    // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
    // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
    // read(2).
    tTrace("%s cli conn %p read empty", CONN_GET_INST_LABEL(conn), conn);
    return;
  }

  // nread < 0
  tError("%s cli conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name((int)nread));
  conn->broken = true;
  cliHandleExcept(conn);
}
dengyihao's avatar
dengyihao 已提交
525

U
ubuntu 已提交
526
/*
 * Allocate and initialise a conn bound to `pThrd`'s loop.
 *
 * The conn starts as ConnNormal, not broken, with one reference taken.
 * Returns NULL when memory cannot be allocated.
 */
static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
  SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn));
  if (conn == NULL) {
    return NULL;
  }

  // read/write stream handle
  conn->stream = taosMemoryMalloc(sizeof(uv_tcp_t));
  if (conn->stream == NULL) {
    taosMemoryFree(conn);
    return NULL;
  }
  uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));

  // every libuv object carries a back pointer to the conn for its callbacks
  conn->stream->data = conn;
  conn->writeReq.data = conn;
  conn->connReq.data = conn;

  transQueueInit(&conn->cliMsgs, NULL);
  QUEUE_INIT(&conn->conn);
  conn->hostThrd = pThrd;
  conn->status = ConnNormal;
  conn->broken = 0;
  transRefCliHandle(conn);
  return conn;
}
U
ubuntu 已提交
544 545
/*
 * Unlink `conn` from whatever list it is on. When `clear` is true, also close
 * its stream handle; the memory is then released in cliDestroy() once libuv
 * has finished closing the handle.
 */
static void cliDestroyConn(SCliConn* conn, bool clear) {
  tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);

  QUEUE_REMOVE(&conn->conn);
  if (clear) {
    uv_close((uv_handle_t*)conn->stream, cliDestroy);
  }
}
U
ubuntu 已提交
552
/*
 * Close callback passed to uv_close() by cliDestroyConn().
 *
 * handle->data is the owning SCliConn. Everything the connection owns is
 * released here: the ip string, the stream handle, the app context, the
 * pending-message queue, the read buffer and finally the connection itself.
 */
static void cliDestroy(uv_handle_t* handle) {
  SCliConn* pConn = handle->data;

  taosMemoryFree(pConn->ip);
  taosMemoryFree(pConn->stream);

  transCtxCleanup(&pConn->ctx);
  transQueueDestroy(&pConn->cliMsgs);

  // log while pConn (and its hostThrd link used for the label) is still valid
  tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(pConn), pConn);

  transDestroyBuffer(&pConn->readBuf);
  taosMemoryFree(pConn);
}
dengyihao's avatar
dengyihao 已提交
562
/*
 * Retire the message at the head of conn->cliMsgs if it does not expect a
 * response.
 *
 * When the head message is flagged no-resp it is popped and destroyed; then,
 * unless cliMaySendCachedMsg() reports true, the connection is put back into
 * the host thread's pool.
 *
 * Returns true if a no-resp message was retired, false otherwise (including
 * when the queue is empty).
 */
static bool cliHandleNoResp(SCliConn* conn) {
  if (transQueueEmpty(&conn->cliMsgs)) {
    return false;
  }

  SCliMsg* pFirst = transQueueGet(&conn->cliMsgs, 0);
  bool     noResp = REQUEST_NO_RESP(&pFirst->msg);
  if (!noResp) {
    return false;
  }

  transQueuePop(&conn->cliMsgs);
  destroyCmsg(pFirst);

  if (!cliMaySendCachedMsg(conn)) {
    SCliThrdObj* pHost = conn->hostThrd;
    addConnToPool(pHost->pool, conn);
  }
  return true;
}
U
ubuntu 已提交
581
/*
 * Write-completion callback passed to uv_write() by cliSend().
 *
 * req->data is the SCliConn. A non-zero status is treated as a broken
 * connection and handed to cliHandleExcept(). On success, reading is started
 * on the stream unless the message just written needed no response.
 */
static void cliSendCb(uv_write_t* req, int status) {
  SCliConn* pConn = req->data;

  if (status != 0) {
    tError("%s cli conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
    cliHandleExcept(pConn);
    return;
  }
  tTrace("%s cli conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);

  if (cliHandleNoResp(pConn)) {
    tTrace("%s cli conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
    return;
  }

  uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
dengyihao's avatar
dengyihao 已提交
597

U
ubuntu 已提交
598
/*
 * Write the next unsent message queued on pConn to its stream.
 *
 * The transport header that precedes the message content is filled in
 * (application handle, noResp/persist/release flags, type, network-order
 * length, user) and header + content are submitted with a single uv_write().
 * cliSendCb() is invoked by libuv once the write completes.
 *
 * If uv_write() itself fails, libuv never calls cliSendCb(), so the failure
 * is reported and routed to cliHandleExcept() right here instead of leaving
 * the connection waiting forever.
 *
 * NOTE(review): CONN_HANDLE_BROKEN and CONN_GET_NEXT_SENDMSG are defined
 * outside this view; they appear to rely on the local `pCliMsg` and on the
 * `_RETURN` label below, so both names must stay as they are.
 */
void cliSend(SCliConn* pConn) {
  CONN_HANDLE_BROKEN(pConn);

  assert(!transQueueEmpty(&pConn->cliMsgs));

  SCliMsg* pCliMsg = NULL;
  CONN_GET_NEXT_SENDMSG(pConn);
  pCliMsg->sent = 1;

  STransConnCtx* pCtx = pCliMsg->ctx;

  SCliThrdObj* pThrd = pConn->hostThrd;
  STrans*      pTransInst = pThrd->pTransInst;

  STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
  if (pMsg->pCont == 0) {
    // a message without content still needs room for the transport header
    pMsg->pCont = (void*)rpcMallocCont(0);
    pMsg->contLen = 0;
  }
  int msgLen = transMsgLenFromCont(pMsg->contLen);

  STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
  pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
  pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
  pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
  pHead->msgType = pMsg->msgType;
  pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
  pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
  // NOTE(review): copies strlen(user) bytes with no terminator and no bound check -- confirm that
  // pHead->user is always large enough for pTransInst->user.
  memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));

  uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
  tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
         TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
         taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));

  if (pHead->persist == 1) {
    CONN_SET_PERSIST_BY_APP(pConn);
  }

  pConn->writeReq.data = pConn;
  int ret = uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
  if (ret != 0) {
    // the write was rejected synchronously: the completion callback will not run
    tError("%s cli conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
    cliHandleExcept(pConn);
  }

  return;
_RETURN:
  return;
}
U
ubuntu 已提交
645 646

/*
 * Connect-completion callback passed to uv_tcp_connect() by cliHandleReq().
 *
 * req->data is the SCliConn. On failure the connection goes to
 * cliHandleExcept(). On success the peer and local addresses are recorded
 * (they are used for logging) and the first queued message is sent.
 */
void cliConnCb(uv_connect_t* req, int status) {
  SCliConn* pConn = req->data;
  if (status != 0) {
    tError("%s cli conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
    cliHandleExcept(pConn);
    return;
  }

  uv_tcp_t* pTcp = (uv_tcp_t*)pConn->stream;

  int peerLen = sizeof(pConn->addr);
  uv_tcp_getpeername(pTcp, (struct sockaddr*)&pConn->addr, &peerLen);

  int localLen = sizeof(pConn->locaddr);
  uv_tcp_getsockname(pTcp, (struct sockaddr*)&pConn->locaddr, &localLen);

  tTrace("%s cli conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
  assert(pConn->stream == req->handle);

  cliSend(pConn);
}

U
ubuntu 已提交
666 667
/*
 * Handler for a Quit message: shut the worker down.
 *
 * Frees the message, destroys the connection pool, stops the thread timer,
 * closes every handle still open on the loop (via cliWalkCb) and flags the
 * thread as quitting. The loop is not stopped explicitly here.
 */
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
  tDebug("cli work thread %p start to quit", pThrd);

  destroyCmsg(pMsg);
  destroyConnPool(pThrd->pool);

  uv_timer_stop(&pThrd->timer);
  uv_walk(pThrd->loop, cliWalkCb, NULL);

  pThrd->quit = true;
}
dengyihao's avatar
dengyihao 已提交
677
/*
 * Handler for a Release message: give up the application's hold on a
 * connection.
 *
 * pMsg->msg.info.handle is the SCliConn to release. If its reference count is
 * exactly 2, one reference is dropped and the release message is queued on
 * the connection; it is sent immediately only when transQueuePush() reports
 * true. In that path the message stays on the connection's queue.
 *
 * For any other reference count the connection is treated as already broken:
 * one reference is dropped and the message, which nothing else holds at that
 * point, is destroyed here so that it is not leaked.
 */
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
  SCliConn* conn = pMsg->msg.info.handle;
  tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);

  if (T_REF_VAL_GET(conn) == 2) {
    transUnrefCliHandle(conn);
    if (!transQueuePush(&conn->cliMsgs, pMsg)) {
      return;
    }
    cliSend(conn);
  } else {
    // conn already broken down
    transUnrefCliHandle(conn);
    destroyCmsg(pMsg);
  }
}
dengyihao's avatar
dengyihao 已提交
692 693 694 695 696 697 698 699 700
/*
 * Handler for an Update message: install the endpoint set carried in the
 * message context as this thread's default and wake the waiting sender.
 */
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) {
  STransConnCtx* pUpdateCtx = pMsg->ctx;

  pThrd->defaultEpSet = pUpdateCtx->epSet;
  pThrd->useDefaultEpSet = true;

  // post before destroying the message: destroyCmsg() releases the context
  tsem_post(pUpdateCtx->pSem);
  destroyCmsg(pMsg);
}
U
ubuntu 已提交
701 702

/*
 * Pick the connection a message should travel on.
 *
 * A message that already carries a connection handle reuses that connection.
 * Otherwise the thread's pool is searched for a connection to the endpoint
 * currently in use in the message context.
 *
 * Returns the connection, or NULL when the pool has none for that endpoint.
 */
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
  SCliConn* conn = (SCliConn*)(pMsg->msg.info.handle);
  if (conn != NULL) {
    tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn);
    return conn;
  }

  STransConnCtx* pCtx = pMsg->ctx;
  conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
  if (conn == NULL) {
    tTrace("not found conn in conn pool %p", pThrd->pool);
  } else {
    tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
  }
  return conn;
}
U
ubuntu 已提交
720 721

/*
 * Handler for a Normal message: send a request, connecting first if needed.
 *
 * If the thread has a default endpoint set it replaces the one in the message
 * context. When cliGetConn() yields a connection the message is queued on it
 * and sent right away. Otherwise a new connection is created, the message is
 * queued on it, a socket is created with the connect timeout applied, and an
 * asynchronous connect is started; cliConnCb() sends the message once the
 * connect completes.
 *
 * The destination sockaddr_in is zero-initialised before its fields are set
 * so that no indeterminate bytes are handed to uv_tcp_connect().
 */
void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
  uint64_t et = taosGetTimestampUs();
  uint64_t el = et - pMsg->st;
  tTrace("%s cli msg tran time cost: %" PRIu64 "us, threadID: %" PRId64 "", ((STrans*)pThrd->pTransInst)->label, el,
         pThrd->thread);

  STransConnCtx* pCtx = pMsg->ctx;
  STrans*        pTransInst = pThrd->pTransInst;

  if (pThrd->useDefaultEpSet) {
    pCtx->epSet = pThrd->defaultEpSet;
  }

  SCliConn* conn = cliGetConn(pMsg, pThrd);
  if (conn != NULL) {
    conn->hThrdIdx = pCtx->hThrdIdx;

    transCtxMerge(&conn->ctx, &pCtx->appCtx);
    transQueuePush(&conn->cliMsgs, pMsg);
    cliSend(conn);
    return;
  }

  // no usable connection: build one and connect asynchronously
  conn = cliCreateConn(pThrd);
  transCtxMerge(&conn->ctx, &pCtx->appCtx);
  transQueuePush(&conn->cliMsgs, pMsg);

  conn->hThrdIdx = pCtx->hThrdIdx;
  conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet));
  conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet);

  int ret = transSetConnOption((uv_tcp_t*)conn->stream);
  if (ret) {
    // failing to set the option is reported but does not abort the connect
    tError("%s cli conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
  }

  int fd = taosCreateSocketWithTimeOutOpt(TRANS_CONN_TIMEOUT);
  if (fd == -1) {
    tTrace("%s cli conn %p failed to create socket", pTransInst->label, conn);
    cliHandleExcept(conn);
    return;
  }
  uv_tcp_open((uv_tcp_t*)conn->stream, fd);

  struct sockaddr_in addr = {0};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip);
  addr.sin_port = (uint16_t)htons((uint16_t)conn->port);

  tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
  ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
  if (ret != 0) {
    tTrace("%s cli conn %p failed to connect to %s:%d, reason: %s", pTransInst->label, conn, conn->ip, conn->port,
           uv_err_name(ret));
    cliHandleExcept(conn);
    return;
  }
}
U
ubuntu 已提交
776
/*
 * Async callback handed to transCreateAsyncPool() in createThrdObj().
 *
 * handle->data is the SAsyncItem whose queue was signalled. All pending
 * messages are moved to a local queue under the item's mutex, then each one
 * is dispatched through cliAsyncHandle[] according to its type.
 */
static void cliAsyncCb(uv_async_t* handle) {
  SAsyncItem*  item = handle->data;
  SCliThrdObj* pThrd = item->pThrd;

  // take the whole batch in one go so the lock is held only briefly
  queue pending;
  taosThreadMutexLock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &pending);
  taosThreadMutexUnlock(&item->mtx);

  int handled = 0;
  while (!QUEUE_IS_EMPTY(&pending)) {
    queue* node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);

    SCliMsg* pCur = QUEUE_DATA(node, SCliMsg, q);
    if (pCur == NULL) {
      continue;
    }
    (*cliAsyncHandle[pCur->type])(pCur, pThrd);
    handled++;
  }

  if (handled > 1) {
    tTrace("cli process batch size: %d", handled);
  }
}

U
ubuntu 已提交
804
static void* cliWorkThread(void* arg) {
dengyihao's avatar
dengyihao 已提交
805
  SCliThrdObj* pThrd = (SCliThrdObj*)arg;
U
ubuntu 已提交
806
  setThreadName("trans-cli-work");
dengyihao's avatar
dengyihao 已提交
807
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
808
  return NULL;
dengyihao's avatar
dengyihao 已提交
809 810
}

U
ubuntu 已提交
811
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
wafwerar's avatar
wafwerar 已提交
812
  SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj));
dengyihao's avatar
dengyihao 已提交
813

dengyihao's avatar
dengyihao 已提交
814
  STrans* pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
815 816
  memcpy(cli->label, label, strlen(label));
  cli->numOfThreads = numOfThreads;
wafwerar's avatar
wafwerar 已提交
817
  cli->pThreadObj = (SCliThrdObj**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrdObj*));
dengyihao's avatar
dengyihao 已提交
818 819

  for (int i = 0; i < cli->numOfThreads; i++) {
dengyihao's avatar
dengyihao 已提交
820
    SCliThrdObj* pThrd = createThrdObj();
dengyihao's avatar
dengyihao 已提交
821
    pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
dengyihao's avatar
dengyihao 已提交
822
    pThrd->pTransInst = shandle;
dengyihao's avatar
dengyihao 已提交
823

wafwerar's avatar
wafwerar 已提交
824
    int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
dengyihao's avatar
dengyihao 已提交
825
    if (err == 0) {
U
ubuntu 已提交
826
      tDebug("success to create tranport-cli thread %d", i);
dengyihao's avatar
dengyihao 已提交
827
    }
dengyihao's avatar
dengyihao 已提交
828
    cli->pThreadObj[i] = pThrd;
dengyihao's avatar
dengyihao 已提交
829 830 831
  }
  return cli;
}
dengyihao's avatar
dengyihao 已提交
832

U
ubuntu 已提交
833
/*
 * Release the content buffer of a message, if it has one, and clear the
 * pointer so a second call is harmless.
 */
static void destroyUserdata(STransMsg* userdata) {
  if (userdata->pCont != NULL) {
    transFreeMsg(userdata->pCont);
    userdata->pCont = NULL;
  }
}
/*
 * Free a client message together with its context and content buffer.
 * A NULL message is ignored.
 */
static void destroyCmsg(SCliMsg* pMsg) {
  if (pMsg != NULL) {
    transDestroyConnCtx(pMsg->ctx);
    destroyUserdata(&pMsg->msg);
    taosMemoryFree(pMsg);
  }
}
dengyihao's avatar
dengyihao 已提交
848

dengyihao's avatar
dengyihao 已提交
849
static SCliThrdObj* createThrdObj() {
wafwerar's avatar
wafwerar 已提交
850
  SCliThrdObj* pThrd = (SCliThrdObj*)taosMemoryCalloc(1, sizeof(SCliThrdObj));
U
ubuntu 已提交
851

dengyihao's avatar
dengyihao 已提交
852
  QUEUE_INIT(&pThrd->msg);
wafwerar's avatar
wafwerar 已提交
853
  taosThreadMutexInit(&pThrd->msgMtx, NULL);
dengyihao's avatar
dengyihao 已提交
854

wafwerar's avatar
wafwerar 已提交
855
  pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
dengyihao's avatar
dengyihao 已提交
856 857
  uv_loop_init(pThrd->loop);

U
ubuntu 已提交
858
  pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb);
dengyihao's avatar
dengyihao 已提交
859 860
  uv_timer_init(pThrd->loop, &pThrd->timer);
  pThrd->timer.data = pThrd;
dengyihao's avatar
dengyihao 已提交
861

dengyihao's avatar
dengyihao 已提交
862
  pThrd->pool = createConnPool(4);
dengyihao's avatar
dengyihao 已提交
863

dengyihao's avatar
dengyihao 已提交
864
  transDQCreate(pThrd->loop, &pThrd->delayQueue);
dengyihao's avatar
dengyihao 已提交
865

dengyihao's avatar
dengyihao 已提交
866
  pThrd->quit = false;
dengyihao's avatar
dengyihao 已提交
867 868
  return pThrd;
}
dengyihao's avatar
dengyihao 已提交
869
/*
 * Tear down a worker-thread object: wait for the thread to finish, release
 * its loop resources, then destroy the mutex, async pool and delay queue and
 * free the loop and the object. A NULL pointer is ignored.
 */
static void destroyThrdObj(SCliThrdObj* pThrd) {
  if (pThrd != NULL) {
    // the thread must have left its loop before the loop is released
    taosThreadJoin(pThrd->thread, NULL);
    CLI_RELEASE_UV(pThrd->loop);

    taosThreadMutexDestroy(&pThrd->msgMtx);
    transDestroyAsyncPool(pThrd->asyncPool);
    transDQDestroy(pThrd->delayQueue);

    taosMemoryFree(pThrd->loop);
    taosMemoryFree(pThrd);
  }
}
dengyihao's avatar
dengyihao 已提交
883

dengyihao's avatar
dengyihao 已提交
884
/*
 * Free a per-request connection context. Only the context structure itself
 * is released here.
 */
static void transDestroyConnCtx(STransConnCtx* ctx) {
  taosMemoryFree(ctx);
}
dengyihao's avatar
dengyihao 已提交
888

U
ubuntu 已提交
889
/*
 * Ask a worker thread to stop gracefully by posting a Quit message to its
 * async pool.
 */
void cliSendQuit(SCliThrdObj* thrd) {
  SCliMsg* pQuit = taosMemoryCalloc(1, sizeof(SCliMsg));
  pQuit->type = Quit;

  transSendAsync(thrd->asyncPool, &pQuit->q);
}
dengyihao's avatar
dengyihao 已提交
895 896 897 898 899
/*
 * uv_walk() visitor: close every handle that is not already closing.
 * `arg` is unused.
 */
void cliWalkCb(uv_handle_t* handle, void* arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}
dengyihao's avatar
dengyihao 已提交
900

U
ubuntu 已提交
901 902 903 904 905 906 907
/*
 * Choose the next worker thread in round-robin order.
 *
 * Returns a value in [0, numOfThreads) and advances pTransInst->index so
 * that consecutive calls visit 0, 1, ..., numOfThreads - 1, 0, 1, ...
 *
 * The previous version let the stored index reach numOfThreads before
 * wrapping; that value also mapped to slot 0, so slot 0 was returned twice
 * per cycle. The stored index is now kept strictly below numOfThreads.
 *
 * pTransInst->numOfThreads must be greater than zero. The index is read and
 * written here without any locking.
 */
int cliRBChoseIdx(STrans* pTransInst) {
  int64_t numOfThreads = pTransInst->numOfThreads;
  int64_t index = pTransInst->index;

  // tolerate a stale or out-of-range stored value
  if (index < 0 || index >= numOfThreads) {
    index = 0;
  }

  pTransInst->index = (index + 1 < numOfThreads) ? (index + 1) : 0;
  return (int)index;
}
dengyihao's avatar
dengyihao 已提交
908 909 910 911 912 913 914 915 916
/*
 * Delay-queue task scheduled by cliAppCb(): re-issue a request.
 * `param` is an STaskArg holding the message (param1) and the worker thread
 * (param2); the argument block is freed here.
 */
static void doDelayTask(void* param) {
  STaskArg* pTask = param;

  SCliMsg*     pRetryMsg = pTask->param1;
  SCliThrdObj* pWorker = pTask->param2;

  // both fields have been copied out, so the carrier can go first
  taosMemoryFree(pTask);

  cliHandleReq(pRetryMsg, pWorker);
}
dengyihao's avatar
dengyihao 已提交
917
/*
 * Deliver a response to the application, or schedule a retry.
 *
 * pConn  connection the response arrived on
 * pResp  the response message
 * pMsg   the request it answers; may be NULL or carry no context, in which
 *        case the response goes straight to the application callback
 *
 * A retry is considered when the instance's retry predicate accepts the
 * response code, or when a TDMT_MND_CONNECT request failed with
 * TSDB_CODE_RPC_NETWORK_UNAVAIL:
 *   - connect + network unavailable: while fewer endpoints than exist have
 *     been tried, advance to the next endpoint, schedule the request again
 *     and destroy this connection;
 *   - otherwise, while under TRANS_RETRY_COUNT_LIMIT: advance to the next
 *     endpoint (empty response) or adopt the endpoint set carried in the
 *     response, return the connection to the pool and schedule the request
 *     again.
 * When no retry is scheduled the response is handed over, either through the
 * waiting caller's semaphore (sync) or through the application callback.
 *
 * Returns -1 when a retry was scheduled (pMsg is still in use), 0 otherwise.
 *
 * The endpoint advance is written as (inUse + 1) % numOfEps; the earlier
 * `inUse = (++inUse) % numOfEps` modified inUse twice without sequencing,
 * which is undefined behaviour in C.
 */
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
  SCliThrdObj* pThrd = pConn->hostThrd;
  STrans*      pTransInst = pThrd->pTransInst;

  if (pMsg == NULL || pMsg->ctx == NULL) {
    tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
    pTransInst->cfp(pTransInst->parent, pResp, NULL);
    return 0;
  }

  STransConnCtx* pCtx = pMsg->ctx;
  SEpSet*        pEpSet = &pCtx->epSet;
  /*
   * upper layer handle retry if code equal TSDB_CODE_RPC_NETWORK_UNAVAIL
   */
  tmsg_t msgType = pCtx->msgType;
  if ((pTransInst->retry != NULL && (pTransInst->retry(pResp->code))) ||
      ((pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && msgType == TDMT_MND_CONNECT)) {
    // make the message look unsent again and restart its timer
    pMsg->sent = 0;
    pMsg->st = taosGetTimestampUs();
    pCtx->retryCount += 1;
    if (msgType == TDMT_MND_CONNECT && pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
      if (pCtx->retryCount < pEpSet->numOfEps) {
        pEpSet->inUse = (pEpSet->inUse + 1) % pEpSet->numOfEps;

        STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
        arg->param1 = pMsg;
        arg->param2 = pThrd;
        transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
        cliDestroyConn(pConn, true);
        return -1;
      }
    } else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
      if (pResp->contLen == 0) {
        pEpSet->inUse = (pEpSet->inUse + 1) % pEpSet->numOfEps;
      } else {
        // the server supplied an endpoint set: use it for the retry
        SMEpSet emsg = {0};
        tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg);
        pCtx->epSet = emsg.epSet;
      }
      addConnToPool(pThrd->pool, pConn);
      tTrace("use remote epset, current in use: %d, retry count:%d, try limit: %d", pEpSet->inUse, pCtx->retryCount + 1,
             TRANS_RETRY_COUNT_LIMIT);

      STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
      arg->param1 = pMsg;
      arg->param2 = pThrd;
      transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
      return -1;
    }
  }

  if (pCtx->pSem != NULL) {
    tTrace("%s cli conn %p(sync) handle resp", pTransInst->label, pConn);
    if (pCtx->pRsp == NULL) {
      tTrace("%s cli conn %p(sync) failed to resp, ignore", pTransInst->label, pConn);
    } else {
      memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
    }
    tsem_post(pCtx->pSem);
    pCtx->pRsp = NULL;
  } else {
    tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
    pTransInst->cfp(pTransInst->parent, pResp, pEpSet);
  }
  return 0;
}
U
ubuntu 已提交
984 985

void transCloseClient(void* arg) {
U
ubuntu 已提交
986
  SCliObj* cli = arg;
dengyihao's avatar
dengyihao 已提交
987
  for (int i = 0; i < cli->numOfThreads; i++) {
U
ubuntu 已提交
988
    cliSendQuit(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
989
    destroyThrdObj(cli->pThreadObj[i]);
dengyihao's avatar
dengyihao 已提交
990
  }
wafwerar's avatar
wafwerar 已提交
991 992
  taosMemoryFree(cli->pThreadObj);
  taosMemoryFree(cli);
dengyihao's avatar
dengyihao 已提交
993
}
dengyihao's avatar
dengyihao 已提交
994 995 996 997 998 999 1000 1001 1002 1003 1004 1005
/*
 * Take one reference on a client connection handle. NULL is ignored.
 */
void transRefCliHandle(void* handle) {
  if (handle != NULL) {
    int refs = T_REF_INC((SCliConn*)handle);
    UNUSED(refs);
  }
}
/*
 * Drop one reference on a client connection handle and destroy the
 * connection when the count reaches zero. NULL is ignored.
 */
void transUnrefCliHandle(void* handle) {
  if (handle == NULL) {
    return;
  }

  int remaining = T_REF_DEC((SCliConn*)handle);
  tDebug("%s cli conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, remaining);

  if (remaining != 0) {
    return;
  }
  cliDestroyConn((SCliConn*)handle, true);
}
dengyihao's avatar
dengyihao 已提交
1011 1012 1013 1014 1015 1016
void transReleaseCliHandle(void* handle) {
  SCliThrdObj* thrd = CONN_GET_HOST_THREAD(handle);
  if (thrd == NULL) {
    return;
  }

S
Shengliang Guan 已提交
1017
  STransMsg tmsg = {.info.handle = handle};
wafwerar's avatar
wafwerar 已提交
1018
  SCliMsg*  cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
dengyihao's avatar
dengyihao 已提交
1019
  cmsg->msg = tmsg;
dengyihao's avatar
dengyihao 已提交
1020
  cmsg->type = Release;
dengyihao's avatar
dengyihao 已提交
1021 1022 1023

  transSendAsync(thrd->asyncPool, &cmsg->q);
}
dengyihao's avatar
dengyihao 已提交
1024

dengyihao's avatar
dengyihao 已提交
1025
/*
 * Send a request asynchronously.
 *
 * shandle  the STrans client instance
 * pEpSet   endpoint set to send to (copied)
 * pReq     the request (copied by value into the queued message)
 * ctx      optional application context, copied when non-NULL
 *
 * The request is posted to the worker thread that hosts pReq's connection
 * handle, or to one chosen round-robin when there is no handle. The response
 * is delivered later through the instance callback.
 *
 * The enqueue call is made outside of ASSERT so that the message is still
 * sent if ASSERT is compiled out; only the result check is asserted.
 */
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
  STrans* pTransInst = (STrans*)shandle;
  int     index = CONN_HOST_THREAD_INDEX((SCliConn*)pReq->info.handle);
  if (index == -1) {
    index = cliRBChoseIdx(pTransInst);
  }

  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  pCtx->epSet = *pEpSet;
  pCtx->ahandle = pReq->info.ahandle;
  pCtx->msgType = pReq->msgType;
  pCtx->hThrdIdx = index;

  if (ctx != NULL) {
    pCtx->appCtx = *ctx;
  }
  assert(pTransInst->connType == TAOS_CONN_CLIENT);

  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();
  cliMsg->type = Normal;

  SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];

  tDebug("send request at thread:%d, threadID: %" PRId64 ",  msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq,
         EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);

  int ret = transSendAsync(thrd->asyncPool, &(cliMsg->q));
  ASSERT(ret == 0);
  (void)ret;  // keeps builds without assertions warning-free
}
dengyihao's avatar
dengyihao 已提交
1055

dengyihao's avatar
dengyihao 已提交
1056
/*
 * Send a request and block until its response has been stored in pRsp.
 *
 * shandle  the STrans client instance
 * pEpSet   endpoint set to send to (copied)
 * pReq     the request (copied by value into the queued message)
 * pRsp     filled in by cliAppCb() before the semaphore is posted
 *
 * A semaphore is attached to the request context; the worker thread posts it
 * once the response is available.
 *
 * The semaphore pointer is taken into a local BEFORE the message is handed to
 * the worker: after transSendAsync() the worker owns the message and may free
 * its context at any time, so pCtx must not be touched afterwards.
 */
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
  STrans* pTransInst = (STrans*)shandle;
  int     index = CONN_HOST_THREAD_INDEX(pReq->info.handle);
  if (index == -1) {
    index = cliRBChoseIdx(pTransInst);
  }

  STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
  pCtx->epSet = *pEpSet;
  pCtx->ahandle = pReq->info.ahandle;
  pCtx->msgType = pReq->msgType;
  pCtx->hThrdIdx = index;
  pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t));
  pCtx->pRsp = pRsp;
  tsem_init(pCtx->pSem, 0, 0);

  SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
  cliMsg->ctx = pCtx;
  cliMsg->msg = *pReq;
  cliMsg->st = taosGetTimestampUs();
  cliMsg->type = Normal;

  SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
  tDebug("send request at thread:%d, threadID:%" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq,
         EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);

  // the semaphore outlives the context; grab it while pCtx is still ours
  tsem_t* pSem = pCtx->pSem;
  transSendAsync(thrd->asyncPool, &(cliMsg->q));

  tsem_wait(pSem);
  tsem_destroy(pSem);
  taosMemoryFree(pSem);
}
U
ubuntu 已提交
1088

dengyihao's avatar
dengyihao 已提交
1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112
void transSetDefaultEpSet(void* ahandle, const SEpSet* dst) {
  STrans* pTransInst = ahandle;
  for (int i = 0; i < pTransInst->numOfThreads; i++) {
    STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
    pCtx->hThrdIdx = i;
    pCtx->epSet = *dst;
    pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t));
    tsem_init(pCtx->pSem, 0, 0);

    SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
    cliMsg->ctx = pCtx;
    cliMsg->type = Update;

    SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
    tDebug("send update epset at thread:%d, threadID:%" PRId64 "", i, thrd->thread);

    tsem_t* pSem = pCtx->pSem;
    transSendAsync(thrd->asyncPool, &(cliMsg->q));

    tsem_wait(pSem);
    tsem_destroy(pSem);
    taosMemoryFree(pSem);
  }
}
dengyihao's avatar
dengyihao 已提交
1113
#endif