rpcMain.c 55.1 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "lz4.h"
S
slguan 已提交
17
#include "os.h"
dengyihao's avatar
dengyihao 已提交
18 19 20 21 22 23 24 25
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
H
hzcheng 已提交
26 27 28
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
H
Hongze Cheng 已提交
29
#include "tmsg.h"
dengyihao's avatar
dengyihao 已提交
30
#include "transportInt.h"
dengyihao's avatar
dengyihao 已提交
31
#include "tref.h"
S
slguan 已提交
32
#include "trpc.h"
dengyihao's avatar
dengyihao 已提交
33 34 35
#include "ttimer.h"
#include "tutil.h"

dengyihao's avatar
dengyihao 已提交
36 37 38 39 40 41 42 43 44
static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT;

int tsRpcMaxUdpSize = 15000;  // bytes
int tsProgressTimer = 100;
// not configurable
int tsRpcMaxRetry;
int tsRpcHeadSize;
int tsRpcOverhead;

S
Shengliang 已提交
45 46
SHashObj *tsFqdnHash;

dengyihao's avatar
dengyihao 已提交
47 48
#ifndef USE_UV

dengyihao's avatar
dengyihao 已提交
49 50 51 52 53 54 55 56
typedef struct {
  int      sessions;      // number of sessions allowed
  int      numOfThreads;  // number of threads to process incoming messages
  int      idleTime;      // milliseconds;
  uint16_t localPort;
  int8_t   connType;
  int      index;  // for UDP server only, round robin for multiple threads
  char     label[TSDB_LABEL_LEN];
dengyihao's avatar
dengyihao 已提交
57

dengyihao's avatar
dengyihao 已提交
58 59 60 61 62
  char user[TSDB_UNI_LEN];         // meter ID
  char spi;                        // security parameter index
  char encrypt;                    // encrypt algorithm
  char secret[TSDB_PASSWORD_LEN];  // secret for the link
  char ckey[TSDB_PASSWORD_LEN];    // ciphering key
dengyihao's avatar
dengyihao 已提交
63

dengyihao's avatar
dengyihao 已提交
64 65
  void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
  int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey);
dengyihao's avatar
dengyihao 已提交
66

dengyihao's avatar
fix bug  
dengyihao 已提交
67
  bool             noPool;
dengyihao's avatar
dengyihao 已提交
68
  int32_t          refCount;
dengyihao's avatar
dengyihao 已提交
69 70 71 72 73 74 75
  void *           parent;
  void *           idPool;     // handle to ID pool
  void *           tmrCtrl;    // handle to timer
  SHashObj *       hash;       // handle returned by hash utility
  void *           tcphandle;  // returned handle from TCP initialization
  void *           udphandle;  // returned handle from UDP initialization
  void *           pCache;     // connection cache
dengyihao's avatar
dengyihao 已提交
76
  pthread_mutex_t  mutex;
dengyihao's avatar
dengyihao 已提交
77
  struct SRpcConn *connList;  // connection list
dengyihao's avatar
dengyihao 已提交
78
} SRpcInfo;
dengyihao's avatar
dengyihao 已提交
79

dengyihao's avatar
dengyihao 已提交
80
typedef struct {
dengyihao's avatar
dengyihao 已提交
81
  SRpcInfo *       pRpc;      // associated SRpcInfo
dengyihao's avatar
dengyihao 已提交
82
  SEpSet           epSet;     // ip list provided by app
dengyihao's avatar
dengyihao 已提交
83 84
  void *           ahandle;   // handle provided by app
  struct SRpcConn *pConn;     // pConn allocated
dengyihao's avatar
dengyihao 已提交
85
  tmsg_t           msgType;   // message type
dengyihao's avatar
dengyihao 已提交
86
  uint8_t *        pCont;     // content provided by app
dengyihao's avatar
dengyihao 已提交
87 88 89 90 91 92 93
  int32_t          contLen;   // content length
  int32_t          code;      // error code
  int16_t          numOfTry;  // number of try for different servers
  int8_t           oldInUse;  // server EP inUse passed by app
  int8_t           redirect;  // flag to indicate redirect
  int8_t           connType;  // connection type
  int64_t          rid;       // refId returned by taosAddRef
dengyihao's avatar
dengyihao 已提交
94 95 96
  SRpcMsg *        pRsp;      // for synchronous API
  tsem_t *         pSem;      // for synchronous API
  SEpSet *         pSet;      // for synchronous API
dengyihao's avatar
dengyihao 已提交
97 98 99
  char             msg[0];    // RpcHead starts from here
} SRpcReqContext;

dengyihao's avatar
dengyihao 已提交
100
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
dengyihao's avatar
dengyihao 已提交
101
#define rpcHeadFromCont(cont) ((SRpcHead *)((char *)cont - sizeof(SRpcHead)))
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102 103 104
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
105
#define rpcIsReq(type) (type & 1U)
H
hzcheng 已提交
106

J
Jeff Tao 已提交
107
typedef struct SRpcConn {
dengyihao's avatar
dengyihao 已提交
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
  char            info[48];                   // debug info: label + pConn + ahandle
  int             sid;                        // session ID
  uint32_t        ownId;                      // own link ID
  uint32_t        peerId;                     // peer link ID
  char            user[TSDB_UNI_LEN];         // user ID for the link
  char            spi;                        // security parameter index
  char            encrypt;                    // encryption, 0:1
  char            secret[TSDB_PASSWORD_LEN];  // secret for the link
  char            ckey[TSDB_PASSWORD_LEN];    // ciphering key
  char            secured;                    // if set to 1, no authentication
  uint16_t        localPort;                  // for UDP only
  uint32_t        linkUid;                    // connection unique ID assigned by client
  uint32_t        peerIp;                     // peer IP
  uint16_t        peerPort;                   // peer port
  char            peerFqdn[TSDB_FQDN_LEN];    // peer FQDN or ip string
  uint16_t        tranId;                     // outgoing transcation ID, for build message
  uint16_t        outTranId;                  // outgoing transcation ID
  uint16_t        inTranId;                   // transcation ID for incoming msg
  tmsg_t          outType;                    // message type for outgoing request
  tmsg_t          inType;                     // message type for incoming request
  void *          chandle;                    // handle passed by TCP/UDP connection layer
  void *          ahandle;                    // handle provided by upper app layter
  int             retry;                      // number of retry for sending request
  int             tretry;                     // total retry
  void *          pTimer;                     // retry timer to monitor the response
  void *          pIdleTimer;                 // idle timer
  char *          pRspMsg;                    // response message including header
  int             rspMsgLen;                  // response messag length
  char *          pReqMsg;                    // request message including header
  int             reqMsgLen;                  // request message length
  SRpcInfo *      pRpc;                       // the associated SRpcInfo
  int8_t          connType;                   // connection type
  int64_t         lockedBy;                   // lock for connection
  SRpcReqContext *pContext;                   // request context
H
hzcheng 已提交
142 143
} SRpcConn;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
144 145
static int     tsRpcRefId = -1;
static int32_t tsRpcNum = 0;
S
Shengliang Guan 已提交
146

dengyihao's avatar
dengyihao 已提交
147
// static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
148

149
// server:0 client:1  tcp:2 udp:0
dengyihao's avatar
dengyihao 已提交
150 151 152 153
#define RPC_CONN_UDPS 0
#define RPC_CONN_UDPC 1
#define RPC_CONN_TCPS 2
#define RPC_CONN_TCPC 3
154

J
jtao1735 已提交
155
void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
dengyihao's avatar
dengyihao 已提交
156
    taosInitUdpConnection, taosInitUdpConnection, taosInitTcpServer, taosInitTcpClient};
H
hzcheng 已提交
157

dengyihao's avatar
dengyihao 已提交
158 159
void (*taosCleanUpConn[])(void *thandle) = {taosCleanUpUdpConnection, taosCleanUpUdpConnection, taosCleanUpTcpServer,
                                            taosCleanUpTcpClient};
H
hzcheng 已提交
160

161
void (*taosStopConn[])(void *thandle) = {
dengyihao's avatar
dengyihao 已提交
162 163
    taosStopUdpConnection,
    taosStopUdpConnection,
164
    taosStopTcpServer,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
165
    taosStopTcpClient,
166 167
};

168
int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
dengyihao's avatar
dengyihao 已提交
169
    taosSendUdpData, taosSendUdpData, taosSendTcpData, taosSendTcpData};
H
hzcheng 已提交
170

J
jtao1735 已提交
171
void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = {
S
slguan 已提交
172 173 174 175
    taosOpenUdpConnection,
    taosOpenUdpConnection,
    NULL,
    taosOpenTcpClientConnection,
H
hzcheng 已提交
176 177
};

dengyihao's avatar
dengyihao 已提交
178
void (*taosCloseConn[])(void *chandle) = {NULL, NULL, taosCloseTcpConnection, taosCloseTcpConnection};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
179

J
jtao1735 已提交
180
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
181
static void      rpcCloseConn(void *thandle);
182
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext);
183
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
184 185
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
H
hzcheng 已提交
186

dengyihao's avatar
dengyihao 已提交
187 188 189 190 191
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void rpcSendReqHead(SRpcConn *pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
192

193
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
S
slguan 已提交
194
static void  rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext);
195 196 197
static void  rpcProcessConnError(void *param, void *id);
static void  rpcProcessRetryTimer(void *, void *);
static void  rpcProcessIdleTimer(void *param, void *tmrId);
198
static void  rpcProcessProgressTimer(void *param, void *tmrId);
H
hzcheng 已提交
199

dengyihao's avatar
dengyihao 已提交
200 201
static void      rpcFreeMsg(void *msg);
static int32_t   rpcCompressRpcMsg(char *pCont, int32_t contLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
202
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead);
dengyihao's avatar
dengyihao 已提交
203 204 205 206 207 208
static int       rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
static int       rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
static void      rpcLockConn(SRpcConn *pConn);
static void      rpcUnlockConn(SRpcConn *pConn);
static void      rpcAddRef(SRpcInfo *pRpc);
static void      rpcDecRef(SRpcInfo *pRpc);
H
hzcheng 已提交
209

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
210 211 212 213 214
static void rpcFree(void *p) {
  tTrace("free mem: %p", p);
  free(p);
}

215 216 217 218
static void rpcInitImp(void) {
  tsProgressTimer = tsRpcTimer / 2;
  tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsProgressTimer;
  tsRpcHeadSize = RPC_MSG_OVERHEAD;
219
  tsRpcOverhead = sizeof(SRpcReqContext);
H
hzcheng 已提交
220

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
221
  tsRpcRefId = taosOpenRef(200, rpcFree);
S
Shengliang 已提交
222 223

  tsFqdnHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
224
}
225

S
Shengliang Guan 已提交
226
int32_t rpcInit() {
227
  pthread_once(&tsRpcInitOnce, rpcInitImp);
228
  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229
}
230

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
231 232
void rpcCleanup(void) {
  taosCloseRef(tsRpcRefId);
S
Shengliang 已提交
233 234 235
  taosHashClear(tsFqdnHash);
  taosHashCleanup(tsFqdnHash);
  tsFqdnHash = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
236 237
  tsRpcRefId = -1;
}
dengyihao's avatar
dengyihao 已提交
238

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
239 240 241
void *rpcOpen(const SRpcInit *pInit) {
  SRpcInfo *pRpc;

dengyihao's avatar
dengyihao 已提交
242
  // pthread_once(&tsRpcInit, rpcInit);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
243

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244 245
  pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
  if (pRpc == NULL) return NULL;
H
hzcheng 已提交
246

H
Haojun Liao 已提交
247
  if (pInit->label) tstrncpy(pRpc->label, pInit->label, tListLen(pInit->label));
dengyihao's avatar
dengyihao 已提交
248

249
  pRpc->connType = pInit->connType;
Y
TD-3115  
yihaoDeng 已提交
250 251
  if (pRpc->connType == TAOS_CONN_CLIENT) {
    pRpc->numOfThreads = pInit->numOfThreads;
dengyihao's avatar
dengyihao 已提交
252 253 254
    if (pRpc->numOfThreads >= 10) {
      pRpc->numOfThreads = 10;
    }
Y
TD-3115  
yihaoDeng 已提交
255
  } else {
dengyihao's avatar
dengyihao 已提交
256
    pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
Y
TD-3115  
yihaoDeng 已提交
257
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
258 259 260
  pRpc->idleTime = pInit->idleTime;
  pRpc->localPort = pInit->localPort;
  pRpc->afp = pInit->afp;
dengyihao's avatar
dengyihao 已提交
261
  pRpc->sessions = pInit->sessions + 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
262 263 264
  if (pInit->user) tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
  if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret));
  if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
265
  pRpc->spi = pInit->spi;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
266
  pRpc->cfp = pInit->cfp;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
267
  pRpc->afp = pInit->afp;
S
Shengliang Guan 已提交
268
  pRpc->parent = pInit->parent;
269
  pRpc->refCount = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
270

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
271 272
  atomic_add_fetch_32(&tsRpcNum, 1);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
273
  size_t size = sizeof(SRpcConn) * pRpc->sessions;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
274 275
  pRpc->connList = (SRpcConn *)calloc(1, size);
  if (pRpc->connList == NULL) {
S
TD-1530  
Shengliang Guan 已提交
276
    tError("%s failed to allocate memory for taos connections, size:%" PRId64, pRpc->label, (int64_t)size);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
277
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
278 279
    return NULL;
  }
H
hzcheng 已提交
280

dengyihao's avatar
dengyihao 已提交
281
  pRpc->idPool = taosInitIdPool(pRpc->sessions - 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
282 283
  if (pRpc->idPool == NULL) {
    tError("%s failed to init ID pool", pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
284
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
285
    return NULL;
H
hzcheng 已提交
286
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
287

dengyihao's avatar
dengyihao 已提交
288
  pRpc->tmrCtrl = taosTmrInit(pRpc->sessions * 2 + 1, 50, 10000, pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
289 290
  if (pRpc->tmrCtrl == NULL) {
    tError("%s failed to init timers", pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
291
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
292 293
    return NULL;
  }
H
hzcheng 已提交
294

295
  if (pRpc->connType == TAOS_CONN_SERVER) {
H
Haojun Liao 已提交
296
    pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true);
297 298 299 300 301 302
    if (pRpc->hash == NULL) {
      tError("%s failed to init string hash", pRpc->label);
      rpcClose(pRpc);
      return NULL;
    }
  } else {
dengyihao's avatar
dengyihao 已提交
303 304
    pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime * 20);
    if (pRpc->pCache == NULL) {
305 306 307 308
      tError("%s failed to init connection cache", pRpc->label);
      rpcClose(pRpc);
      return NULL;
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
309
  }
H
hzcheng 已提交
310

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
311 312
  pthread_mutex_init(&pRpc->mutex, NULL);

dengyihao's avatar
dengyihao 已提交
313 314 315 316
  pRpc->tcphandle = (*taosInitConn[pRpc->connType | RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads,
                                                                   rpcProcessMsgFromPeer, pRpc);
  pRpc->udphandle =
      (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);
317 318 319 320 321 322 323

  if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) {
    tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort);
    rpcClose(pRpc);
    return NULL;
  }

S
TD-1843  
Shengliang Guan 已提交
324
  tDebug("%s rpc is opened, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions);
H
hzcheng 已提交
325

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
326
  return pRpc;
H
hzcheng 已提交
327 328
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
329 330
void rpcClose(void *param) {
  SRpcInfo *pRpc = (SRpcInfo *)param;
H
hzcheng 已提交
331

332
  // stop connection to outside first
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
333 334
  (*taosStopConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
  (*taosStopConn[pRpc->connType])(pRpc->udphandle);
335

dengyihao's avatar
dengyihao 已提交
336
  // close all connections
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
337
  for (int i = 0; i < pRpc->sessions; ++i) {
338
    if (pRpc->connList && pRpc->connList[i].user[0]) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
339
      rpcCloseConn((void *)(pRpc->connList + i));
H
hzcheng 已提交
340 341 342
    }
  }

343
  // clean up
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
344 345
  (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
  (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);
J
Jeff Tao 已提交
346

347
  tDebug("%s rpc is closed", pRpc->label);
348
  rpcDecRef(pRpc);
H
hzcheng 已提交
349 350
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
351 352
void *rpcMallocCont(int contLen) {
  int size = contLen + RPC_MSG_OVERHEAD;
353

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
354 355
  char *start = (char *)calloc(1, (size_t)size);
  if (start == NULL) {
356 357
    tError("failed to malloc msg, size:%d", size);
    return NULL;
358
  } else {
S
TD-1762  
Shengliang Guan 已提交
359
    tTrace("malloc mem:%p size:%d", start, size);
360 361
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
362
  return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
363 364 365
}

void rpcFreeCont(void *cont) {
S
Shengliang Guan 已提交
366
  if (cont) {
367 368
    char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext);
    free(temp);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
369
    tTrace("free mem: %p", temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
370
  }
371 372
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
373 374 375 376
void *rpcReallocCont(void *ptr, int contLen) {
  if (ptr == NULL) return rpcMallocCont(contLen);

  char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead);
dengyihao's avatar
dengyihao 已提交
377 378
  if (contLen == 0) {
    free(start);
J
Jeff Tao 已提交
379
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
380 381 382 383 384 385 386
  }

  int size = contLen + RPC_MSG_OVERHEAD;
  start = realloc(start, size);
  if (start == NULL) {
    tError("failed to realloc cont, size:%d", size);
    return NULL;
dengyihao's avatar
dengyihao 已提交
387
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
388 389 390 391

  return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}

S
Shengliang Guan 已提交
392
void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
dengyihao's avatar
dengyihao 已提交
393
  SRpcInfo *      pRpc = (SRpcInfo *)shandle;
394 395
  SRpcReqContext *pContext;

J
Jeff Tao 已提交
396
  int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
dengyihao's avatar
dengyihao 已提交
397
  pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
398
  pContext->ahandle = pMsg->ahandle;
399
  pContext->pRpc = (SRpcInfo *)shandle;
400
  pContext->epSet = *pEpSet;
J
Jeff Tao 已提交
401
  pContext->contLen = contLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
402 403
  pContext->pCont = pMsg->pCont;
  pContext->msgType = pMsg->msgType;
404
  pContext->oldInUse = pEpSet->inUse;
405

dengyihao's avatar
dengyihao 已提交
406 407
  pContext->connType = RPC_CONN_UDPC;
  if (contLen > tsRpcMaxUdpSize || tsRpcForceTcp) pContext->connType = RPC_CONN_TCPC;
408

dengyihao's avatar
dengyihao 已提交
409
  // connection type is application specific.
410
  // for TDengine, all the query, show commands shall have TCP connection
411
  tmsg_t type = pMsg->msgType;
dengyihao's avatar
dengyihao 已提交
412 413 414
  if (type == TDMT_VND_QUERY || type == TDMT_MND_SHOW_RETRIEVE || type == TDMT_VND_FETCH ||
      type == TDMT_MND_VGROUP_LIST || type == TDMT_VND_TABLES_META || type == TDMT_VND_TABLE_META ||
      type == TDMT_MND_SHOW || type == TDMT_MND_STATUS || type == TDMT_VND_ALTER_TABLE)
415
    pContext->connType = RPC_CONN_TCPC;
张宏权 已提交
416

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
417
  pContext->rid = taosAddRef(tsRpcRefId, pContext);
418
  if (pRid) *pRid = pContext->rid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
419

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
420
  rpcSendReqToServer(pRpc, pContext);
421 422
}

J
Jeff Tao 已提交
423
void rpcSendResponse(const SRpcMsg *pRsp) {
S
Shengliang Guan 已提交
424
  ASSERT(pRsp->handle != NULL);
425

dengyihao's avatar
dengyihao 已提交
426 427 428 429 430
  int       msgLen = 0;
  SRpcConn *pConn = (SRpcConn *)pRsp->handle;
  SRpcMsg   rpcMsg = *pRsp;
  SRpcMsg * pMsg = &rpcMsg;
  SRpcInfo *pRpc = pConn->pRpc;
J
Jeff Tao 已提交
431

dengyihao's avatar
dengyihao 已提交
432
  if (pMsg->pCont == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
433 434
    pMsg->pCont = rpcMallocCont(0);
    pMsg->contLen = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
435 436
  }

dengyihao's avatar
dengyihao 已提交
437 438
  SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont);
  char *    msg = (char *)pHead;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
439

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
440 441
  pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
  msgLen = rpcMsgLenFromCont(pMsg->contLen);
442

443
  rpcLockConn(pConn);
444

dengyihao's avatar
dengyihao 已提交
445
  if (pConn->inType == 0 || pConn->user[0] == 0) {
H
Haojun Liao 已提交
446
    tError("%s, connection is already released, rsp wont be sent", pConn->info);
447
    rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
448 449
    rpcFreeCont(pMsg->pCont);
    rpcDecRef(pRpc);
450 451 452
    return;
  }

453
  // set msg header
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
454
  pHead->version = 1;
dengyihao's avatar
dengyihao 已提交
455
  pHead->msgType = pConn->inType + 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
456 457
  pHead->spi = pConn->spi;
  pHead->encrypt = pConn->encrypt;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
458 459 460
  pHead->tranId = pConn->inTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
461
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
462
  pHead->port = htons(pConn->localPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
463
  pHead->code = htonl(pMsg->code);
dengyihao's avatar
dengyihao 已提交
464 465
  pHead->ahandle = (uint64_t)pConn->ahandle;

466 467
  // set pConn parameters
  pConn->inType = 0;
468 469

  // response message is released until new response is sent
dengyihao's avatar
dengyihao 已提交
470
  rpcFreeMsg(pConn->pRspMsg);
471 472
  pConn->pRspMsg = msg;
  pConn->rspMsgLen = msgLen;
473
  if (pMsg->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) pConn->inTranId--;
474

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
475
  // stop the progress timer
476
  taosTmrStopA(&pConn->pTimer);
477 478

  // set the idle timer to monitor the activity
Y
yihaoDeng 已提交
479
  taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime * 30, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
480
  rpcSendMsgToPeer(pConn, msg, msgLen);
481 482

  // if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured
dengyihao's avatar
dengyihao 已提交
483
  if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY) pConn->secured = 1;  // connection shall be secured
484 485

  if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
486 487
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
488

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
489
  rpcUnlockConn(pConn);
dengyihao's avatar
dengyihao 已提交
490
  rpcDecRef(pRpc);  // decrease the referene count
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
491

492 493 494
  return;
}

S
Shengliang Guan 已提交
495
void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
496
  SRpcMsg rpcMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
497
  memset(&rpcMsg, 0, sizeof(rpcMsg));
dengyihao's avatar
dengyihao 已提交
498

S
Shengliang Guan 已提交
499
  rpcMsg.contLen = sizeof(SEpSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
500 501
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  if (rpcMsg.pCont == NULL) return;
502

S
Shengliang Guan 已提交
503
  memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet));
504

505
  rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
506
  rpcMsg.handle = thandle;
507

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
508
  rpcSendResponse(&rpcMsg);
509 510 511 512

  return;
}

513
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
dengyihao's avatar
dengyihao 已提交
514
  SRpcConn *pConn = (SRpcConn *)thandle;
515
  if (pConn->user[0] == 0) return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
516

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
517 518
  pInfo->clientIp = pConn->peerIp;
  pInfo->clientPort = pConn->peerPort;
J
jtao1735 已提交
519
  // pInfo->serverIp = pConn->destIp;
dengyihao's avatar
dengyihao 已提交
520

B
Bomin Zhang 已提交
521
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
522
  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
523 524
}

S
Shengliang Guan 已提交
525
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
526
  SRpcReqContext *pContext;
dengyihao's avatar
dengyihao 已提交
527
  pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
528 529

  memset(pRsp, 0, sizeof(SRpcMsg));
dengyihao's avatar
dengyihao 已提交
530 531

  tsem_t sem;
532 533 534
  tsem_init(&sem, 0, 0);
  pContext->pSem = &sem;
  pContext->pRsp = pRsp;
535
  pContext->pSet = pEpSet;
536

537
  rpcSendRequest(shandle, pEpSet, pMsg, NULL);
538 539 540 541 542 543 544

  tsem_wait(&sem);
  tsem_destroy(&sem);

  return;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
545
// this API is used by server app to keep an APP context in case connection is broken
546
int rpcReportProgress(void *handle, char *pCont, int contLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
547
  SRpcConn *pConn = (SRpcConn *)handle;
dengyihao's avatar
dengyihao 已提交
548
  int       code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
549

550
  rpcLockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
551

552 553
  if (pConn->user[0]) {
    // pReqMsg and reqMsgLen is re-used to store the context from app server
dengyihao's avatar
dengyihao 已提交
554
    pConn->pReqMsg = pCont;
555
    pConn->reqMsgLen = contLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
556
  } else {
557
    tDebug("%s, rpc connection is already released", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
558 559 560
    rpcFreeCont(pCont);
    code = -1;
  }
561

562
  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
563
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
564 565
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
566 567 568
void rpcCancelRequest(int64_t rid) {
  SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid);
  if (pContext == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
569

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
570
  rpcCloseConn(pContext->pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
571

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
572
  taosReleaseRef(tsRpcRefId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
573 574
}

575
static void rpcFreeMsg(void *msg) {
dengyihao's avatar
dengyihao 已提交
576
  if (msg) {
577 578
    char *temp = (char *)msg - sizeof(SRpcReqContext);
    free(temp);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
579
    tTrace("free mem: %p", temp);
580 581 582
  }
}

J
jtao1735 已提交
583
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
584
  SRpcConn *pConn;
S
slguan 已提交
585

S
Shengliang 已提交
586 587 588 589 590 591 592 593 594 595 596
  uint32_t  peerIp = 0;
  uint32_t *pPeerIp = taosHashGet(tsFqdnHash, peerFqdn, strlen(peerFqdn) + 1);
  if (pPeerIp != NULL) {
    peerIp = *pPeerIp;
  } else {
    peerIp = taosGetIpv4FromFqdn(peerFqdn);
    if (peerIp != 0xFFFFFFFF) {
      taosHashPut(tsFqdnHash, peerFqdn, strlen(peerFqdn) + 1, &peerIp, sizeof(peerIp));
    }
  }

597
  if (peerIp == 0xFFFFFFFF) {
dengyihao's avatar
dengyihao 已提交
598 599
    tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn);
    terrno = TSDB_CODE_RPC_FQDN_ERROR;
J
jtao1735 已提交
600 601 602
    return NULL;
  }

dengyihao's avatar
dengyihao 已提交
603
  pConn = rpcAllocateClientConn(pRpc);
H
hzcheng 已提交
604

dengyihao's avatar
dengyihao 已提交
605
  if (pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
606
    tstrncpy(pConn->peerFqdn, peerFqdn, sizeof(pConn->peerFqdn));
J
jtao1735 已提交
607
    pConn->peerIp = peerIp;
608
    pConn->peerPort = peerPort;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
609
    tstrncpy(pConn->user, pRpc->user, sizeof(pConn->user));
610
    pConn->connType = connType;
611

612
    if (taosOpenConn[connType]) {
dengyihao's avatar
dengyihao 已提交
613
      void *shandle = (connType & RPC_CONN_TCP) ? pRpc->tcphandle : pRpc->udphandle;
J
jtao1735 已提交
614
      pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
615
      if (pConn->chandle == NULL) {
H
Haojun Liao 已提交
616
        tError("failed to connect to:%s:%d", taosIpStr(pConn->peerIp), pConn->peerPort);
617

618
        terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
619 620 621
        rpcCloseConn(pConn);
        pConn = NULL;
      }
H
hzcheng 已提交
622 623 624
    }
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
625
  return pConn;
H
hzcheng 已提交
626 627
}

628
static void rpcReleaseConn(SRpcConn *pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
629
  SRpcInfo *pRpc = pConn->pRpc;
630
  if (pConn->user[0] == 0) return;
H
hzcheng 已提交
631

632 633
  pConn->user[0] = 0;
  if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle);
H
hzcheng 已提交
634

635 636 637
  taosTmrStopA(&pConn->pTimer);
  taosTmrStopA(&pConn->pIdleTimer);

dengyihao's avatar
dengyihao 已提交
638 639 640 641
  if (pRpc->connType == TAOS_CONN_SERVER) {
    char   hashstr[40] = {0};
    size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId,
                           pConn->connType);
J
jtao1735 已提交
642
    taosHashRemove(pRpc->hash, hashstr, size);
dengyihao's avatar
dengyihao 已提交
643
    rpcFreeMsg(pConn->pRspMsg);  // it may have a response msg saved, but not request msg
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
644
    pConn->pRspMsg = NULL;
dengyihao's avatar
dengyihao 已提交
645

646 647
    // if server has ever reported progress, free content
    if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);  // do not use rpcFreeMsg
648
  } else {
649
    // if there is an outgoing message, free it
陶建辉(Jeff)'s avatar
TD-1669  
陶建辉(Jeff) 已提交
650
    if (pConn->outType && pConn->pReqMsg) {
651
      SRpcReqContext *pContext = pConn->pContext;
Y
yihaoDeng 已提交
652
      if (pContext) {
dengyihao's avatar
dengyihao 已提交
653 654
        if (pContext->pRsp) {
          // for synchronous API, post semaphore to unblock app
Y
yihaoDeng 已提交
655 656 657 658 659
          pContext->pRsp->code = TSDB_CODE_RPC_APP_ERROR;
          pContext->pRsp->pCont = NULL;
          pContext->pRsp->contLen = 0;
          tsem_post(pContext->pSem);
        }
dengyihao's avatar
dengyihao 已提交
660
        pContext->pConn = NULL;
Y
yihaoDeng 已提交
661 662
        taosRemoveRef(tsRpcRefId, pContext->rid);
      } else {
dengyihao's avatar
dengyihao 已提交
663
        assert(0);
664
      }
陶建辉(Jeff)'s avatar
TD-1669  
陶建辉(Jeff) 已提交
665
    }
666
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
667

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
668 669 670 671 672 673
  // memset could not be used, since lockeBy can not be reset
  pConn->inType = 0;
  pConn->outType = 0;
  pConn->inTranId = 0;
  pConn->outTranId = 0;
  pConn->secured = 0;
674
  pConn->peerId = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
675 676
  pConn->peerIp = 0;
  pConn->peerPort = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
677 678 679
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
  pConn->pContext = NULL;
680
  pConn->chandle = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
681 682

  taosFreeId(pRpc->idPool, pConn->sid);
683
  tDebug("%s, rpc connection is released", pConn->info);
684 685 686 687
}

static void rpcCloseConn(void *thandle) {
  SRpcConn *pConn = (SRpcConn *)thandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
688
  if (pConn == NULL) return;
689 690 691

  rpcLockConn(pConn);

dengyihao's avatar
dengyihao 已提交
692
  if (pConn->user[0]) rpcReleaseConn(pConn);
693

694
  rpcUnlockConn(pConn);
H
hzcheng 已提交
695 696
}

697 698
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
  SRpcConn *pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
699

700 701 702
  int sid = taosAllocateId(pRpc->idPool);
  if (sid <= 0) {
    tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
703
    terrno = TSDB_CODE_RPC_MAX_SESSIONS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
704
  } else {
705 706 707 708
    pConn = pRpc->connList + sid;

    pConn->pRpc = pRpc;
    pConn->sid = sid;
709
    pConn->tranId = (uint16_t)(taosRand() & 0xFFFF);
710
    pConn->ownId = htonl(pConn->sid);
711
    pConn->linkUid = (uint32_t)((int64_t)pConn + taosGetPId() + (int64_t)pConn->tranId);
712 713
    pConn->spi = pRpc->spi;
    pConn->encrypt = pRpc->encrypt;
714
    if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_PASSWORD_LEN);
715
    tDebug("%s %p client connection is allocated, uid:0x%x", pRpc->label, pConn, pConn->linkUid);
716
  }
H
hzcheng 已提交
717

718 719
  return pConn;
}
H
hzcheng 已提交
720

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
721
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
722
  SRpcConn *pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
723
  char      hashstr[40] = {0};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
724 725
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;

dengyihao's avatar
dengyihao 已提交
726 727 728
  size_t size =
      snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);

729
  // check if it is already allocated
J
jtao1735 已提交
730
  SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
731
  if (ppConn) pConn = *ppConn;
732 733 734 735
  if (pConn) {
    pConn->secured = 0;
    return pConn;
  }
736

737 738 739 740 741 742
  // if code is not 0, it means it is simple reqhead, just ignore
  if (pHead->code != 0) {
    terrno = TSDB_CODE_RPC_ALREADY_PROCESSED;
    return NULL;
  }

743 744 745
  int sid = taosAllocateId(pRpc->idPool);
  if (sid <= 0) {
    tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
746
    terrno = TSDB_CODE_RPC_MAX_SESSIONS;
747 748
  } else {
    pConn = pRpc->connList + sid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
749
    memcpy(pConn->user, pHead->user, tListLen(pConn->user));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
750
    pConn->pRpc = pRpc;
H
hzcheng 已提交
751 752
    pConn->sid = sid;
    pConn->tranId = (uint16_t)(rand() & 0xFFFF);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
753
    pConn->ownId = htonl(pConn->sid);
754
    pConn->linkUid = pHead->linkUid;
755
    if (pRpc->afp) {
756
      if (pConn->user[0] == 0) {
757
        terrno = TSDB_CODE_RPC_AUTH_REQUIRED;
758
      } else {
S
Shengliang Guan 已提交
759
        terrno = (*pRpc->afp)(pRpc->parent, pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
760 761
      }

762
      if (terrno != 0) {
S
Shengliang Guan 已提交
763
        taosFreeId(pRpc->idPool, sid);  // sid shall be released
764 765
        pConn = NULL;
      }
H
hzcheng 已提交
766
    }
S
Shengliang Guan 已提交
767
  }
H
hzcheng 已提交
768

769
  if (pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
770 771
    if (pRecv->connType == RPC_CONN_UDPS && pRpc->numOfThreads > 1) {
      // UDP server, assign to new connection
S
Shengliang Guan 已提交
772
      pRpc->index = (pRpc->index + 1) % pRpc->numOfThreads;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
773 774
      pConn->localPort = (pRpc->localPort + pRpc->index);
    }
S
Shengliang Guan 已提交
775

J
jtao1735 已提交
776
    taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
dengyihao's avatar
dengyihao 已提交
777 778
    tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s spi:%d", pRpc->label, pConn, pConn->linkUid,
           sid, hashstr, pConn->spi);
779 780 781 782 783
  }

  return pConn;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
784
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
dengyihao's avatar
dengyihao 已提交
785
  SRpcConn *pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
786
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
787 788 789

  if (sid) {
    pConn = pRpc->connList + sid;
790
    if (pConn->user[0] == 0) pConn = NULL;
dengyihao's avatar
dengyihao 已提交
791
  }
792

dengyihao's avatar
dengyihao 已提交
793
  if (pConn == NULL) {
794 795 796
    if (pRpc->connType == TAOS_CONN_SERVER) {
      pConn = rpcAllocateServerConn(pRpc, pRecv);
    } else {
797
      terrno = TSDB_CODE_RPC_UNEXPECTED_RESPONSE;
798 799
    }
  }
800

801
  if (pConn) {
802
    if (pConn->linkUid != pHead->linkUid) {
803
      terrno = TSDB_CODE_RPC_MISMATCHED_LINK_ID;
S
Shengliang Guan 已提交
804 805
      tDebug("%s %p %p, linkUid:0x%x is not matched with received:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
             pConn->linkUid, pHead->linkUid);
806
      pConn = NULL;
H
hzcheng 已提交
807 808 809
    }
  }

810
  return pConn;
H
hzcheng 已提交
811 812
}

813
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
S
Shengliang Guan 已提交
814 815
  SRpcConn *pConn;
  SRpcInfo *pRpc = pContext->pRpc;
dengyihao's avatar
dengyihao 已提交
816
  SEpSet *  pEpSet = &pContext->epSet;
H
hzcheng 已提交
817

dengyihao's avatar
dengyihao 已提交
818 819
  pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port,
                              pContext->connType);
dengyihao's avatar
dengyihao 已提交
820
  if (pConn == NULL || pConn->user[0] == 0) {
H
Haojun Liao 已提交
821
    pConn = rpcOpenConn(pRpc, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType);
dengyihao's avatar
dengyihao 已提交
822
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
823 824

  if (pConn) {
S
Shengliang Guan 已提交
825
    pConn->tretry = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
826
    pConn->ahandle = pContext->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
827
    snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
828
    pConn->tretry = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
829
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
830
    tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
831
  }
H
hzcheng 已提交
832

833
  return pConn;
H
hzcheng 已提交
834 835
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
836
static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
dengyihao's avatar
dengyihao 已提交
837 838 839 840 841 842
  if (pConn->peerId == 0) {
    pConn->peerId = pHead->sourceId;
  } else {
    if (pConn->peerId != pHead->sourceId) {
      tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, pConn->peerId, pHead->sourceId);
      return TSDB_CODE_RPC_INVALID_VALUE;
H
hzcheng 已提交
843
    }
dengyihao's avatar
dengyihao 已提交
844
  }
H
hzcheng 已提交
845

dengyihao's avatar
dengyihao 已提交
846 847 848 849 850
  if (pConn->inTranId == pHead->tranId) {
    if (pConn->inType == pHead->msgType) {
      if (pHead->code == 0) {
        tDebug("%s, %s is retransmitted", pConn->info, TMSG_INFO(pHead->msgType));
        rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
851
      } else {
dengyihao's avatar
dengyihao 已提交
852
        // do nothing, it is heart beat from client
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
853
      }
dengyihao's avatar
dengyihao 已提交
854 855 856 857 858
    } else if (pConn->inType == 0) {
      tDebug("%s, %s is already processed, tranId:%d", pConn->info, TMSG_INFO(pHead->msgType), pConn->inTranId);
      rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen);  // resend the response
    } else {
      tDebug("%s, mismatched message %s and tranId", pConn->info, TMSG_INFO(pHead->msgType));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
859
    }
H
hzcheng 已提交
860

dengyihao's avatar
dengyihao 已提交
861 862 863
    // do not reply any message
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
  }
H
hzcheng 已提交
864

dengyihao's avatar
dengyihao 已提交
865 866 867 868
  if (pConn->inType != 0) {
    tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, pConn->inTranId, pHead->tranId);
    return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED;
  }
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
869

dengyihao's avatar
dengyihao 已提交
870 871 872 873
  if (rpcContLenFromMsg(pHead->msgLen) <= 0) {
    tDebug("%s, message body is empty, ignore", pConn->info);
    return TSDB_CODE_RPC_APP_ERROR;
  }
H
hzcheng 已提交
874

dengyihao's avatar
dengyihao 已提交
875 876 877 878 879 880 881 882
  pConn->inTranId = pHead->tranId;
  pConn->inType = pHead->msgType;

  // start the progress timer to monitor the response from server app
  if (pConn->connType != RPC_CONN_TCPS)
    pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl);

  return 0;
H
hzcheng 已提交
883 884
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
885
static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
886
  SRpcInfo *pRpc = pConn->pRpc;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
887
  pConn->peerId = pHead->sourceId;
H
hzcheng 已提交
888

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
889
  if (pConn->outType == 0 || pConn->pContext == NULL) {
890
    return TSDB_CODE_RPC_UNEXPECTED_RESPONSE;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
891
  }
H
hzcheng 已提交
892

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
893
  if (pHead->tranId != pConn->outTranId) {
894
    return TSDB_CODE_RPC_INVALID_TRAN_ID;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
895
  }
H
hzcheng 已提交
896

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
897
  if (pHead->msgType != pConn->outType + 1) {
898
    return TSDB_CODE_RPC_INVALID_RESPONSE_TYPE;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
899
  }
H
hzcheng 已提交
900

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
901 902 903
  taosTmrStopA(&pConn->pTimer);
  pConn->retry = 0;

904
  if (pHead->code == TSDB_CODE_RPC_AUTH_REQUIRED && pRpc->spi) {
905
    tDebug("%s, authentication shall be restarted", pConn->info);
906
    pConn->secured = 0;
dengyihao's avatar
dengyihao 已提交
907
    rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
908 909
    if (pConn->connType != RPC_CONN_TCPC)
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
910
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
911 912
  }

913 914 915 916
  if (pHead->code == TSDB_CODE_RPC_MISMATCHED_LINK_ID) {
    tDebug("%s, mismatched linkUid, link shall be restarted", pConn->info);
    pConn->secured = 0;
    ((SRpcHead *)pConn->pReqMsg)->destId = 0;
dengyihao's avatar
dengyihao 已提交
917
    rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
918 919 920 921 922
    if (pConn->connType != RPC_CONN_TCPC)
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
  }

923
  if (pHead->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
924
    if (pConn->tretry <= tsRpcMaxRetry) {
925
      tDebug("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
926 927
      pConn->tretry++;
      rpcSendReqHead(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
928 929
      if (pConn->connType != RPC_CONN_TCPC)
        pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
930
      return TSDB_CODE_RPC_ALREADY_PROCESSED;
H
hzcheng 已提交
931
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
932
      // peer still in processing, give up
933
      tDebug("%s, server processing takes too long time, give up", pConn->info);
934
      pHead->code = TSDB_CODE_RPC_TOO_SLOW;
H
hzcheng 已提交
935 936
    }
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
937

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
938 939
  pConn->outType = 0;
  pConn->pReqMsg = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
940
  pConn->reqMsgLen = 0;
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
941 942
  SRpcReqContext *pContext = pConn->pContext;

dengyihao's avatar
dengyihao 已提交
943
  if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
S
Shengliang Guan 已提交
944
    if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SEpSet)) {
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
945
      // if EpSet is not included in the msg, treat it as NOT_READY
dengyihao's avatar
dengyihao 已提交
946
      pHead->code = TSDB_CODE_RPC_NOT_READY;
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
947 948 949
    } else {
      pContext->redirect++;
      if (pContext->redirect > TSDB_MAX_REPLICA) {
dengyihao's avatar
dengyihao 已提交
950
        pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
951 952 953
        tWarn("%s, too many redirects, quit", pConn->info);
      }
    }
dengyihao's avatar
dengyihao 已提交
954
  }
955 956

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
957 958
}

S
slguan 已提交
959
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) {
dengyihao's avatar
dengyihao 已提交
960 961
  int32_t   sid;
  SRpcConn *pConn = NULL;
H
hzcheng 已提交
962

963
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
964

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
965
  sid = htonl(pHead->destId);
S
slguan 已提交
966
  *ppContext = NULL;
H
hzcheng 已提交
967

968
  if (TMSG_INDEX(pHead->msgType) >= TDMT_MAX || TMSG_INDEX(pHead->msgType) <= 0) {
969
    tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
dengyihao's avatar
dengyihao 已提交
970 971
    terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE;
    return NULL;
H
hzcheng 已提交
972 973
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
974
  if (sid < 0 || sid >= pRpc->sessions) {
dengyihao's avatar
dengyihao 已提交
975 976 977 978
    tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, pRpc->sessions,
           TMSG_INFO(pHead->msgType));
    terrno = TSDB_CODE_RPC_INVALID_SESSION_ID;
    return NULL;
H
hzcheng 已提交
979 980
  }

981
  if (rpcIsReq(pHead->msgType) && htonl(pHead->msgVer) != tsVersion >> 8) {
dengyihao's avatar
dengyihao 已提交
982 983 984 985
    tDebug("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion,
           TMSG_INFO(pHead->msgType));
    terrno = TSDB_CODE_RPC_INVALID_VERSION;
    return NULL;
986 987
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
988
  pConn = rpcGetConnObj(pRpc, sid, pRecv);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
989
  if (pConn == NULL) {
dengyihao's avatar
dengyihao 已提交
990
    tDebug("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
991
    return NULL;
dengyihao's avatar
dengyihao 已提交
992
  }
H
hzcheng 已提交
993

994
  rpcLockConn(pConn);
H
hzcheng 已提交
995

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
996 997
  if (rpcIsReq(pHead->msgType)) {
    pConn->ahandle = (void *)pHead->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
998
    snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
999 1000 1001
  }

  sid = pConn->sid;
1002
  if (pConn->chandle == NULL) pConn->chandle = pRecv->chandle;
dengyihao's avatar
dengyihao 已提交
1003
  pConn->peerIp = pRecv->ip;
1004
  pConn->peerPort = pRecv->port;
dengyihao's avatar
dengyihao 已提交
1005
  if (pHead->port) pConn->peerPort = htons(pHead->port);
H
hzcheng 已提交
1006

1007
  terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1008 1009 1010 1011

  // code can be transformed only after authentication
  pHead->code = htonl(pHead->code);

1012
  if (terrno == 0) {
J
jtao1735 已提交
1013
    if (pHead->encrypt) {
1014 1015
      // decrypt here
    }
H
hzcheng 已提交
1016

dengyihao's avatar
dengyihao 已提交
1017
    if (rpcIsReq(pHead->msgType)) {
1018
      pConn->connType = pRecv->connType;
Y
yihaoDeng 已提交
1019
      terrno = rpcProcessReqHead(pConn, pHead);
1020

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1021
      // stop idle timer
dengyihao's avatar
dengyihao 已提交
1022
      taosTmrStopA(&pConn->pIdleTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1023

dengyihao's avatar
dengyihao 已提交
1024
      // client shall send the request within tsRpcTime again for UDP, double it
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1025
      if (pConn->connType != RPC_CONN_TCPS)
dengyihao's avatar
dengyihao 已提交
1026
        pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer * 2, pConn, pRpc->tmrCtrl);
1027 1028
    } else {
      terrno = rpcProcessRspHead(pConn, pHead);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1029
      *ppContext = pConn->pContext;
1030
    }
H
hzcheng 已提交
1031 1032
  }

1033
  rpcUnlockConn(pConn);
H
hzcheng 已提交
1034

1035
  return pConn;
H
hzcheng 已提交
1036 1037
}

Y
TD-3409  
yihaoDeng 已提交
1038
static void doRpcReportBrokenLinkToServer(void *param, void *id) {
dengyihao's avatar
dengyihao 已提交
1039 1040 1041
  SRpcMsg * pRpcMsg = (SRpcMsg *)(param);
  SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle);
  SRpcInfo *pRpc = pConn->pRpc;
S
Shengliang Guan 已提交
1042 1043
  (*(pRpc->cfp))(pRpc->parent, pRpcMsg, NULL);
  free(pRpcMsg);
Y
TD-3409  
yihaoDeng 已提交
1044
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1045 1046
static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
  SRpcInfo *pRpc = pConn->pRpc;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1047
  if (pConn->pReqMsg == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1048 1049

  // if there are pending request, notify the app
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1050
  rpcAddRef(pRpc);
1051
  tDebug("%s, notify the server app, connection is gone", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1052

Y
TD-3409  
yihaoDeng 已提交
1053
  SRpcMsg *rpcMsg = malloc(sizeof(SRpcMsg));
dengyihao's avatar
dengyihao 已提交
1054 1055
  rpcMsg->pCont = pConn->pReqMsg;      // pReqMsg is re-used to store the APP context from server
  rpcMsg->contLen = pConn->reqMsgLen;  // reqMsgLen is re-used to store the APP context length
Y
TD-3409  
yihaoDeng 已提交
1056 1057 1058
  rpcMsg->ahandle = pConn->ahandle;
  rpcMsg->handle = pConn;
  rpcMsg->msgType = pConn->inType;
dengyihao's avatar
dengyihao 已提交
1059
  rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1060 1061
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
Y
TD-3409  
yihaoDeng 已提交
1062 1063 1064 1065 1066
  if (pRpc->cfp) {
    taosTmrStart(doRpcReportBrokenLinkToServer, 0, rpcMsg, pRpc->tmrCtrl);
  } else {
    free(rpcMsg);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1067 1068
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1069
static void rpcProcessBrokenLink(SRpcConn *pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1070
  if (pConn == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1071
  SRpcInfo *pRpc = pConn->pRpc;
1072
  tDebug("%s, link is broken", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1073 1074

  rpcLockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1075

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1076 1077
  if (pConn->outType) {
    SRpcReqContext *pContext = pConn->pContext;
1078
    pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1079
    pContext->pConn = NULL;
1080
    pConn->pReqMsg = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1081 1082
    taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
  }
dengyihao's avatar
dengyihao 已提交
1083 1084

  if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1085

1086
  rpcReleaseConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1087
  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1088 1089
}

1090
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
dengyihao's avatar
dengyihao 已提交
1091 1092 1093
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
  SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle;
  SRpcConn *pConn = (SRpcConn *)pRecv->thandle;
H
hzcheng 已提交
1094

S
Shengliang Guan 已提交
1095
  tDump(pRecv->msg, pRecv->msgLen);
1096 1097

  // underlying UDP layer does not know it is server or client
dengyihao's avatar
dengyihao 已提交
1098
  pRecv->connType = pRecv->connType | pRpc->connType;
H
hzcheng 已提交
1099

B
Bomin Zhang 已提交
1100
  if (pRecv->msg == NULL) {
1101
    rpcProcessBrokenLink(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1102 1103 1104
    return NULL;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1105
  terrno = 0;
S
slguan 已提交
1106 1107
  SRpcReqContext *pContext;
  pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);
H
hzcheng 已提交
1108

1109 1110 1111
  char ipstr[24] = {0};
  taosIpPort2String(pRecv->ip, pRecv->port, ipstr);

1112
  if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) {
dengyihao's avatar
dengyihao 已提交
1113 1114 1115
    tDebug("%s %p %p, %s received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn,
           (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), ipstr, terrno, pRecv->msgLen, pHead->sourceId,
           pHead->destId, pHead->tranId, pHead->code);
1116
  } else {
dengyihao's avatar
dengyihao 已提交
1117 1118 1119
    tDebug("%s %p %p, %d received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn,
           (void *)pHead->ahandle, pHead->msgType, ipstr, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId,
           pHead->tranId, pHead->code);
1120
  }
H
hzcheng 已提交
1121

H
TD-34  
hzcheng 已提交
1122
  int32_t code = terrno;
1123
  if (code != TSDB_CODE_RPC_ALREADY_PROCESSED) {
dengyihao's avatar
dengyihao 已提交
1124
    if (code != 0) {  // parsing error
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1125
      if (rpcIsReq(pHead->msgType)) {
H
TD-34  
hzcheng 已提交
1126
        rpcSendErrorMsgToPeer(pRecv, code);
B
Bomin Zhang 已提交
1127 1128 1129
        if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE) {
          rpcCloseConn(pConn);
        }
1130
        if (TMSG_INDEX(pHead->msgType) + 1 > 1 && TMSG_INDEX(pHead->msgType) + 1 < TDMT_MAX) {
dengyihao's avatar
dengyihao 已提交
1131 1132
          tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
                 TMSG_INFO(pHead->msgType + 1), code);
Y
yihaoDeng 已提交
1133
        } else {
dengyihao's avatar
dengyihao 已提交
1134 1135
          tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
                 TMSG_INFO(pHead->msgType), code);
1136
        }
dengyihao's avatar
dengyihao 已提交
1137 1138
      }
    } else {  // msg is passed to app only parsing is ok
S
slguan 已提交
1139
      rpcProcessIncomingMsg(pConn, pHead, pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1140
    }
H
hzcheng 已提交
1141 1142
  }

dengyihao's avatar
dengyihao 已提交
1143
  if (code) rpcFreeMsg(pRecv->msg);  // parsing failed, msg shall be freed
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1144 1145
  return pConn;
}
H
hzcheng 已提交
1146

1147
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
dengyihao's avatar
dengyihao 已提交
1148
  SRpcInfo *pRpc = pContext->pRpc;
1149

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1150
  pContext->pConn = NULL;
dengyihao's avatar
dengyihao 已提交
1151
  if (pContext->pRsp) {
1152
    // for synchronous API
S
Shengliang Guan 已提交
1153
    memcpy(pContext->pSet, &pContext->epSet, sizeof(SEpSet));
1154
    memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1155
    tsem_post(pContext->pSem);
1156
  } else {
dengyihao's avatar
dengyihao 已提交
1157
    // for asynchronous API
S
Shengliang Guan 已提交
1158
    SEpSet *pEpSet = NULL;
dengyihao's avatar
dengyihao 已提交
1159
    if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) pEpSet = &pContext->epSet;
1160

S
Shengliang Guan 已提交
1161
    (*pRpc->cfp)(pRpc->parent, pMsg, pEpSet);
1162 1163 1164
  }

  // free the request message
dengyihao's avatar
dengyihao 已提交
1165
  taosRemoveRef(tsRpcRefId, pContext->rid);
1166 1167
}

S
slguan 已提交
1168
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1169
  SRpcInfo *pRpc = pConn->pRpc;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1170
  SRpcMsg   rpcMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1171

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1172
  pHead = rpcDecompressRpcMsg(pHead);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1173 1174 1175
  rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
  rpcMsg.pCont = pHead->content;
  rpcMsg.msgType = pHead->msgType;
S
Shengliang Guan 已提交
1176 1177 1178
  rpcMsg.code = pHead->code;

  if (rpcIsReq(pHead->msgType)) {
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1179
    rpcMsg.ahandle = pConn->ahandle;
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1180 1181
    rpcMsg.handle = pConn;
    rpcAddRef(pRpc);  // add the refCount for requests
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1182

陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1183
    // notify the server app
S
Shengliang Guan 已提交
1184
    (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
H
hzcheng 已提交
1185
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1186
    // it's a response
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1187
    rpcMsg.handle = pContext;
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1188
    rpcMsg.ahandle = pContext->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1189
    pContext->pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1190

1191
    // for UDP, port may be changed by server, the port in epSet shall be used for cache
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1192
    if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
H
Haojun Liao 已提交
1193
      rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.eps[pContext->epSet.inUse].port,
dengyihao's avatar
dengyihao 已提交
1194
                          pConn->connType);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1195
    } else {
1196 1197
      rpcCloseConn(pConn);
    }
H
hzcheng 已提交
1198

1199
    if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1200
      pContext->numOfTry = 0;
dengyihao's avatar
dengyihao 已提交
1201
      SEpSet *pEpSet = (SEpSet *)pHead->content;
1202 1203
      if (pEpSet->numOfEps > 0) {
        memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet));
S
TD-1670  
Shengliang Guan 已提交
1204 1205 1206
        tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps,
               pContext->epSet.inUse);
        for (int i = 0; i < pContext->epSet.numOfEps; ++i) {
H
Haojun Liao 已提交
1207 1208 1209
          pContext->epSet.eps[i].port = htons(pContext->epSet.eps[i].port);
          tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.eps[i].fqdn,
                 pContext->epSet.eps[i].port);
S
TD-1670  
Shengliang Guan 已提交
1210
        }
1211
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1212
      rpcSendReqToServer(pRpc, pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1213
      rpcFreeCont(rpcMsg.pCont);
dengyihao's avatar
dengyihao 已提交
1214 1215
    } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY ||
               pHead->code == TSDB_CODE_DND_OFFLINE) {
1216 1217
      pContext->code = pHead->code;
      rpcProcessConnError(pContext, NULL);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1218
      rpcFreeCont(rpcMsg.pCont);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1219
    } else {
1220
      rpcNotifyClient(pContext, &rpcMsg);
H
hzcheng 已提交
1221 1222 1223 1224
    }
  }
}

1225
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
dengyihao's avatar
dengyihao 已提交
1226 1227
  char      msg[RPC_MSG_OVERHEAD];
  SRpcHead *pHead;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1228

1229
  // set msg header
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1230 1231 1232
  memset(msg, 0, sizeof(SRpcHead));
  pHead = (SRpcHead *)msg;
  pHead->version = 1;
dengyihao's avatar
dengyihao 已提交
1233
  pHead->msgType = pConn->inType + 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1234
  pHead->spi = pConn->spi;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1235 1236 1237 1238
  pHead->encrypt = 0;
  pHead->tranId = pConn->inTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
1239
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1240
  pHead->ahandle = (uint64_t)pConn->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1241
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1242
  pHead->code = htonl(code);
H
hzcheng 已提交
1243

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1244
  rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
dengyihao's avatar
dengyihao 已提交
1245
  pConn->secured = 1;  // connection shall be secured
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1246 1247 1248
}

static void rpcSendReqHead(SRpcConn *pConn) {
dengyihao's avatar
dengyihao 已提交
1249 1250
  char      msg[RPC_MSG_OVERHEAD];
  SRpcHead *pHead;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1251 1252 1253 1254 1255 1256

  // set msg header
  memset(msg, 0, sizeof(SRpcHead));
  pHead = (SRpcHead *)msg;
  pHead->version = 1;
  pHead->msgType = pConn->outType;
1257
  pHead->msgVer = htonl(tsVersion >> 8);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1258 1259 1260 1261 1262 1263
  pHead->spi = pConn->spi;
  pHead->encrypt = 0;
  pHead->tranId = pConn->outTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1264
  pHead->ahandle = (uint64_t)pConn->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1265 1266 1267 1268
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));
  pHead->code = 1;

  rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1269
}
H
hjxilinx 已提交
1270

1271
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
dengyihao's avatar
dengyihao 已提交
1272 1273 1274 1275
  SRpcHead *pRecvHead, *pReplyHead;
  char      msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t)];
  uint32_t  timeStamp;
  int       msgLen;
H
hzcheng 已提交
1276

1277
  pRecvHead = (SRpcHead *)pRecv->msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1278
  pReplyHead = (SRpcHead *)msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1279

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1280 1281
  memset(msg, 0, sizeof(SRpcHead));
  pReplyHead->version = pRecvHead->version;
1282
  pReplyHead->msgType = (tmsg_t)(pRecvHead->msgType + 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1283
  pReplyHead->spi = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1284
  pReplyHead->encrypt = pRecvHead->encrypt;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1285
  pReplyHead->tranId = pRecvHead->tranId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1286
  pReplyHead->sourceId = pRecvHead->destId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1287
  pReplyHead->destId = pRecvHead->sourceId;
1288
  pReplyHead->linkUid = pRecvHead->linkUid;
1289
  pReplyHead->ahandle = pRecvHead->ahandle;
H
hzcheng 已提交
1290

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1291 1292
  pReplyHead->code = htonl(code);
  msgLen = sizeof(SRpcHead);
H
hzcheng 已提交
1293

1294
  if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP) {
1295
    // include a time stamp if client's time is not synchronized well
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1296
    uint8_t *pContent = pReplyHead->content;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1297
    timeStamp = htonl(taosGetTimestampSec());
1298 1299
    memcpy(pContent, &timeStamp, sizeof(timeStamp));
    msgLen += sizeof(timeStamp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1300
  }
H
hzcheng 已提交
1301

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1302
  pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
1303
  (*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, msg, msgLen, pRecv->chandle);
H
hzcheng 已提交
1304

dengyihao's avatar
dengyihao 已提交
1305
  return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1306 1307
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1308
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
dengyihao's avatar
dengyihao 已提交
1309 1310 1311 1312
  SRpcHead *pHead = rpcHeadFromCont(pContext->pCont);
  char *    msg = (char *)pHead;
  int       msgLen = rpcMsgLenFromCont(pContext->contLen);
  tmsg_t    msgType = pContext->msgType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1313

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1314
  pContext->numOfTry++;
1315
  SRpcConn *pConn = rpcSetupConnToServer(pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1316 1317
  if (pConn == NULL) {
    pContext->code = terrno;
1318
    taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1319 1320 1321
    return;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1322
  pContext->pConn = pConn;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1323
  pConn->ahandle = pContext->ahandle;
1324
  rpcLockConn(pConn);
1325

dengyihao's avatar
dengyihao 已提交
1326
  // set the message header
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1327
  pHead->version = 1;
1328
  pHead->msgVer = htonl(tsVersion >> 8);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1329 1330
  pHead->msgType = msgType;
  pHead->encrypt = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1331
  pConn->tranId++;
dengyihao's avatar
dengyihao 已提交
1332
  if (pConn->tranId == 0) pConn->tranId++;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1333 1334 1335 1336
  pHead->tranId = pConn->tranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
  pHead->port = 0;
1337
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1338
  pHead->ahandle = (uint64_t)pConn->ahandle;
1339
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1340 1341 1342

  // set the connection parameters
  pConn->outType = msgType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1343
  pConn->outTranId = pHead->tranId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1344 1345
  pConn->pReqMsg = msg;
  pConn->reqMsgLen = msgLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1346
  pConn->pContext = pContext;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1347

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1348
  rpcSendMsgToPeer(pConn, msg, msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1349 1350
  if (pConn->connType != RPC_CONN_TCPC)
    taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1351 1352

  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1353
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1354 1355

static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
dengyihao's avatar
dengyihao 已提交
1356 1357
  int       writtenLen = 0;
  SRpcHead *pHead = (SRpcHead *)msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1358

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1359
  msgLen = rpcAddAuthPart(pConn, msg, msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1360

dengyihao's avatar
dengyihao 已提交
1361 1362 1363
  if (rpcIsReq(pHead->msgType)) {
    tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType),
           pConn->peerFqdn, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
1364
  } else {
H
Haojun Liao 已提交
1365 1366 1367 1368 1369 1370 1371 1372
    if (pHead->code == 0) {
      pConn->secured = 1;  // for success response, set link as secured
    }

    char ipport[40] = {0};
    taosIpPort2String(pConn->peerIp, pConn->peerPort, ipport);
    tDebug("%s, %s is sent to %s, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType),
           ipport, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
1373
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1374

dengyihao's avatar
dengyihao 已提交
1375
  // tTrace("connection type is: %d", pConn->connType);
1376
  writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1377

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1378
  if (writtenLen != msgLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1379
    tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
1380
  }
dengyihao's avatar
dengyihao 已提交
1381

S
Shengliang Guan 已提交
1382
  tDump(msg, msgLen);
H
hzcheng 已提交
1383 1384
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1385
static void rpcProcessConnError(void *param, void *id) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1386
  SRpcReqContext *pContext = (SRpcReqContext *)param;
dengyihao's avatar
dengyihao 已提交
1387
  SRpcInfo *      pRpc = pContext->pRpc;
S
Shengliang Guan 已提交
1388
  SRpcMsg         rpcMsg = {0};
dengyihao's avatar
dengyihao 已提交
1389

H
hjxilinx 已提交
1390 1391 1392
  if (pRpc == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1393

1394
  tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle);
H
hzcheng 已提交
1395

H
Hongze Cheng 已提交
1396
  if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TDMT_VND_FETCH) {
dengyihao's avatar
dengyihao 已提交
1397
    rpcMsg.msgType = pContext->msgType + 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1398
    rpcMsg.ahandle = pContext->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1399 1400 1401
    rpcMsg.code = pContext->code;
    rpcMsg.pCont = NULL;
    rpcMsg.contLen = 0;
1402 1403

    rpcNotifyClient(pContext, &rpcMsg);
H
hzcheng 已提交
1404
  } else {
dengyihao's avatar
dengyihao 已提交
1405
    // move to next IP
1406 1407
    pContext->epSet.inUse++;
    pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1408
    rpcSendReqToServer(pRpc, pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1409
  }
H
hzcheng 已提交
1410 1411
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1412 1413 1414
static void rpcProcessRetryTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;
  SRpcInfo *pRpc = pConn->pRpc;
H
hzcheng 已提交
1415

1416
  rpcLockConn(pConn);
H
hzcheng 已提交
1417

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1418
  if (pConn->outType && pConn->user[0]) {
H
Hongze Cheng 已提交
1419
    tDebug("%s, expected %s is not received", pConn->info, TMSG_INFO((int)pConn->outType + 1));
H
hzcheng 已提交
1420 1421 1422
    pConn->pTimer = NULL;
    pConn->retry++;

S
slguan 已提交
1423
    if (pConn->retry < 4) {
H
Hongze Cheng 已提交
1424
      tDebug("%s, re-send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn, pConn->peerPort);
dengyihao's avatar
dengyihao 已提交
1425
      rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
1426
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
H
hzcheng 已提交
1427 1428
    } else {
      // close the connection
dengyihao's avatar
dengyihao 已提交
1429 1430
      tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn,
             pConn->peerPort);
1431 1432
      if (pConn->pContext) {
        pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1433
        pConn->pContext->pConn = NULL;
1434
        pConn->pReqMsg = NULL;
1435
        taosTmrStart(rpcProcessConnError, 1, pConn->pContext, pRpc->tmrCtrl);
1436 1437
        rpcReleaseConn(pConn);
      }
H
hzcheng 已提交
1438
    }
1439
  } else {
1440
    tDebug("%s, retry timer not processed", pConn->info);
H
hzcheng 已提交
1441 1442
  }

1443
  rpcUnlockConn(pConn);
H
hzcheng 已提交
1444 1445
}

1446 1447 1448
static void rpcProcessIdleTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;

1449 1450
  rpcLockConn(pConn);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1451
  if (pConn->user[0]) {
1452
    tDebug("%s, close the connection since no activity", pConn->info);
dengyihao's avatar
dengyihao 已提交
1453
    if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
1454
    rpcReleaseConn(pConn);
1455
  } else {
1456
    tDebug("%s, idle timer:%p not processed", pConn->info, tmrId);
1457
  }
1458 1459

  rpcUnlockConn(pConn);
1460 1461 1462 1463 1464 1465
}

static void rpcProcessProgressTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;
  SRpcInfo *pRpc = pConn->pRpc;

1466
  rpcLockConn(pConn);
1467

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1468
  if (pConn->inType && pConn->user[0]) {
1469
    tDebug("%s, progress timer expired, send progress", pConn->info);
1470
    rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
1471
    pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
1472
  } else {
1473
    tDebug("%s, progress timer:%p not processed", pConn->info, tmrId);
1474 1475
  }

1476
  rpcUnlockConn(pConn);
1477 1478
}

dengyihao's avatar
dengyihao 已提交
1479 1480 1481 1482 1483
static int32_t rpcCompressRpcMsg(char *pCont, int32_t contLen) {
  SRpcHead *pHead = rpcHeadFromCont(pCont);
  int32_t   finalLen = 0;
  int       overhead = sizeof(SRpcComp);

1484 1485 1486
  if (!NEEDTO_COMPRESSS_MSG(contLen)) {
    return contLen;
  }
dengyihao's avatar
dengyihao 已提交
1487 1488

  char *buf = malloc(contLen + overhead + 8);  // 8 extra bytes
1489
  if (buf == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1490
    tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
1491 1492
    return contLen;
  }
dengyihao's avatar
dengyihao 已提交
1493

1494
  int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
S
Shuduo Sang 已提交
1495
  tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead);
dengyihao's avatar
dengyihao 已提交
1496

1497 1498 1499 1500
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
S
TD-4100  
Shengliang Guan 已提交
1501
  if (compLen > 0 && compLen < contLen - overhead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1502
    SRpcComp *pComp = (SRpcComp *)pCont;
dengyihao's avatar
dengyihao 已提交
1503 1504
    pComp->reserved = 0;
    pComp->contLen = htonl(contLen);
1505
    memcpy(pCont + overhead, buf, compLen);
dengyihao's avatar
dengyihao 已提交
1506

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1507
    pHead->comp = 1;
S
Shuduo Sang 已提交
1508
    tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen);
1509 1510 1511 1512 1513 1514 1515 1516 1517
    finalLen = compLen + overhead;
  } else {
    finalLen = contLen;
  }

  free(buf);
  return finalLen;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1518
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
dengyihao's avatar
dengyihao 已提交
1519 1520 1521 1522
  int       overhead = sizeof(SRpcComp);
  SRpcHead *pNewHead = NULL;
  uint8_t * pCont = pHead->content;
  SRpcComp *pComp = (SRpcComp *)pHead->content;
1523

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1524
  if (pHead->comp) {
1525
    // decompress the content
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1526 1527
    assert(pComp->reserved == 0);
    int contLen = htonl(pComp->contLen);
dengyihao's avatar
dengyihao 已提交
1528

1529
    // prepare the temporary buffer to decompress message
1530
    char *temp = (char *)malloc(contLen + RPC_MSG_OVERHEAD);
dengyihao's avatar
dengyihao 已提交
1531 1532
    pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext));  // reserve SRpcReqContext

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1533
    if (pNewHead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1534
      int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
dengyihao's avatar
dengyihao 已提交
1535
      int origLen = LZ4_decompress_safe((char *)(pCont + overhead), (char *)pNewHead->content, compLen, contLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1536
      assert(origLen == contLen);
dengyihao's avatar
dengyihao 已提交
1537

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1538
      memcpy(pNewHead, pHead, sizeof(SRpcHead));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1539
      pNewHead->msgLen = rpcMsgLenFromCont(origLen);
dengyihao's avatar
dengyihao 已提交
1540 1541
      rpcFreeMsg(pHead);  // free the compressed message buffer
      pHead = pNewHead;
S
TD-1762  
Shengliang Guan 已提交
1542
      tTrace("decomp malloc mem:%p", temp);
1543
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1544
      tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
1545 1546 1547
    }
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1548
  return pHead;
1549 1550
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1551
static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) {
dengyihao's avatar
dengyihao 已提交
1552
  T_MD5_CTX context;
dengyihao's avatar
dengyihao 已提交
1553
  int       ret = -1;
H
hzcheng 已提交
1554

dengyihao's avatar
dengyihao 已提交
1555
  tMD5Init(&context);
H
Haojun Liao 已提交
1556
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1557
  tMD5Update(&context, (uint8_t *)pMsg, msgLen);
H
Haojun Liao 已提交
1558
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1559
  tMD5Final(&context);
H
hzcheng 已提交
1560 1561 1562 1563 1564 1565

  if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;

  return ret;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1566
static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) {
dengyihao's avatar
dengyihao 已提交
1567
  T_MD5_CTX context;
H
hzcheng 已提交
1568

dengyihao's avatar
dengyihao 已提交
1569
  tMD5Init(&context);
H
Haojun Liao 已提交
1570
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1571
  tMD5Update(&context, (uint8_t *)pMsg, msgLen);
H
Haojun Liao 已提交
1572
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1573
  tMD5Final(&context);
H
hzcheng 已提交
1574 1575 1576

  memcpy(pAuth, context.digest, sizeof(context.digest));
}
1577 1578

static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1579
  SRpcHead *pHead = (SRpcHead *)msg;
1580

1581
  if (pConn->spi && pConn->secured == 0) {
1582
    // add auth part
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1583
    pHead->spi = pConn->spi;
1584 1585 1586
    SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen);
    pDigest->timeStamp = htonl(taosGetTimestampSec());
    msgLen += sizeof(SRpcDigest);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1587
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1588
    rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
1589
  } else {
1590
    pHead->spi = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1591
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
1592 1593 1594 1595 1596 1597
  }

  return msgLen;
}

static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1598
  SRpcHead *pHead = (SRpcHead *)msg;
1599
  int       code = 0;
1600

dengyihao's avatar
dengyihao 已提交
1601 1602
  if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
    // secured link, or no authentication
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1603
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
1604
    // tTrace("%s, secured link, no auth is required", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1605 1606 1607
    return 0;
  }

dengyihao's avatar
dengyihao 已提交
1608
  if (!rpcIsReq(pHead->msgType)) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1609 1610
    // for response, if code is auth failure, it shall bypass the auth process
    code = htonl(pHead->code);
1611
    if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
dengyihao's avatar
dengyihao 已提交
1612 1613
        code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
        code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
1614
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
1615
      // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1616
      return 0;
1617
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1618
  }
dengyihao's avatar
dengyihao 已提交
1619

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1620
  code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1621
  if (pHead->spi == pConn->spi) {
1622
    // authentication
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1623
    SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest));
1624 1625 1626 1627 1628

    int32_t delta;
    delta = (int32_t)htonl(pDigest->timeStamp);
    delta -= (int32_t)taosGetTimestampSec();
    if (abs(delta) > 900) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1629
      tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
1630
      code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
1631
    } else {
dengyihao's avatar
dengyihao 已提交
1632
      if (rpcAuthenticateMsg(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
1633
        tDebug("%s, authentication failed, msg discarded", pConn->info);
1634
        code = TSDB_CODE_RPC_AUTH_FAILURE;
1635
      } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1636
        pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
dengyihao's avatar
dengyihao 已提交
1637
        if (!rpcIsReq(pHead->msgType)) pConn->secured = 1;  // link is secured for client
1638
        // tTrace("%s, message is authenticated", pConn->info);
1639 1640 1641
      }
    }
  } else {
S
Shengliang 已提交
1642
    tError("%s, auth spi:%d not matched with received:%d %p", pConn->info, pConn->spi, pHead->spi, pConn);
1643
    code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
1644 1645 1646 1647 1648
  }

  return code;
}

1649
static void rpcLockConn(SRpcConn *pConn) {
S
TD-2616  
Shengliang Guan 已提交
1650
  int64_t tid = taosGetSelfPthreadId();
1651
  int     i = 0;
1652 1653 1654 1655 1656 1657 1658 1659
  while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) {
    if (++i % 1000 == 0) {
      sched_yield();
    }
  }
}

static void rpcUnlockConn(SRpcConn *pConn) {
S
TD-2616  
Shengliang Guan 已提交
1660
  int64_t tid = taosGetSelfPthreadId();
1661 1662 1663 1664
  if (atomic_val_compare_exchange_64(&(pConn->lockedBy), tid, 0) != tid) {
    assert(false);
  }
}
1665

dengyihao's avatar
dengyihao 已提交
1666
static void rpcAddRef(SRpcInfo *pRpc) { atomic_add_fetch_32(&pRpc->refCount, 1); }
1667

dengyihao's avatar
dengyihao 已提交
1668
static void rpcDecRef(SRpcInfo *pRpc) {
1669
  if (atomic_sub_fetch_32(&pRpc->refCount, 1) == 0) {
1670
    rpcCloseConnCache(pRpc->pCache);
dengyihao's avatar
dengyihao 已提交
1671
    taosHashCleanup(pRpc->hash);
dengyihao's avatar
dengyihao 已提交
1672
    taosTmrCleanUp(pRpc->tmrCtrl);
dengyihao's avatar
dengyihao 已提交
1673
    taosIdPoolCleanUp(pRpc->idPool);
1674

S
TD-1848  
Shengliang Guan 已提交
1675
    tfree(pRpc->connList);
1676
    pthread_mutex_destroy(&pRpc->mutex);
1677
    tDebug("%s rpc resources are released", pRpc->label);
S
TD-1848  
Shengliang Guan 已提交
1678
    tfree(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1679

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1680
    atomic_sub_fetch_32(&tsRpcNum, 1);
1681 1682
  }
}
dengyihao's avatar
dengyihao 已提交
1683
#endif