rpcMain.c 55.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "lz4.h"
wafwerar's avatar
wafwerar 已提交
17
#include "transportInt.h"
S
slguan 已提交
18
#include "os.h"
dengyihao's avatar
dengyihao 已提交
19 20 21 22 23 24 25 26
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
H
hzcheng 已提交
27 28 29
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
H
Hongze Cheng 已提交
30
#include "tmsg.h"
dengyihao's avatar
dengyihao 已提交
31
#include "tref.h"
S
slguan 已提交
32
#include "trpc.h"
dengyihao's avatar
dengyihao 已提交
33 34 35
#include "ttimer.h"
#include "tutil.h"

wafwerar's avatar
wafwerar 已提交
36
static TdThreadOnce tsRpcInitOnce = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
37 38 39 40 41 42 43 44

int tsRpcMaxUdpSize = 15000;  // bytes
int tsProgressTimer = 100;
// not configurable
int tsRpcMaxRetry;
int tsRpcHeadSize;
int tsRpcOverhead;

S
Shengliang 已提交
45 46
SHashObj *tsFqdnHash;

dengyihao's avatar
dengyihao 已提交
47 48
#ifndef USE_UV

dengyihao's avatar
dengyihao 已提交
49 50 51 52 53 54 55 56
typedef struct {
  int      sessions;      // number of sessions allowed
  int      numOfThreads;  // number of threads to process incoming messages
  int      idleTime;      // milliseconds;
  uint16_t localPort;
  int8_t   connType;
  int      index;  // for UDP server only, round robin for multiple threads
  char     label[TSDB_LABEL_LEN];
dengyihao's avatar
dengyihao 已提交
57

dengyihao's avatar
dengyihao 已提交
58 59 60 61 62
  char user[TSDB_UNI_LEN];         // meter ID
  char spi;                        // security parameter index
  char encrypt;                    // encrypt algorithm
  char secret[TSDB_PASSWORD_LEN];  // secret for the link
  char ckey[TSDB_PASSWORD_LEN];    // ciphering key
dengyihao's avatar
dengyihao 已提交
63

dengyihao's avatar
dengyihao 已提交
64 65
  void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
  int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey);
dengyihao's avatar
dengyihao 已提交
66

dengyihao's avatar
dengyihao 已提交
67
  int32_t          refCount;
dengyihao's avatar
dengyihao 已提交
68 69 70 71 72 73 74
  void *           parent;
  void *           idPool;     // handle to ID pool
  void *           tmrCtrl;    // handle to timer
  SHashObj *       hash;       // handle returned by hash utility
  void *           tcphandle;  // returned handle from TCP initialization
  void *           udphandle;  // returned handle from UDP initialization
  void *           pCache;     // connection cache
wafwerar's avatar
wafwerar 已提交
75
  TdThreadMutex  mutex;
dengyihao's avatar
dengyihao 已提交
76
  struct SRpcConn *connList;  // connection list
dengyihao's avatar
dengyihao 已提交
77
} SRpcInfo;
dengyihao's avatar
dengyihao 已提交
78

dengyihao's avatar
dengyihao 已提交
79
typedef struct {
dengyihao's avatar
dengyihao 已提交
80
  SRpcInfo *       pRpc;      // associated SRpcInfo
dengyihao's avatar
dengyihao 已提交
81
  SEpSet           epSet;     // ip list provided by app
dengyihao's avatar
dengyihao 已提交
82 83
  void *           ahandle;   // handle provided by app
  struct SRpcConn *pConn;     // pConn allocated
dengyihao's avatar
dengyihao 已提交
84
  tmsg_t           msgType;   // message type
dengyihao's avatar
dengyihao 已提交
85
  uint8_t *        pCont;     // content provided by app
dengyihao's avatar
dengyihao 已提交
86 87 88 89 90 91 92
  int32_t          contLen;   // content length
  int32_t          code;      // error code
  int16_t          numOfTry;  // number of try for different servers
  int8_t           oldInUse;  // server EP inUse passed by app
  int8_t           redirect;  // flag to indicate redirect
  int8_t           connType;  // connection type
  int64_t          rid;       // refId returned by taosAddRef
dengyihao's avatar
dengyihao 已提交
93 94 95
  SRpcMsg *        pRsp;      // for synchronous API
  tsem_t *         pSem;      // for synchronous API
  SEpSet *         pSet;      // for synchronous API
dengyihao's avatar
dengyihao 已提交
96 97 98
  char             msg[0];    // RpcHead starts from here
} SRpcReqContext;

dengyihao's avatar
dengyihao 已提交
99
// Bytes reserved around the application content in every message buffer:
// request context in front, then the RPC head, plus room for a digest.
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))

// Conversions between a message (starting at its SRpcHead) and the content that follows the head.
// Every parameter is parenthesized so that expression arguments expand with the intended precedence.
#define rpcHeadFromCont(cont) ((SRpcHead *)((char *)(cont) - sizeof(SRpcHead)))
#define rpcContFromHead(msg) ((msg) + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) ((contLen) + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) ((msgLen) - sizeof(SRpcHead))

// Non-zero when the lowest bit of the message type is set.
#define rpcIsReq(type) ((type) & 1U)
H
hzcheng 已提交
105

J
Jeff Tao 已提交
106
typedef struct SRpcConn {
dengyihao's avatar
dengyihao 已提交
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
  char            info[48];                   // debug info: label + pConn + ahandle
  int             sid;                        // session ID
  uint32_t        ownId;                      // own link ID
  uint32_t        peerId;                     // peer link ID
  char            user[TSDB_UNI_LEN];         // user ID for the link
  char            spi;                        // security parameter index
  char            encrypt;                    // encryption, 0:1
  char            secret[TSDB_PASSWORD_LEN];  // secret for the link
  char            ckey[TSDB_PASSWORD_LEN];    // ciphering key
  char            secured;                    // if set to 1, no authentication
  uint16_t        localPort;                  // for UDP only
  uint32_t        linkUid;                    // connection unique ID assigned by client
  uint32_t        peerIp;                     // peer IP
  uint16_t        peerPort;                   // peer port
  char            peerFqdn[TSDB_FQDN_LEN];    // peer FQDN or ip string
  uint16_t        tranId;                     // outgoing transcation ID, for build message
  uint16_t        outTranId;                  // outgoing transcation ID
  uint16_t        inTranId;                   // transcation ID for incoming msg
  tmsg_t          outType;                    // message type for outgoing request
  tmsg_t          inType;                     // message type for incoming request
  void *          chandle;                    // handle passed by TCP/UDP connection layer
  void *          ahandle;                    // handle provided by upper app layter
  int             retry;                      // number of retry for sending request
  int             tretry;                     // total retry
  void *          pTimer;                     // retry timer to monitor the response
  void *          pIdleTimer;                 // idle timer
  char *          pRspMsg;                    // response message including header
  int             rspMsgLen;                  // response messag length
  char *          pReqMsg;                    // request message including header
  int             reqMsgLen;                  // request message length
  SRpcInfo *      pRpc;                       // the associated SRpcInfo
  int8_t          connType;                   // connection type
  int64_t         lockedBy;                   // lock for connection
  SRpcReqContext *pContext;                   // request context
H
hzcheng 已提交
141 142
} SRpcConn;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
143 144
static int     tsRpcRefId = -1;  // id returned by taosOpenRef() in rpcInitImp(); reset to -1 by rpcCleanup()
static int32_t tsRpcNum = 0;     // incremented atomically by every rpcOpen()

// Connection type codes; they index the transport dispatch tables in this file.
// server:0 client:1  tcp:2 udp:0
#define RPC_CONN_UDPS 0
#define RPC_CONN_UDPC 1
#define RPC_CONN_TCPS 2
#define RPC_CONN_TCPC 3
153

J
jtao1735 已提交
154
void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
dengyihao's avatar
dengyihao 已提交
155
    taosInitUdpConnection, taosInitUdpConnection, taosInitTcpServer, taosInitTcpClient};
H
hzcheng 已提交
156

dengyihao's avatar
dengyihao 已提交
157 158
void (*taosCleanUpConn[])(void *thandle) = {taosCleanUpUdpConnection, taosCleanUpUdpConnection, taosCleanUpTcpServer,
                                            taosCleanUpTcpClient};
H
hzcheng 已提交
159

160
void (*taosStopConn[])(void *thandle) = {
dengyihao's avatar
dengyihao 已提交
161 162
    taosStopUdpConnection,
    taosStopUdpConnection,
163
    taosStopTcpServer,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
164
    taosStopTcpClient,
165 166
};

167
int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
dengyihao's avatar
dengyihao 已提交
168
    taosSendUdpData, taosSendUdpData, taosSendTcpData, taosSendTcpData};
H
hzcheng 已提交
169

J
jtao1735 已提交
170
void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = {
S
slguan 已提交
171 172 173 174
    taosOpenUdpConnection,
    taosOpenUdpConnection,
    NULL,
    taosOpenTcpClientConnection,
H
hzcheng 已提交
175 176
};

dengyihao's avatar
dengyihao 已提交
177
void (*taosCloseConn[])(void *chandle) = {NULL, NULL, taosCloseTcpConnection, taosCloseTcpConnection};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
178

J
jtao1735 已提交
179
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
180
static void      rpcCloseConn(void *thandle);
181
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext);
182
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
183 184
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
H
hzcheng 已提交
185

dengyihao's avatar
dengyihao 已提交
186 187 188 189 190
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void rpcSendReqHead(SRpcConn *pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
191

192
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
S
slguan 已提交
193
static void  rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext);
194 195 196
static void  rpcProcessConnError(void *param, void *id);
static void  rpcProcessRetryTimer(void *, void *);
static void  rpcProcessIdleTimer(void *param, void *tmrId);
197
static void  rpcProcessProgressTimer(void *param, void *tmrId);
H
hzcheng 已提交
198

dengyihao's avatar
dengyihao 已提交
199 200
static void      rpcFreeMsg(void *msg);
static int32_t   rpcCompressRpcMsg(char *pCont, int32_t contLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead);
dengyihao's avatar
dengyihao 已提交
202 203 204 205 206 207
static int       rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
static int       rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
static void      rpcLockConn(SRpcConn *pConn);
static void      rpcUnlockConn(SRpcConn *pConn);
static void      rpcAddRef(SRpcInfo *pRpc);
static void      rpcDecRef(SRpcInfo *pRpc);
H
hzcheng 已提交
208

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
209 210
// Release callback registered with taosOpenRef() in rpcInitImp(): traces the pointer, then frees it.
static void rpcFree(void *p) {
  tTrace("free mem: %p", p);
  taosMemoryFree(p);
}

214 215 216 217
static void rpcInitImp(void) {
  tsProgressTimer = tsRpcTimer / 2;
  tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsProgressTimer;
  tsRpcHeadSize = RPC_MSG_OVERHEAD;
218
  tsRpcOverhead = sizeof(SRpcReqContext);
H
hzcheng 已提交
219

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
220
  tsRpcRefId = taosOpenRef(200, rpcFree);
S
Shengliang 已提交
221 222

  tsFqdnHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
223
}
224

S
Shengliang Guan 已提交
225
int32_t rpcInit() {
wafwerar's avatar
wafwerar 已提交
226
  taosThreadOnce(&tsRpcInitOnce, rpcInitImp);
227
  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
228
}
229

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
230 231
// Tear down what rpcInitImp() created: close the reference set and destroy the FQDN cache.
// NOTE(review): tsRpcInitOnce is not reset here, so a later rpcInit() presumably will not
// run rpcInitImp() again -- confirm against taosThreadOnce.
void rpcCleanup(void) {
  taosCloseRef(tsRpcRefId);
  taosHashClear(tsFqdnHash);
  taosHashCleanup(tsFqdnHash);
  tsFqdnHash = NULL;
  tsRpcRefId = -1;
}
dengyihao's avatar
dengyihao 已提交
237

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
238 239 240
void *rpcOpen(const SRpcInit *pInit) {
  SRpcInfo *pRpc;

wafwerar's avatar
wafwerar 已提交
241
  // taosThreadOnce(&tsRpcInit, rpcInit);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
242

wafwerar's avatar
wafwerar 已提交
243
  pRpc = (SRpcInfo *)taosMemoryCalloc(1, sizeof(SRpcInfo));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244
  if (pRpc == NULL) return NULL;
H
hzcheng 已提交
245

H
Haojun Liao 已提交
246
  if (pInit->label) tstrncpy(pRpc->label, pInit->label, tListLen(pInit->label));
dengyihao's avatar
dengyihao 已提交
247

248
  pRpc->connType = pInit->connType;
Y
TD-3115  
yihaoDeng 已提交
249 250
  if (pRpc->connType == TAOS_CONN_CLIENT) {
    pRpc->numOfThreads = pInit->numOfThreads;
dengyihao's avatar
dengyihao 已提交
251 252 253
    if (pRpc->numOfThreads >= 10) {
      pRpc->numOfThreads = 10;
    }
Y
TD-3115  
yihaoDeng 已提交
254
  } else {
dengyihao's avatar
dengyihao 已提交
255
    pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
Y
TD-3115  
yihaoDeng 已提交
256
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
257 258 259
  pRpc->idleTime = pInit->idleTime;
  pRpc->localPort = pInit->localPort;
  pRpc->afp = pInit->afp;
dengyihao's avatar
dengyihao 已提交
260
  pRpc->sessions = pInit->sessions + 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
261 262 263
  if (pInit->user) tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
  if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret));
  if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
264
  pRpc->spi = pInit->spi;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
265
  pRpc->cfp = pInit->cfp;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
266
  pRpc->afp = pInit->afp;
S
Shengliang Guan 已提交
267
  pRpc->parent = pInit->parent;
268
  pRpc->refCount = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
269

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
270 271
  atomic_add_fetch_32(&tsRpcNum, 1);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
272
  size_t size = sizeof(SRpcConn) * pRpc->sessions;
wafwerar's avatar
wafwerar 已提交
273
  pRpc->connList = (SRpcConn *)taosMemoryCalloc(1, size);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
274
  if (pRpc->connList == NULL) {
S
TD-1530  
Shengliang Guan 已提交
275
    tError("%s failed to allocate memory for taos connections, size:%" PRId64, pRpc->label, (int64_t)size);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
276
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
277 278
    return NULL;
  }
H
hzcheng 已提交
279

dengyihao's avatar
dengyihao 已提交
280
  pRpc->idPool = taosInitIdPool(pRpc->sessions - 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
281 282
  if (pRpc->idPool == NULL) {
    tError("%s failed to init ID pool", pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
283
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
284
    return NULL;
H
hzcheng 已提交
285
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
286

dengyihao's avatar
dengyihao 已提交
287
  pRpc->tmrCtrl = taosTmrInit(pRpc->sessions * 2 + 1, 50, 10000, pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
288 289
  if (pRpc->tmrCtrl == NULL) {
    tError("%s failed to init timers", pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
290
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
291 292
    return NULL;
  }
H
hzcheng 已提交
293

294
  if (pRpc->connType == TAOS_CONN_SERVER) {
H
Haojun Liao 已提交
295
    pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true);
296 297 298 299 300 301
    if (pRpc->hash == NULL) {
      tError("%s failed to init string hash", pRpc->label);
      rpcClose(pRpc);
      return NULL;
    }
  } else {
dengyihao's avatar
dengyihao 已提交
302 303
    pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime * 20);
    if (pRpc->pCache == NULL) {
304 305 306 307
      tError("%s failed to init connection cache", pRpc->label);
      rpcClose(pRpc);
      return NULL;
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
308
  }
H
hzcheng 已提交
309

wafwerar's avatar
wafwerar 已提交
310
  taosThreadMutexInit(&pRpc->mutex, NULL);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
311

dengyihao's avatar
dengyihao 已提交
312 313 314 315
  pRpc->tcphandle = (*taosInitConn[pRpc->connType | RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads,
                                                                   rpcProcessMsgFromPeer, pRpc);
  pRpc->udphandle =
      (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);
316 317 318 319 320 321 322

  if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) {
    tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort);
    rpcClose(pRpc);
    return NULL;
  }

S
TD-1843  
Shengliang Guan 已提交
323
  tDebug("%s rpc is opened, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions);
H
hzcheng 已提交
324

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
325
  return pRpc;
H
hzcheng 已提交
326 327
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
328 329
void rpcClose(void *param) {
  SRpcInfo *pRpc = (SRpcInfo *)param;
H
hzcheng 已提交
330

331
  // stop connection to outside first
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
332 333
  (*taosStopConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
  (*taosStopConn[pRpc->connType])(pRpc->udphandle);
334

dengyihao's avatar
dengyihao 已提交
335
  // close all connections
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
336
  for (int i = 0; i < pRpc->sessions; ++i) {
337
    if (pRpc->connList && pRpc->connList[i].user[0]) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
338
      rpcCloseConn((void *)(pRpc->connList + i));
H
hzcheng 已提交
339 340 341
    }
  }

342
  // clean up
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
343 344
  (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
  (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);
J
Jeff Tao 已提交
345

346
  tDebug("%s rpc is closed", pRpc->label);
347
  rpcDecRef(pRpc);
H
hzcheng 已提交
348 349
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
350 351
void *rpcMallocCont(int contLen) {
  int size = contLen + RPC_MSG_OVERHEAD;
352

wafwerar's avatar
wafwerar 已提交
353
  char *start = (char *)taosMemoryCalloc(1, (size_t)size);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
354
  if (start == NULL) {
355 356
    tError("failed to malloc msg, size:%d", size);
    return NULL;
357
  } else {
S
TD-1762  
Shengliang Guan 已提交
358
    tTrace("malloc mem:%p size:%d", start, size);
359 360
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
361
  return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
362 363 364
}

void rpcFreeCont(void *cont) {
S
Shengliang Guan 已提交
365
  if (cont) {
366
    char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext);
wafwerar's avatar
wafwerar 已提交
367
    taosMemoryFree(temp);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
368
    tTrace("free mem: %p", temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
369
  }
370 371
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
372 373 374 375
/*
 * Resize a buffer obtained from rpcMallocCont() so it can hold contLen bytes of content.
 *
 *  - ptr == NULL    : behaves like rpcMallocCont(contLen)
 *  - contLen == 0   : frees the buffer and returns NULL
 *  - otherwise      : returns the content pointer of the resized buffer, or
 *                     NULL if the reallocation fails
 */
void *rpcReallocCont(void *ptr, int contLen) {
  if (ptr == NULL) return rpcMallocCont(contLen);

  // start of the underlying allocation
  char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead);
  if (contLen == 0) {
    taosMemoryFree(start);
    return NULL;
  }

  int size = contLen + RPC_MSG_OVERHEAD;
  start = taosMemoryRealloc(start, size);
  if (start == NULL) {
    tError("failed to realloc cont, size:%d", size);
    return NULL;
  }

  return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}

S
Shengliang Guan 已提交
391
/*
 * Send a request to one of the end points in pEpSet.
 *
 * shandle : endpoint handle returned by rpcOpen()
 * pEpSet  : end point set, copied into the request context
 * pMsg    : request; the code reaches the request context by stepping back
 *           from pMsg->pCont, so pCont is assumed to come from
 *           rpcMallocCont()/rpcReallocCont()
 * pRid    : if not NULL, receives the reference id of the request, which
 *           rpcCancelRequest() accepts
 */
void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
  SRpcInfo *      pRpc = (SRpcInfo *)shandle;
  SRpcReqContext *pContext;

  int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
  // the request context sits in front of the head inside the same allocation
  pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
  pContext->ahandle = pMsg->ahandle;
  pContext->pRpc = (SRpcInfo *)shandle;
  pContext->epSet = *pEpSet;
  pContext->contLen = contLen;
  pContext->pCont = pMsg->pCont;
  pContext->msgType = pMsg->msgType;
  pContext->oldInUse = pEpSet->inUse;

  // UDP by default; TCP when the content exceeds tsRpcMaxUdpSize or TCP is forced
  pContext->connType = RPC_CONN_UDPC;
  if (contLen > tsRpcMaxUdpSize || tsRpcForceTcp) pContext->connType = RPC_CONN_TCPC;

  // connection type is application specific.
  // for TDengine, all the query, show commands shall have TCP connection
  tmsg_t type = pMsg->msgType;
  if (type == TDMT_VND_QUERY || type == TDMT_MND_SHOW_RETRIEVE || type == TDMT_VND_FETCH ||
      type == TDMT_MND_VGROUP_LIST || type == TDMT_VND_TABLES_META || type == TDMT_VND_TABLE_META ||
      type == TDMT_MND_SHOW || type == TDMT_MND_STATUS || type == TDMT_VND_ALTER_TABLE)
    pContext->connType = RPC_CONN_TCPC;

  // register the context so that it can be looked up by id later
  pContext->rid = taosAddRef(tsRpcRefId, pContext);
  if (pRid) *pRid = pContext->rid;

  rpcSendReqToServer(pRpc, pContext);
}

J
Jeff Tao 已提交
422
void rpcSendResponse(const SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
423
  ASSERT(pRsp->handle != NULL);
424

dengyihao's avatar
dengyihao 已提交
425 426 427 428 429
  int       msgLen = 0;
  SRpcConn *pConn = (SRpcConn *)pRsp->handle;
  SRpcMsg   rpcMsg = *pRsp;
  SRpcMsg * pMsg = &rpcMsg;
  SRpcInfo *pRpc = pConn->pRpc;
J
Jeff Tao 已提交
430

dengyihao's avatar
dengyihao 已提交
431
  if (pMsg->pCont == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
432 433
    pMsg->pCont = rpcMallocCont(0);
    pMsg->contLen = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
434 435
  }

dengyihao's avatar
dengyihao 已提交
436 437
  SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont);
  char *    msg = (char *)pHead;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
438

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
439 440
  pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
  msgLen = rpcMsgLenFromCont(pMsg->contLen);
441

442
  rpcLockConn(pConn);
443

dengyihao's avatar
dengyihao 已提交
444
  if (pConn->inType == 0 || pConn->user[0] == 0) {
H
Haojun Liao 已提交
445
    tError("%s, connection is already released, rsp wont be sent", pConn->info);
446
    rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
447 448
    rpcFreeCont(pMsg->pCont);
    rpcDecRef(pRpc);
449 450 451
    return;
  }

452
  // set msg header
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
453
  pHead->version = 1;
dengyihao's avatar
dengyihao 已提交
454
  pHead->msgType = pConn->inType + 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
455 456
  pHead->spi = pConn->spi;
  pHead->encrypt = pConn->encrypt;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
457 458 459
  pHead->tranId = pConn->inTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
460
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
461
  pHead->port = htons(pConn->localPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
462
  pHead->code = htonl(pMsg->code);
dengyihao's avatar
dengyihao 已提交
463 464
  pHead->ahandle = (uint64_t)pConn->ahandle;

465 466
  // set pConn parameters
  pConn->inType = 0;
467 468

  // response message is released until new response is sent
dengyihao's avatar
dengyihao 已提交
469
  rpcFreeMsg(pConn->pRspMsg);
470 471
  pConn->pRspMsg = msg;
  pConn->rspMsgLen = msgLen;
472
  if (pMsg->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) pConn->inTranId--;
473

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
474
  // stop the progress timer
475
  taosTmrStopA(&pConn->pTimer);
476 477

  // set the idle timer to monitor the activity
Y
yihaoDeng 已提交
478
  taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime * 30, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
479
  rpcSendMsgToPeer(pConn, msg, msgLen);
480 481

  // if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured
dengyihao's avatar
dengyihao 已提交
482
  if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY) pConn->secured = 1;  // connection shall be secured
483 484

  if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
485 486
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
487

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
488
  rpcUnlockConn(pConn);
dengyihao's avatar
dengyihao 已提交
489
  rpcDecRef(pRpc);  // decrease the referene count
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
490

491 492 493
  return;
}

S
Shengliang Guan 已提交
494
void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
495
  SRpcMsg rpcMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
496
  memset(&rpcMsg, 0, sizeof(rpcMsg));
dengyihao's avatar
dengyihao 已提交
497

S
Shengliang Guan 已提交
498
  rpcMsg.contLen = sizeof(SEpSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
499 500
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  if (rpcMsg.pCont == NULL) return;
501

S
Shengliang Guan 已提交
502
  memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet));
503

504
  rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
505
  rpcMsg.handle = thandle;
506

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
507
  rpcSendResponse(&rpcMsg);
508 509 510 511

  return;
}

512
/*
 * Copy the peer IP, peer port and user name of connection thandle into pInfo.
 * Returns 0 on success, or -1 if the connection has been released (empty user name).
 */
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
  SRpcConn *pConn = (SRpcConn *)thandle;
  if (pConn->user[0] == 0) return -1;

  pInfo->clientIp = pConn->peerIp;
  pInfo->clientPort = pConn->peerPort;
  // pInfo->serverIp = pConn->destIp;

  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
  return 0;
}

S
Shengliang Guan 已提交
524
/*
 * Synchronous variant of rpcSendRequest(): send pMsg and block until the
 * semaphore attached to the request context is posted.
 *
 * pRsp is zeroed first and is the message the completion path fills in.
 * pEpSet is stored in the context as pSet. As in rpcSendRequest(), pMsg->pCont
 * is assumed to come from rpcMallocCont()/rpcReallocCont().
 */
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
  SRpcReqContext *pContext;
  pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));

  memset(pRsp, 0, sizeof(SRpcMsg));

  // the semaphore lives on this stack frame; it is destroyed only after the wait returns
  tsem_t sem;
  tsem_init(&sem, 0, 0);
  pContext->pSem = &sem;
  pContext->pRsp = pRsp;
  pContext->pSet = pEpSet;

  rpcSendRequest(shandle, pEpSet, pMsg, NULL);

  tsem_wait(&sem);
  tsem_destroy(&sem);

  return;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
544
// this API is used by server app to keep an APP context in case connection is broken
545
int rpcReportProgress(void *handle, char *pCont, int contLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
546
  SRpcConn *pConn = (SRpcConn *)handle;
dengyihao's avatar
dengyihao 已提交
547
  int       code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
548

549
  rpcLockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
550

551 552
  if (pConn->user[0]) {
    // pReqMsg and reqMsgLen is re-used to store the context from app server
dengyihao's avatar
dengyihao 已提交
553
    pConn->pReqMsg = pCont;
554
    pConn->reqMsgLen = contLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
555
  } else {
556
    tDebug("%s, rpc connection is already released", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
557 558 559
    rpcFreeCont(pCont);
    code = -1;
  }
560

561
  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
562
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
563 564
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
565 566 567
void rpcCancelRequest(int64_t rid) {
  SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid);
  if (pContext == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
568

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
569
  rpcCloseConn(pContext->pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
570

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
571
  taosReleaseRef(tsRpcRefId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
572 573
}

574
static void rpcFreeMsg(void *msg) {
dengyihao's avatar
dengyihao 已提交
575
  if (msg) {
576
    char *temp = (char *)msg - sizeof(SRpcReqContext);
wafwerar's avatar
wafwerar 已提交
577
    taosMemoryFree(temp);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
578
    tTrace("free mem: %p", temp);
579 580 581
  }
}

J
jtao1735 已提交
582
/*
 * Allocate a client connection to peerFqdn:peerPort and open it on the
 * transport selected by connType.
 *
 * The peer address is taken from tsFqdnHash when cached; otherwise it is
 * resolved and, on success, added to the cache. Returns the connection, or
 * NULL with terrno set when the FQDN cannot be resolved or the transport
 * cannot connect. NULL is also returned when rpcAllocateClientConn() yields
 * no connection; terrno is not set here in that case.
 */
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType) {
  SRpcConn *pConn;

  // the cache key is the FQDN including its terminating NUL
  size_t    keyLen = strlen(peerFqdn) + 1;
  uint32_t  peerIp = 0;
  uint32_t *pPeerIp = taosHashGet(tsFqdnHash, peerFqdn, keyLen);
  if (pPeerIp != NULL) {
    peerIp = *pPeerIp;
  } else {
    peerIp = taosGetIpv4FromFqdn(peerFqdn);
    if (peerIp != 0xFFFFFFFF) {
      taosHashPut(tsFqdnHash, peerFqdn, keyLen, &peerIp, sizeof(peerIp));
    }
  }

  // 0xFFFFFFFF marks a failed resolution
  if (peerIp == 0xFFFFFFFF) {
    tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn);
    terrno = TSDB_CODE_RPC_FQDN_ERROR;
    return NULL;
  }

  pConn = rpcAllocateClientConn(pRpc);

  if (pConn) {
    tstrncpy(pConn->peerFqdn, peerFqdn, sizeof(pConn->peerFqdn));
    pConn->peerIp = peerIp;
    pConn->peerPort = peerPort;
    tstrncpy(pConn->user, pRpc->user, sizeof(pConn->user));
    pConn->connType = connType;

    // not every connection type has an open function
    if (taosOpenConn[connType]) {
      void *shandle = (connType & RPC_CONN_TCP) ? pRpc->tcphandle : pRpc->udphandle;
      pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort);
      if (pConn->chandle == NULL) {
        tError("failed to connect to:%s:%d", taosIpStr(pConn->peerIp), pConn->peerPort);

        terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
        rpcCloseConn(pConn);
        pConn = NULL;
      }
    }
  }

  return pConn;
}

627
// Put a connection slot back into the released state.
//
// A slot whose user[0] is already 0 is considered released and is left untouched.
// Otherwise the transport is closed, both timers are stopped, role-specific pending state
// is dropped (server: hash entry, saved response, saved request content; client: the
// outstanding request context), the session fields are reset and the sid is returned to
// the id pool.
static void rpcReleaseConn(SRpcConn *pConn) {
  if (pConn->user[0] == 0) return;

  SRpcInfo *pRpc = pConn->pRpc;

  pConn->user[0] = 0;
  if (taosCloseConn[pConn->connType]) {
    (*taosCloseConn[pConn->connType])(pConn->chandle);
  }

  taosTmrStopA(&pConn->pTimer);
  taosTmrStopA(&pConn->pIdleTimer);

  if (pRpc->connType == TAOS_CONN_SERVER) {
    char   hashKey[40] = {0};
    size_t keyLen = snprintf(hashKey, sizeof(hashKey), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId,
                             pConn->connType);
    taosHashRemove(pRpc->hash, hashKey, keyLen);

    // a response may be saved on the connection, a request message is not
    rpcFreeMsg(pConn->pRspMsg);
    pConn->pRspMsg = NULL;

    // if the server has ever reported progress, free the content (do not use rpcFreeMsg)
    if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);
  } else if (pConn->outType && pConn->pReqMsg) {
    // client side with an outgoing message still pending: fail it
    SRpcReqContext *pReqCtx = pConn->pContext;
    if (pReqCtx == NULL) {
      assert(0);
    } else {
      if (pReqCtx->pRsp) {
        // synchronous API: hand back an error and post the semaphore to unblock the app
        pReqCtx->pRsp->code = TSDB_CODE_RPC_APP_ERROR;
        pReqCtx->pRsp->pCont = NULL;
        pReqCtx->pRsp->contLen = 0;
        tsem_post(pReqCtx->pSem);
      }
      pReqCtx->pConn = NULL;
      taosRemoveRef(tsRpcRefId, pReqCtx->rid);
    }
  }

  // fields are cleared one by one: memset cannot be used because lockedBy must not be reset
  pConn->inType = 0;
  pConn->inTranId = 0;
  pConn->outType = 0;
  pConn->outTranId = 0;
  pConn->secured = 0;
  pConn->peerId = 0;
  pConn->peerIp = 0;
  pConn->peerPort = 0;
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
  pConn->pContext = NULL;
  pConn->chandle = NULL;

  taosFreeId(pRpc->idPool, pConn->sid);
  tDebug("%s, rpc connection is released", pConn->info);
}

static void rpcCloseConn(void *thandle) {
  SRpcConn *pConn = (SRpcConn *)thandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
687
  if (pConn == NULL) return;
688 689 690

  rpcLockConn(pConn);

dengyihao's avatar
dengyihao 已提交
691
  if (pConn->user[0]) rpcReleaseConn(pConn);
692

693
  rpcUnlockConn(pConn);
H
hzcheng 已提交
694 695
}

696 697
// Take a free slot from the id pool and initialise it as a client connection.
// Returns NULL with terrno = TSDB_CODE_RPC_MAX_SESSIONS when the pool yields no id.
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
  int sid = taosAllocateId(pRpc->idPool);
  if (sid <= 0) {
    tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
    terrno = TSDB_CODE_RPC_MAX_SESSIONS;
    return NULL;
  }

  SRpcConn *pConn = pRpc->connList + sid;

  pConn->pRpc = pRpc;
  pConn->sid = sid;
  pConn->ownId = htonl(pConn->sid);
  pConn->tranId = (uint16_t)(taosRand() & 0xFFFF);
  // link id is derived from the slot address, the process id and the random tranId
  pConn->linkUid = (uint32_t)((int64_t)pConn + taosGetPId() + (int64_t)pConn->tranId);
  pConn->spi = pRpc->spi;
  pConn->encrypt = pRpc->encrypt;
  if (pConn->spi) {
    memcpy(pConn->secret, pRpc->secret, TSDB_PASSWORD_LEN);
  }

  tDebug("%s %p client connection is allocated, uid:0x%x", pRpc->label, pConn, pConn->linkUid);
  return pConn;
}
H
hzcheng 已提交
719

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
720
// Find or create the server-side connection for an incoming message.
//
// Connections are keyed by "<peer ip>:<linkUid>:<sourceId>:<connType>" in pRpc->hash.
// An existing entry is returned with `secured` cleared. Otherwise a new slot is taken
// from the id pool, the user is authenticated through pRpc->afp (when set) and the slot
// is registered in the hash.
//
// Returns NULL with terrno set when:
//   - the header carries a non-zero code (header-only request, nothing to allocate),
//   - no session id is available,
//   - authentication is required but fails.
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
  SRpcConn *pConn = NULL;
  char      hashstr[40] = {0};
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;

  size_t size =
      snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);

  // check if it is already allocated
  SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
  if (ppConn) pConn = *ppConn;
  if (pConn) {
    pConn->secured = 0;
    return pConn;
  }

  // if code is not 0, it means it is simple reqhead, just ignore
  if (pHead->code != 0) {
    terrno = TSDB_CODE_RPC_ALREADY_PROCESSED;
    return NULL;
  }

  int sid = taosAllocateId(pRpc->idPool);
  if (sid <= 0) {
    tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
    terrno = TSDB_CODE_RPC_MAX_SESSIONS;
    return NULL;
  }

  pConn = pRpc->connList + sid;
  memcpy(pConn->user, pHead->user, tListLen(pConn->user));
  pConn->pRpc = pRpc;
  pConn->sid = sid;
  pConn->tranId = (uint16_t)(taosRand() & 0xFFFF);
  pConn->ownId = htonl(pConn->sid);
  pConn->linkUid = pHead->linkUid;

  if (pRpc->afp) {
    if (pConn->user[0] == 0) {
      terrno = TSDB_CODE_RPC_AUTH_REQUIRED;
    } else {
      terrno = (*pRpc->afp)(pRpc->parent, pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
    }

    if (terrno != 0) {
      // user[0] is the "slot in use" marker tested by rpcGetConnObj and rpcReleaseConn;
      // clear it so the slot does not look live after its sid goes back to the pool
      pConn->user[0] = 0;
      taosFreeId(pRpc->idPool, sid);  // sid shall be released
      return NULL;
    }
  }

  if (pRecv->connType == RPC_CONN_UDPS && pRpc->numOfThreads > 1) {
    // UDP server, assign to new connection
    pRpc->index = (pRpc->index + 1) % pRpc->numOfThreads;
    pConn->localPort = (pRpc->localPort + pRpc->index);
  }

  taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
  tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s spi:%d", pRpc->label, pConn, pConn->linkUid,
         sid, hashstr, pConn->spi);

  return pConn;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
783
// Map an incoming message to its connection object.
//
// A non-zero sid selects the slot directly, provided the slot is in use (user[0] != 0).
// When no live slot is found, a server allocates or looks up a connection through
// rpcAllocateServerConn, while a client reports TSDB_CODE_RPC_UNEXPECTED_RESPONSE.
// The connection's linkUid must match the one in the header, otherwise NULL is returned
// with terrno = TSDB_CODE_RPC_MISMATCHED_LINK_ID.
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
  SRpcConn *pConn = NULL;

  if (sid != 0) {
    SRpcConn *pSlot = pRpc->connList + sid;
    if (pSlot->user[0] != 0) pConn = pSlot;
  }

  if (pConn == NULL) {
    if (pRpc->connType != TAOS_CONN_SERVER) {
      terrno = TSDB_CODE_RPC_UNEXPECTED_RESPONSE;
      return NULL;
    }

    pConn = rpcAllocateServerConn(pRpc, pRecv);
    if (pConn == NULL) return NULL;
  }

  if (pConn->linkUid == pHead->linkUid) {
    return pConn;
  }

  terrno = TSDB_CODE_RPC_MISMATCHED_LINK_ID;
  tDebug("%s %p %p, linkUid:0x%x is not matched with received:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
         pConn->linkUid, pHead->linkUid);
  return NULL;
}

812
// Obtain a connection to the endpoint currently selected in the request's epSet.
// A cached connection is reused when it is still in use (user[0] != 0); otherwise a new
// one is opened. On success the connection is bound to the request's ahandle, its info
// string is rebuilt and its retry counter is reset. On failure the error is logged and
// NULL is returned.
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
  SRpcInfo *pRpc = pContext->pRpc;
  SEpSet   *pEpSet = &pContext->epSet;
  int       inUse = pEpSet->inUse;

  SRpcConn *pConn =
      rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[inUse].fqdn, pEpSet->eps[inUse].port, pContext->connType);
  if (pConn == NULL || pConn->user[0] == 0) {
    pConn = rpcOpenConn(pRpc, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType);
  }

  if (pConn == NULL) {
    tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno));
    return NULL;
  }

  pConn->ahandle = pContext->ahandle;
  snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle);
  pConn->tretry = 0;

  return pConn;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
835
// Validate the header of an incoming request against the connection state.
//
// Returns 0 when the request starts a new transaction (inTranId/inType are recorded and,
// except for RPC_CONN_TCPS, the progress timer is started). Otherwise returns:
//   TSDB_CODE_RPC_INVALID_VALUE             - sourceId differs from the recorded peerId
//   TSDB_CODE_RPC_ALREADY_PROCESSED         - tranId equals the current inTranId
//   TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED - another transaction is still in progress
//   TSDB_CODE_RPC_APP_ERROR                 - the message has no body
static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
  // the first request fixes the peer id; later ones must carry the same id
  if (pConn->peerId == 0) {
    pConn->peerId = pHead->sourceId;
  } else if (pConn->peerId != pHead->sourceId) {
    tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, pConn->peerId, pHead->sourceId);
    return TSDB_CODE_RPC_INVALID_VALUE;
  }

  if (pConn->inTranId == pHead->tranId) {
    if (pConn->inType == pHead->msgType) {
      // same transaction still being processed; a non-zero code is a heart beat from the
      // client and needs no action
      if (pHead->code == 0) {
        tDebug("%s, %s is retransmitted", pConn->info, TMSG_INFO(pHead->msgType));
        rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
      }
    } else if (pConn->inType == 0) {
      tDebug("%s, %s is already processed, tranId:%d", pConn->info, TMSG_INFO(pHead->msgType), pConn->inTranId);
      rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen);  // resend the response
    } else {
      tDebug("%s, mismatched message %s and tranId", pConn->info, TMSG_INFO(pHead->msgType));
    }

    // do not reply any message
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
  }

  if (pConn->inType != 0) {
    tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, pConn->inTranId, pHead->tranId);
    return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED;
  }

  if (rpcContLenFromMsg(pHead->msgLen) <= 0) {
    tDebug("%s, message body is empty, ignore", pConn->info);
    return TSDB_CODE_RPC_APP_ERROR;
  }

  pConn->inType = pHead->msgType;
  pConn->inTranId = pHead->tranId;

  // start the progress timer to monitor the response from server app
  if (pConn->connType != RPC_CONN_TCPS) {
    pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl);
  }

  return 0;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
884
// Validate the header of an incoming response against the outstanding request.
//
// Returns TSDB_CODE_SUCCESS when the response completes the request; the outgoing request
// state on the connection is then cleared. Returns TSDB_CODE_RPC_ALREADY_PROCESSED when
// the response was consumed here: the request was re-sent after an authentication restart
// or a link-id mismatch, or only the request head was re-sent because the peer is still
// processing. Returns an error code when the response does not belong to the outstanding
// request. pHead->code may be rewritten (TOO_SLOW, NOT_READY, NETWORK_UNAVAIL).
static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
  SRpcInfo *pRpc = pConn->pRpc;

  pConn->peerId = pHead->sourceId;

  if (pConn->outType == 0 || pConn->pContext == NULL) return TSDB_CODE_RPC_UNEXPECTED_RESPONSE;
  if (pHead->tranId != pConn->outTranId) return TSDB_CODE_RPC_INVALID_TRAN_ID;
  if (pHead->msgType != pConn->outType + 1) return TSDB_CODE_RPC_INVALID_RESPONSE_TYPE;

  taosTmrStopA(&pConn->pTimer);
  pConn->retry = 0;

  if (pHead->code == TSDB_CODE_RPC_AUTH_REQUIRED && pRpc->spi) {
    tDebug("%s, authentication shall be restarted", pConn->info);
    pConn->secured = 0;
    rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
    if (pConn->connType != RPC_CONN_TCPC) {
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
    }
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
  }

  if (pHead->code == TSDB_CODE_RPC_MISMATCHED_LINK_ID) {
    tDebug("%s, mismatched linkUid, link shall be restarted", pConn->info);
    pConn->secured = 0;
    ((SRpcHead *)pConn->pReqMsg)->destId = 0;  // the saved request is re-sent with destId cleared
    rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
    if (pConn->connType != RPC_CONN_TCPC) {
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
    }
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
  }

  if (pHead->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) {
    if (pConn->tretry > tsRpcMaxRetry) {
      // peer still in processing, give up
      tDebug("%s, server processing takes too long time, give up", pConn->info);
      pHead->code = TSDB_CODE_RPC_TOO_SLOW;
    } else {
      tDebug("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry);
      pConn->tretry++;
      rpcSendReqHead(pConn);
      if (pConn->connType != RPC_CONN_TCPC) {
        pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
      }
      return TSDB_CODE_RPC_ALREADY_PROCESSED;
    }
  }

  // the request is finished from the connection's point of view
  SRpcReqContext *pContext = pConn->pContext;
  pConn->outType = 0;
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;

  if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
    if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SEpSet)) {
      // if EpSet is not included in the msg, treat it as NOT_READY
      pHead->code = TSDB_CODE_RPC_NOT_READY;
    } else if (++pContext->redirect > TSDB_MAX_REPLICA) {
      pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
      tWarn("%s, too many redirects, quit", pConn->info);
    }
  }

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
958
// Parse and validate the header of a received message and bind it to a connection.
//
// The message type, the session id and (for requests) the client version are checked
// first; on failure terrno is set and NULL is returned. Once a connection is found, the
// peer address is refreshed, the message is authenticated and the request- or
// response-specific header processing runs, all under the connection lock. Its result is
// left in terrno. *ppContext is set to the connection's request context for responses
// that passed authentication, and to NULL otherwise.
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) {
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
  int32_t   sid = htonl(pHead->destId);

  *ppContext = NULL;

  if (TMSG_INDEX(pHead->msgType) >= TDMT_MAX || TMSG_INDEX(pHead->msgType) <= 0) {
    tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
    terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE;
    return NULL;
  }

  if (sid < 0 || sid >= pRpc->sessions) {
    tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, pRpc->sessions,
           TMSG_INFO(pHead->msgType));
    terrno = TSDB_CODE_RPC_INVALID_SESSION_ID;
    return NULL;
  }

  if (rpcIsReq(pHead->msgType) && htonl(pHead->msgVer) != tsVersion >> 8) {
    tDebug("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion,
           TMSG_INFO(pHead->msgType));
    terrno = TSDB_CODE_RPC_INVALID_VERSION;
    return NULL;
  }

  SRpcConn *pConn = rpcGetConnObj(pRpc, sid, pRecv);
  if (pConn == NULL) {
    tDebug("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno));
    return NULL;
  }

  rpcLockConn(pConn);

  if (rpcIsReq(pHead->msgType)) {
    pConn->ahandle = (void *)pHead->ahandle;
    snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle);
  }

  sid = pConn->sid;
  if (pConn->chandle == NULL) pConn->chandle = pRecv->chandle;

  // refresh the peer address; a port carried in the header overrides the socket port
  pConn->peerIp = pRecv->ip;
  pConn->peerPort = pRecv->port;
  if (pHead->port) pConn->peerPort = htons(pHead->port);

  terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);

  // code can be transformed only after authentication
  pHead->code = htonl(pHead->code);

  if (terrno == 0) {
    if (pHead->encrypt) {
      // decrypt here
    }

    if (!rpcIsReq(pHead->msgType)) {
      terrno = rpcProcessRspHead(pConn, pHead);
      *ppContext = pConn->pContext;
    } else {
      pConn->connType = pRecv->connType;
      terrno = rpcProcessReqHead(pConn, pHead);

      // stop idle timer
      taosTmrStopA(&pConn->pIdleTimer);

      // client shall send the request within tsRpcTime again for UDP, double it
      if (pConn->connType != RPC_CONN_TCPS) {
        pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer * 2, pConn, pRpc->tmrCtrl);
      }
    }
  }

  rpcUnlockConn(pConn);

  return pConn;
}

Y
TD-3409  
yihaoDeng 已提交
1037
static void doRpcReportBrokenLinkToServer(void *param, void *id) {
dengyihao's avatar
dengyihao 已提交
1038 1039 1040
  SRpcMsg * pRpcMsg = (SRpcMsg *)(param);
  SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle);
  SRpcInfo *pRpc = pConn->pRpc;
S
Shengliang Guan 已提交
1041
  (*(pRpc->cfp))(pRpc->parent, pRpcMsg, NULL);
wafwerar's avatar
wafwerar 已提交
1042
  taosMemoryFree(pRpcMsg);
Y
TD-3409  
yihaoDeng 已提交
1043
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1044 1045
// Tell the server application that the link carrying a pending request is gone.
//
// Nothing happens when the connection holds no saved request (pReqMsg == NULL).
// Otherwise a notification carrying the saved application context and
// TSDB_CODE_RPC_NETWORK_UNAVAIL is built and handed to a zero-delay timer, which delivers
// it through doRpcReportBrokenLinkToServer. When no application callback is registered
// the notification is dropped.
//
// If the notification cannot be allocated, the function logs and returns before any
// state is changed. pReqMsg then stays on the connection and no extra reference is taken.
static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
  SRpcInfo *pRpc = pConn->pRpc;
  if (pConn->pReqMsg == NULL) return;

  // allocate first, so a failure leaves the reference count and the connection untouched
  SRpcMsg *rpcMsg = taosMemoryMalloc(sizeof(SRpcMsg));
  if (rpcMsg == NULL) {
    tError("%s, failed to allocate broken link notification", pConn->info);
    return;
  }
  memset(rpcMsg, 0, sizeof(SRpcMsg));  // fields not assigned below must not hold garbage

  // if there are pending request, notify the app
  rpcAddRef(pRpc);
  tDebug("%s, notify the server app, connection is gone", pConn->info);

  rpcMsg->pCont = pConn->pReqMsg;      // pReqMsg is re-used to store the APP context from server
  rpcMsg->contLen = pConn->reqMsgLen;  // reqMsgLen is re-used to store the APP context length
  rpcMsg->ahandle = pConn->ahandle;
  rpcMsg->handle = pConn;
  rpcMsg->msgType = pConn->inType;
  rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;

  // the notification now refers to the saved context; detach it from the connection
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;

  if (pRpc->cfp) {
    taosTmrStart(doRpcReportBrokenLinkToServer, 0, rpcMsg, pRpc->tmrCtrl);
  } else {
    taosMemoryFree(rpcMsg);
  }
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1068
// Handle a broken transport link for a connection (NULL is ignored).
// Under the connection lock: an outstanding outgoing request is failed with
// TSDB_CODE_RPC_NETWORK_UNAVAIL through a zero-delay rpcProcessConnError timer, a pending
// incoming request is reported to the server application, and the connection is released.
static void rpcProcessBrokenLink(SRpcConn *pConn) {
  if (pConn == NULL) return;

  SRpcInfo *pRpc = pConn->pRpc;
  tDebug("%s, link is broken", pConn->info);

  rpcLockConn(pConn);

  if (pConn->outType != 0) {
    SRpcReqContext *pReqCtx = pConn->pContext;
    pReqCtx->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    pReqCtx->pConn = NULL;
    pConn->pReqMsg = NULL;
    taosTmrStart(rpcProcessConnError, 0, pReqCtx, pRpc->tmrCtrl);
  }

  if (pConn->inType != 0) {
    rpcReportBrokenLinkToServer(pConn);
  }

  rpcReleaseConn(pConn);
  rpcUnlockConn(pConn);
}

1089
// Entry point for data delivered by the transport layer.
//
// A NULL message signals a broken link on pRecv->thandle. Otherwise the header is parsed.
// On success the message is passed on to rpcProcessIncomingMsg. On a parse error for a
// request an error reply is sent to the peer, and the connection is also closed for
// timestamp and authentication failures. The received buffer is freed here whenever the
// parse code is non-zero.
//
// Returns the connection found by header processing, or NULL.
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
  SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle;
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
  SRpcConn *pConn = (SRpcConn *)pRecv->thandle;

  tDump(pRecv->msg, pRecv->msgLen);

  // underlying UDP layer does not know it is server or client
  pRecv->connType |= pRpc->connType;

  if (pRecv->msg == NULL) {
    rpcProcessBrokenLink(pConn);
    return NULL;
  }

  terrno = 0;
  SRpcReqContext *pContext;
  pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);

  char ipstr[24] = {0};
  taosIpPort2String(pRecv->ip, pRecv->port, ipstr);

  // the message name is only looked up for types inside the valid range
  if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) {
    tDebug("%s %p %p, %s received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn,
           (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), ipstr, terrno, pRecv->msgLen, pHead->sourceId,
           pHead->destId, pHead->tranId, pHead->code);
  } else {
    tDebug("%s %p %p, %d received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn,
           (void *)pHead->ahandle, pHead->msgType, ipstr, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId,
           pHead->tranId, pHead->code);
  }

  int32_t code = terrno;
  if (code != TSDB_CODE_RPC_ALREADY_PROCESSED) {
    if (code == 0) {
      // msg is passed to app only parsing is ok
      rpcProcessIncomingMsg(pConn, pHead, pContext);
    } else if (rpcIsReq(pHead->msgType)) {
      // parsing error on a request: tell the peer
      rpcSendErrorMsgToPeer(pRecv, code);
      if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE) {
        rpcCloseConn(pConn);
      }

      if (TMSG_INDEX(pHead->msgType) + 1 > 1 && TMSG_INDEX(pHead->msgType) + 1 < TDMT_MAX) {
        tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
               TMSG_INFO(pHead->msgType + 1), code);
      } else {
        // NOTE(review): this branch is reached for types outside the range tested above,
        // yet still looks up TMSG_INFO for them - confirm the lookup is safe for such values
        tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
               TMSG_INFO(pHead->msgType), code);
      }
    }
  }

  if (code) rpcFreeMsg(pRecv->msg);  // parsing failed, msg shall be freed
  return pConn;
}
H
hzcheng 已提交
1145

1146
// Deliver a final response to the client application and drop the request context.
// Synchronous callers (pRsp set) get the epSet and the message copied into their buffers
// and their semaphore posted. Asynchronous callers get the callback, with the epSet
// passed only when the endpoint in use changed or a redirect happened.
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
  SRpcInfo *pRpc = pContext->pRpc;

  pContext->pConn = NULL;

  if (pContext->pRsp == NULL) {
    // for asynchronous API
    SEpSet *pEpSet = (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) ? &pContext->epSet : NULL;
    (*pRpc->cfp)(pRpc->parent, pMsg, pEpSet);
  } else {
    // for synchronous API
    memcpy(pContext->pSet, &pContext->epSet, sizeof(SEpSet));
    memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
    tsem_post(pContext->pSem);
  }

  // free the request message
  taosRemoveRef(tsRpcRefId, pContext->rid);
}

S
slguan 已提交
1167
// Dispatch a message whose header has been accepted.
//
// The message is decompressed first. A request is handed to the server application
// callback with the connection as its handle, after adding a reference on pRpc. A
// response detaches the connection from the request context and returns it to the
// connection cache (or closes it on TSDB_CODE_RPC_TOO_SLOW). The response is then handled
// by its code:
//   - REDIRECT: adopt the endpoint set from the message body and re-send the request.
//   - RPC_NOT_READY / APP_NOT_READY / NODE_OFFLINE: go through rpcProcessConnError.
//   - anything else: deliver to the client through rpcNotifyClient.
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
  SRpcInfo *pRpc = pConn->pRpc;
  SRpcMsg   rpcMsg;

  pHead = rpcDecompressRpcMsg(pHead);
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.code = pHead->code;
  rpcMsg.pCont = pHead->content;
  rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);

  if (rpcIsReq(pHead->msgType)) {
    rpcMsg.handle = pConn;
    rpcMsg.ahandle = pConn->ahandle;
    rpcAddRef(pRpc);  // add the refCount for requests

    // notify the server app
    (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
    return;
  }

  // it's a response
  rpcMsg.handle = pContext;
  rpcMsg.ahandle = pContext->ahandle;
  pContext->pConn = NULL;

  if (pHead->code == TSDB_CODE_RPC_TOO_SLOW) {
    rpcCloseConn(pConn);
  } else {
    // for UDP, port may be changed by server, the port in epSet shall be used for cache
    rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.eps[pContext->epSet.inUse].port,
                        pConn->connType);
  }

  if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
    pContext->numOfTry = 0;

    SEpSet *pNewSet = (SEpSet *)pHead->content;
    if (pNewSet->numOfEps > 0) {
      // NOTE(review): numOfEps and inUse are taken from the message as-is and are not
      // range-checked in this function - confirm they are validated elsewhere
      memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet));
      tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps,
             pContext->epSet.inUse);
      for (int i = 0; i < pContext->epSet.numOfEps; ++i) {
        // ports arrive in network byte order
        pContext->epSet.eps[i].port = htons(pContext->epSet.eps[i].port);
        tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.eps[i].fqdn,
               pContext->epSet.eps[i].port);
      }
    }

    rpcSendReqToServer(pRpc, pContext);
    rpcFreeCont(rpcMsg.pCont);
    return;
  }

  if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY ||
      pHead->code == TSDB_CODE_NODE_OFFLINE) {
    pContext->code = pHead->code;
    rpcProcessConnError(pContext, NULL);
    rpcFreeCont(rpcMsg.pCont);
    return;
  }

  rpcNotifyClient(pContext, &rpcMsg);
}

1224
// Send a header-only response carrying `code` for the request currently recorded on the
// connection (inType/inTranId), then mark the connection as secured.
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
  // the buffer is RPC_MSG_OVERHEAD bytes, but only the header part is cleared and sent
  char      msg[RPC_MSG_OVERHEAD];
  SRpcHead *pHead = (SRpcHead *)msg;

  // set msg header
  memset(pHead, 0, sizeof(SRpcHead));
  pHead->version = 1;
  pHead->msgType = pConn->inType + 1;  // the response type is the request type plus one
  pHead->tranId = pConn->inTranId;
  pHead->spi = pConn->spi;
  pHead->encrypt = 0;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
  pHead->linkUid = pConn->linkUid;
  pHead->ahandle = (uint64_t)pConn->ahandle;
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));
  pHead->code = htonl(code);

  rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
  pConn->secured = 1;  // connection shall be secured
}

/*
 * Send a header-only request on pConn.
 *
 * The header is built in a stack buffer from the connection's outbound state
 * (pConn->outType, pConn->outTranId); msgVer is htonl(tsVersion >> 8) and the
 * code field is set to the literal 1 before the header is handed to
 * rpcSendMsgToPeer().
 */
static void rpcSendReqHead(SRpcConn *pConn) {
  char      reqBuf[RPC_MSG_OVERHEAD];
  SRpcHead *pReq = (SRpcHead *)reqBuf;

  // only the header part of the buffer is cleared before it is populated
  memset(reqBuf, 0, sizeof(SRpcHead));

  // message identity
  pReq->version = 1;
  pReq->msgType = pConn->outType;
  pReq->msgVer = htonl(tsVersion >> 8);
  pReq->spi = pConn->spi;
  pReq->encrypt = 0;

  // routing information copied from the connection
  pReq->tranId = pConn->outTranId;
  pReq->sourceId = pConn->ownId;
  pReq->destId = pConn->peerId;
  pReq->linkUid = pConn->linkUid;
  pReq->ahandle = (uint64_t)pConn->ahandle;
  memcpy(pReq->user, pConn->user, tListLen(pReq->user));

  pReq->code = 1;

  rpcSendMsgToPeer(pConn, reqBuf, sizeof(SRpcHead));
}
H
hjxilinx 已提交
1269

1270
/*
 * Reply to a received message with an error code, without going through a
 * connection object.
 *
 * The reply header mirrors the received one (version, encrypt, tranId, linkUid,
 * ahandle), swaps sourceId/destId, uses msgType + 1 and clears spi. When `code`
 * is TSDB_CODE_RPC_INVALID_TIME_STAMP the local timestamp (htonl of
 * taosGetTimestampSec()) is appended as content. The message is written with
 * the send routine selected by pRecv->connType.
 */
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
  char      reply[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t)];
  SRpcHead *pReq = (SRpcHead *)pRecv->msg;
  SRpcHead *pRsp = (SRpcHead *)reply;
  int       replyLen = sizeof(SRpcHead);

  // only the header part of the buffer is cleared
  memset(reply, 0, sizeof(SRpcHead));

  pRsp->version = pReq->version;
  pRsp->msgType = (tmsg_t)(pReq->msgType + 1);
  pRsp->spi = 0;
  pRsp->encrypt = pReq->encrypt;
  pRsp->tranId = pReq->tranId;
  pRsp->sourceId = pReq->destId;  // ids are swapped: the reply goes back to the sender
  pRsp->destId = pReq->sourceId;
  pRsp->linkUid = pReq->linkUid;
  pRsp->ahandle = pReq->ahandle;
  pRsp->code = htonl(code);

  if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP) {
    // include a time stamp if client's time is not synchronized well
    uint32_t now = htonl(taosGetTimestampSec());
    memcpy(pRsp->content, &now, sizeof(now));
    replyLen += sizeof(now);
  }

  pRsp->msgLen = (int32_t)htonl((uint32_t)replyLen);
  (*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, reply, replyLen, pRecv->chandle);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1307
/*
 * Send the request stored in pContext to a server.
 *
 * Increments pContext->numOfTry, obtains a connection through
 * rpcSetupConnToServer() and, if none is available, records terrno in
 * pContext->code and schedules rpcProcessConnError() through a timer.
 * Otherwise the message header is filled in from the connection, the
 * connection remembers the outgoing request (type, tranId, buffer, context),
 * the message is sent and, for every connection type except RPC_CONN_TCPC,
 * the retry timer is (re)armed with tsRpcTimer.
 */
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
  SRpcHead *pReqHead = rpcHeadFromCont(pContext->pCont);
  char     *reqMsg = (char *)pReqHead;
  int       reqLen = rpcMsgLenFromCont(pContext->contLen);
  tmsg_t    reqType = pContext->msgType;

  pContext->numOfTry++;

  SRpcConn *pConn = rpcSetupConnToServer(pContext);
  if (pConn == NULL) {
    // no connection could be set up: hand the error to rpcProcessConnError via a timer
    pContext->code = terrno;
    taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl);
    return;
  }

  pContext->pConn = pConn;
  pConn->ahandle = pContext->ahandle;
  rpcLockConn(pConn);

  // advance the transaction id, skipping the value 0
  pConn->tranId++;
  if (pConn->tranId == 0) {
    pConn->tranId++;
  }

  // set the message header
  pReqHead->version = 1;
  pReqHead->msgVer = htonl(tsVersion >> 8);
  pReqHead->msgType = reqType;
  pReqHead->encrypt = 0;
  pReqHead->tranId = pConn->tranId;
  pReqHead->sourceId = pConn->ownId;
  pReqHead->destId = pConn->peerId;
  pReqHead->port = 0;
  pReqHead->linkUid = pConn->linkUid;
  pReqHead->ahandle = (uint64_t)pConn->ahandle;
  memcpy(pReqHead->user, pConn->user, tListLen(pReqHead->user));

  // set the connection parameters
  pConn->outType = reqType;
  pConn->outTranId = pReqHead->tranId;
  pConn->pReqMsg = reqMsg;
  pConn->reqMsgLen = reqLen;
  pConn->pContext = pContext;

  rpcSendMsgToPeer(pConn, reqMsg, reqLen);
  if (pConn->connType != RPC_CONN_TCPC) {
    taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
  }

  rpcUnlockConn(pConn);
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1353 1354

/*
 * Finalize a message with rpcAddAuthPart() and write it to the peer of pConn.
 *
 * msgLen is replaced by the length returned from rpcAddAuthPart(). For a
 * response whose code field is 0 the connection is marked secured. The send
 * routine is chosen by pConn->connType; a short write is only logged. The
 * message is finally passed to tDump().
 */
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
  SRpcHead *pHead = (SRpcHead *)msg;

  msgLen = rpcAddAuthPart(pConn, msg, msgLen);

  if (!rpcIsReq(pHead->msgType)) {
    // response path
    if (pHead->code == 0) {
      pConn->secured = 1;  // for success response, set link as secured
    }

    char ipport[40] = {0};
    taosIpPort2String(pConn->peerIp, pConn->peerPort, ipport);
    tDebug("%s, %s is sent to %s, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType),
           ipport, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
  } else {
    // request path
    tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType),
           pConn->peerFqdn, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
  }

  int writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);

  if (writtenLen != msgLen) {
    tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
  }

  tDump(msg, msgLen);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1384
static void rpcProcessConnError(void *param, void *id) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1385
  SRpcReqContext *pContext = (SRpcReqContext *)param;
dengyihao's avatar
dengyihao 已提交
1386
  SRpcInfo *      pRpc = pContext->pRpc;
S
Shengliang Guan 已提交
1387
  SRpcMsg         rpcMsg = {0};
dengyihao's avatar
dengyihao 已提交
1388

H
hjxilinx 已提交
1389 1390 1391
  if (pRpc == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1392

1393
  tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle);
H
hzcheng 已提交
1394

H
Hongze Cheng 已提交
1395
  if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TDMT_VND_FETCH) {
dengyihao's avatar
dengyihao 已提交
1396
    rpcMsg.msgType = pContext->msgType + 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1397
    rpcMsg.ahandle = pContext->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1398 1399 1400
    rpcMsg.code = pContext->code;
    rpcMsg.pCont = NULL;
    rpcMsg.contLen = 0;
1401 1402

    rpcNotifyClient(pContext, &rpcMsg);
H
hzcheng 已提交
1403
  } else {
dengyihao's avatar
dengyihao 已提交
1404
    // move to next IP
1405 1406
    pContext->epSet.inUse++;
    pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1407
    rpcSendReqToServer(pRpc, pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1408
  }
H
hzcheng 已提交
1409 1410
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1411 1412 1413
static void rpcProcessRetryTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;
  SRpcInfo *pRpc = pConn->pRpc;
H
hzcheng 已提交
1414

1415
  rpcLockConn(pConn);
H
hzcheng 已提交
1416

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1417
  if (pConn->outType && pConn->user[0]) {
H
Hongze Cheng 已提交
1418
    tDebug("%s, expected %s is not received", pConn->info, TMSG_INFO((int)pConn->outType + 1));
H
hzcheng 已提交
1419 1420 1421
    pConn->pTimer = NULL;
    pConn->retry++;

S
slguan 已提交
1422
    if (pConn->retry < 4) {
H
Hongze Cheng 已提交
1423
      tDebug("%s, re-send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn, pConn->peerPort);
dengyihao's avatar
dengyihao 已提交
1424
      rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
1425
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
H
hzcheng 已提交
1426 1427
    } else {
      // close the connection
dengyihao's avatar
dengyihao 已提交
1428 1429
      tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn,
             pConn->peerPort);
1430 1431
      if (pConn->pContext) {
        pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1432
        pConn->pContext->pConn = NULL;
1433
        pConn->pReqMsg = NULL;
1434
        taosTmrStart(rpcProcessConnError, 1, pConn->pContext, pRpc->tmrCtrl);
1435 1436
        rpcReleaseConn(pConn);
      }
H
hzcheng 已提交
1437
    }
1438
  } else {
1439
    tDebug("%s, retry timer not processed", pConn->info);
H
hzcheng 已提交
1440 1441
  }

1442
  rpcUnlockConn(pConn);
H
hzcheng 已提交
1443 1444
}

1445 1446 1447
static void rpcProcessIdleTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;

1448 1449
  rpcLockConn(pConn);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1450
  if (pConn->user[0]) {
1451
    tDebug("%s, close the connection since no activity", pConn->info);
dengyihao's avatar
dengyihao 已提交
1452
    if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
1453
    rpcReleaseConn(pConn);
1454
  } else {
1455
    tDebug("%s, idle timer:%p not processed", pConn->info, tmrId);
1456
  }
1457 1458

  rpcUnlockConn(pConn);
1459 1460 1461 1462 1463 1464
}

static void rpcProcessProgressTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;
  SRpcInfo *pRpc = pConn->pRpc;

1465
  rpcLockConn(pConn);
1466

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1467
  if (pConn->inType && pConn->user[0]) {
1468
    tDebug("%s, progress timer expired, send progress", pConn->info);
1469
    rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
1470
    pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
1471
  } else {
1472
    tDebug("%s, progress timer:%p not processed", pConn->info, tmrId);
1473 1474
  }

1475
  rpcUnlockConn(pConn);
1476 1477
}

dengyihao's avatar
dengyihao 已提交
1478 1479 1480 1481 1482
static int32_t rpcCompressRpcMsg(char *pCont, int32_t contLen) {
  SRpcHead *pHead = rpcHeadFromCont(pCont);
  int32_t   finalLen = 0;
  int       overhead = sizeof(SRpcComp);

1483 1484 1485
  if (!NEEDTO_COMPRESSS_MSG(contLen)) {
    return contLen;
  }
dengyihao's avatar
dengyihao 已提交
1486

wafwerar's avatar
wafwerar 已提交
1487
  char *buf = taosMemoryMalloc(contLen + overhead + 8);  // 8 extra bytes
1488
  if (buf == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1489
    tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
1490 1491
    return contLen;
  }
dengyihao's avatar
dengyihao 已提交
1492

1493
  int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
S
Shuduo Sang 已提交
1494
  tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead);
dengyihao's avatar
dengyihao 已提交
1495

1496 1497 1498 1499
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
S
TD-4100  
Shengliang Guan 已提交
1500
  if (compLen > 0 && compLen < contLen - overhead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1501
    SRpcComp *pComp = (SRpcComp *)pCont;
dengyihao's avatar
dengyihao 已提交
1502 1503
    pComp->reserved = 0;
    pComp->contLen = htonl(contLen);
1504
    memcpy(pCont + overhead, buf, compLen);
dengyihao's avatar
dengyihao 已提交
1505

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1506
    pHead->comp = 1;
S
Shuduo Sang 已提交
1507
    tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen);
1508 1509 1510 1511 1512
    finalLen = compLen + overhead;
  } else {
    finalLen = contLen;
  }

wafwerar's avatar
wafwerar 已提交
1513
  taosMemoryFree(buf);
1514 1515 1516
  return finalLen;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1517
/*
 * Decompress an RPC message whose header has the comp flag set.
 *
 * pHead - received message header, followed by its content
 *
 * Returns pHead unchanged when the message is not compressed or when the
 * buffer for the decompressed message cannot be allocated (an error is
 * logged). Otherwise returns the header of a newly allocated message holding
 * the decompressed content; the original message is released with
 * rpcFreeMsg().
 *
 * The new buffer is contLen + RPC_MSG_OVERHEAD bytes and the header is placed
 * sizeof(SRpcReqContext) bytes into it, so the allocation result itself (not
 * the derived header pointer, which is never NULL) must be checked.
 */
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
  int       overhead = sizeof(SRpcComp);
  uint8_t  *pCont = pHead->content;
  SRpcComp *pComp = (SRpcComp *)pHead->content;

  if (!pHead->comp) {
    return pHead;
  }

  // decompress the content
  assert(pComp->reserved == 0);
  int contLen = htonl(pComp->contLen);

  // prepare the temporary buffer to decompress message
  char *temp = (char *)taosMemoryMalloc(contLen + RPC_MSG_OVERHEAD);
  if (temp == NULL) {
    tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
    return pHead;
  }

  SRpcHead *pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext));  // reserve SRpcReqContext

  int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
  int origLen = LZ4_decompress_safe((char *)(pCont + overhead), (char *)pNewHead->content, compLen, contLen);
  assert(origLen == contLen);

  memcpy(pNewHead, pHead, sizeof(SRpcHead));
  pNewHead->msgLen = rpcMsgLenFromCont(origLen);
  rpcFreeMsg(pHead);  // free the compressed message buffer
  pHead = pNewHead;
  tTrace("decomp malloc mem:%p", temp);

  return pHead;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1550
static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) {
dengyihao's avatar
dengyihao 已提交
1551
  T_MD5_CTX context;
dengyihao's avatar
dengyihao 已提交
1552
  int       ret = -1;
H
hzcheng 已提交
1553

dengyihao's avatar
dengyihao 已提交
1554
  tMD5Init(&context);
H
Haojun Liao 已提交
1555
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1556
  tMD5Update(&context, (uint8_t *)pMsg, msgLen);
H
Haojun Liao 已提交
1557
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1558
  tMD5Final(&context);
H
hzcheng 已提交
1559 1560 1561 1562 1563 1564

  if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;

  return ret;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1565
static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) {
dengyihao's avatar
dengyihao 已提交
1566
  T_MD5_CTX context;
H
hzcheng 已提交
1567

dengyihao's avatar
dengyihao 已提交
1568
  tMD5Init(&context);
H
Haojun Liao 已提交
1569
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1570
  tMD5Update(&context, (uint8_t *)pMsg, msgLen);
H
Haojun Liao 已提交
1571
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1572
  tMD5Final(&context);
H
hzcheng 已提交
1573 1574 1575

  memcpy(pAuth, context.digest, sizeof(context.digest));
}
1576 1577

/*
 * Finalize the header length and, when required, append the auth digest.
 *
 * When the connection uses an spi and is not yet secured, an SRpcDigest
 * (timestamp + auth) is written right after the first msgLen bytes of msg, so
 * the buffer must have room for it. Otherwise the header's spi is cleared.
 * In both cases pHead->msgLen is stored via htonl().
 *
 * Returns the final message length in bytes.
 */
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
  SRpcHead *pHead = (SRpcHead *)msg;

  if (pConn->spi == 0 || pConn->secured != 0) {
    // no authentication part is needed
    pHead->spi = 0;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    return msgLen;
  }

  // add auth part
  pHead->spi = pConn->spi;
  SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen);
  pDigest->timeStamp = htonl(taosGetTimestampSec());
  msgLen += sizeof(SRpcDigest);
  pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);

  // the digest covers everything except the trailing TSDB_AUTH_LEN bytes
  rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);

  return msgLen;
}

/*
 * Check the authentication part of a received message.
 *
 * pConn  - connection the message belongs to
 * msg    - received message, starting with SRpcHead
 * msgLen - number of bytes received
 *
 * Returns 0 when the message is accepted; in that case pHead->msgLen has been
 * converted with htonl() and, for an authenticated message, reduced by
 * sizeof(SRpcDigest). Otherwise returns TSDB_CODE_RPC_INVALID_TIME_STAMP,
 * TSDB_CODE_RPC_AUTH_FAILURE or TSDB_CODE_RPC_AUTH_REQUIRED and leaves
 * pHead->msgLen untouched.
 *
 * The digest is expected in the last sizeof(SRpcDigest) bytes of the message,
 * so a message too short to hold a header plus digest is rejected before the
 * digest is located. The timestamp difference is computed in 64 bits because
 * the timestamp comes from the peer and a 32-bit subtraction could overflow.
 */
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
  SRpcHead *pHead = (SRpcHead *)msg;
  int       code = 0;

  if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
    // secured link, or no authentication
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
    // tTrace("%s, secured link, no auth is required", pConn->info);
    return 0;
  }

  if (!rpcIsReq(pHead->msgType)) {
    // for response, if code is auth failure, it shall bypass the auth process
    code = htonl(pHead->code);
    if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
        code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
        code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
      // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
      return 0;
    }
  }

  code = 0;
  if (pHead->spi == pConn->spi) {
    // the digest occupies the tail of the message; make sure there is room for it
    if (msgLen < (int)(sizeof(SRpcHead) + sizeof(SRpcDigest))) {
      tWarn("%s, msg len:%d is too short to hold the auth digest, msg discarded", pConn->info, msgLen);
      return TSDB_CODE_RPC_AUTH_FAILURE;
    }

    // authentication
    SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest));

    int64_t delta = (int64_t)(int32_t)htonl(pDigest->timeStamp) - (int64_t)(int32_t)taosGetTimestampSec();
    if (delta > 900 || delta < -900) {
      tWarn("%s, time diff:%lld is too big, msg discarded", pConn->info, (long long)delta);
      code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
    } else {
      if (rpcAuthenticateMsg(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
        tDebug("%s, authentication failed, msg discarded", pConn->info);
        code = TSDB_CODE_RPC_AUTH_FAILURE;
      } else {
        pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
        if (!rpcIsReq(pHead->msgType)) pConn->secured = 1;  // link is secured for client
        // tTrace("%s, message is authenticated", pConn->info);
      }
    }
  } else {
    tError("%s, auth spi:%d not matched with received:%d %p", pConn->info, pConn->spi, pHead->spi, pConn);
    code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
  }

  return code;
}

1648
/*
 * Acquire the connection's spin lock.
 *
 * Repeatedly tries to swap pConn->lockedBy from 0 to the calling thread's id;
 * every 1000th failed attempt the thread yields the CPU with sched_yield().
 */
static void rpcLockConn(SRpcConn *pConn) {
  int64_t self = taosGetSelfPthreadId();
  int     failures = 0;

  for (;;) {
    if (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, self) == 0) {
      break;  // lock acquired
    }
    if (++failures % 1000 == 0) {
      sched_yield();
    }
  }
}

/*
 * Release the connection's spin lock.
 *
 * Swaps pConn->lockedBy from the calling thread's id back to 0 and asserts
 * when the lock was not held by the calling thread.
 */
static void rpcUnlockConn(SRpcConn *pConn) {
  int64_t self = taosGetSelfPthreadId();
  int64_t holder = atomic_val_compare_exchange_64(&(pConn->lockedBy), self, 0);

  if (holder != self) {
    assert(false);
  }
}
1664

dengyihao's avatar
dengyihao 已提交
1665
/* Atomically add one reference to the RPC instance. */
static void rpcAddRef(SRpcInfo *pRpc) {
  atomic_add_fetch_32(&pRpc->refCount, 1);
}
1666

dengyihao's avatar
dengyihao 已提交
1667
/*
 * Atomically drop one reference to the RPC instance.
 *
 * When the count reaches 0 the connection cache, hash, timer controller, id
 * pool, connection list and mutex are cleaned up, the instance itself is
 * freed and the global tsRpcNum counter is decremented.
 */
static void rpcDecRef(SRpcInfo *pRpc) {
  if (atomic_sub_fetch_32(&pRpc->refCount, 1) != 0) {
    return;  // still referenced
  }

  rpcCloseConnCache(pRpc->pCache);
  taosHashCleanup(pRpc->hash);
  taosTmrCleanUp(pRpc->tmrCtrl);
  taosIdPoolCleanUp(pRpc->idPool);

  taosMemoryFreeClear(pRpc->connList);
  taosThreadMutexDestroy(&pRpc->mutex);
  tDebug("%s rpc resources are released", pRpc->label);
  taosMemoryFreeClear(pRpc);

  atomic_sub_fetch_32(&tsRpcNum, 1);
}
dengyihao's avatar
dengyihao 已提交
1682
#endif