rpcMain.c 55.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "lz4.h"
S
slguan 已提交
17
#include "os.h"
dengyihao's avatar
dengyihao 已提交
18 19 20 21 22 23 24 25
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
H
hzcheng 已提交
26 27 28
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
H
Hongze Cheng 已提交
29
#include "tmsg.h"
dengyihao's avatar
dengyihao 已提交
30
#include "transportInt.h"
dengyihao's avatar
dengyihao 已提交
31
#include "tref.h"
S
slguan 已提交
32
#include "trpc.h"
dengyihao's avatar
dengyihao 已提交
33 34 35
#include "ttimer.h"
#include "tutil.h"

dengyihao's avatar
dengyihao 已提交
36 37 38 39 40 41 42 43 44
// guards one-time module initialization; passed to pthread_once() in rpcInit()
static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT;

// rpcSendRequest() switches to TCP when the content length exceeds this value
int tsRpcMaxUdpSize = 15000;  // bytes
// overwritten by rpcInitImp() with a value derived from tsRpcTimer
int tsProgressTimer = 100;
// not configurable: the three values below are computed in rpcInitImp()
int tsRpcMaxRetry;
int tsRpcHeadSize;
int tsRpcOverhead;

S
Shengliang 已提交
45 46
// cache of resolved peer addresses keyed by FQDN string (including the trailing
// NUL); created in rpcInitImp(), filled in rpcOpenConn(), destroyed in rpcCleanup()
SHashObj *tsFqdnHash;

dengyihao's avatar
dengyihao 已提交
47 48
#ifndef USE_UV

dengyihao's avatar
dengyihao 已提交
49 50 51 52 53 54 55 56
typedef struct {
  int      sessions;      // number of sessions allowed
  int      numOfThreads;  // number of threads to process incoming messages
  int      idleTime;      // milliseconds;
  uint16_t localPort;
  int8_t   connType;
  int      index;  // for UDP server only, round robin for multiple threads
  char     label[TSDB_LABEL_LEN];
dengyihao's avatar
dengyihao 已提交
57

dengyihao's avatar
dengyihao 已提交
58 59 60 61 62
  char user[TSDB_UNI_LEN];         // meter ID
  char spi;                        // security parameter index
  char encrypt;                    // encrypt algorithm
  char secret[TSDB_PASSWORD_LEN];  // secret for the link
  char ckey[TSDB_PASSWORD_LEN];    // ciphering key
dengyihao's avatar
dengyihao 已提交
63

dengyihao's avatar
dengyihao 已提交
64 65
  void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
  int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey);
dengyihao's avatar
dengyihao 已提交
66

dengyihao's avatar
dengyihao 已提交
67
  int32_t          refCount;
dengyihao's avatar
dengyihao 已提交
68 69 70 71 72 73 74
  void *           parent;
  void *           idPool;     // handle to ID pool
  void *           tmrCtrl;    // handle to timer
  SHashObj *       hash;       // handle returned by hash utility
  void *           tcphandle;  // returned handle from TCP initialization
  void *           udphandle;  // returned handle from UDP initialization
  void *           pCache;     // connection cache
dengyihao's avatar
dengyihao 已提交
75
  pthread_mutex_t  mutex;
dengyihao's avatar
dengyihao 已提交
76
  struct SRpcConn *connList;  // connection list
dengyihao's avatar
dengyihao 已提交
77
} SRpcInfo;
dengyihao's avatar
dengyihao 已提交
78

dengyihao's avatar
dengyihao 已提交
79
typedef struct {
dengyihao's avatar
dengyihao 已提交
80
  SRpcInfo *       pRpc;      // associated SRpcInfo
dengyihao's avatar
dengyihao 已提交
81
  SEpSet           epSet;     // ip list provided by app
dengyihao's avatar
dengyihao 已提交
82 83
  void *           ahandle;   // handle provided by app
  struct SRpcConn *pConn;     // pConn allocated
dengyihao's avatar
dengyihao 已提交
84
  tmsg_t           msgType;   // message type
dengyihao's avatar
dengyihao 已提交
85
  uint8_t *        pCont;     // content provided by app
dengyihao's avatar
dengyihao 已提交
86 87 88 89 90 91 92
  int32_t          contLen;   // content length
  int32_t          code;      // error code
  int16_t          numOfTry;  // number of try for different servers
  int8_t           oldInUse;  // server EP inUse passed by app
  int8_t           redirect;  // flag to indicate redirect
  int8_t           connType;  // connection type
  int64_t          rid;       // refId returned by taosAddRef
dengyihao's avatar
dengyihao 已提交
93 94 95
  SRpcMsg *        pRsp;      // for synchronous API
  tsem_t *         pSem;      // for synchronous API
  SEpSet *         pSet;      // for synchronous API
dengyihao's avatar
dengyihao 已提交
96 97 98
  char             msg[0];    // RpcHead starts from here
} SRpcReqContext;

dengyihao's avatar
dengyihao 已提交
99
// bytes reserved around the application content in every message buffer
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))

// Conversions between a message (starting at SRpcHead) and its content.
// Every macro argument is parenthesized so that compound expressions such as
// `a | b` or `x << y` are evaluated as a unit.
#define rpcHeadFromCont(cont) ((SRpcHead *)((char *)(cont) - sizeof(SRpcHead)))
#define rpcContFromHead(msg) ((msg) + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) ((contLen) + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) ((msgLen) - sizeof(SRpcHead))

// non-zero when the lowest bit of the message type is set
#define rpcIsReq(type) ((type) & 1U)
H
hzcheng 已提交
105

J
Jeff Tao 已提交
106
typedef struct SRpcConn {
dengyihao's avatar
dengyihao 已提交
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
  char            info[48];                   // debug info: label + pConn + ahandle
  int             sid;                        // session ID
  uint32_t        ownId;                      // own link ID
  uint32_t        peerId;                     // peer link ID
  char            user[TSDB_UNI_LEN];         // user ID for the link
  char            spi;                        // security parameter index
  char            encrypt;                    // encryption, 0:1
  char            secret[TSDB_PASSWORD_LEN];  // secret for the link
  char            ckey[TSDB_PASSWORD_LEN];    // ciphering key
  char            secured;                    // if set to 1, no authentication
  uint16_t        localPort;                  // for UDP only
  uint32_t        linkUid;                    // connection unique ID assigned by client
  uint32_t        peerIp;                     // peer IP
  uint16_t        peerPort;                   // peer port
  char            peerFqdn[TSDB_FQDN_LEN];    // peer FQDN or ip string
  uint16_t        tranId;                     // outgoing transcation ID, for build message
  uint16_t        outTranId;                  // outgoing transcation ID
  uint16_t        inTranId;                   // transcation ID for incoming msg
  tmsg_t          outType;                    // message type for outgoing request
  tmsg_t          inType;                     // message type for incoming request
  void *          chandle;                    // handle passed by TCP/UDP connection layer
  void *          ahandle;                    // handle provided by upper app layter
  int             retry;                      // number of retry for sending request
  int             tretry;                     // total retry
  void *          pTimer;                     // retry timer to monitor the response
  void *          pIdleTimer;                 // idle timer
  char *          pRspMsg;                    // response message including header
  int             rspMsgLen;                  // response messag length
  char *          pReqMsg;                    // request message including header
  int             reqMsgLen;                  // request message length
  SRpcInfo *      pRpc;                       // the associated SRpcInfo
  int8_t          connType;                   // connection type
  int64_t         lockedBy;                   // lock for connection
  SRpcReqContext *pContext;                   // request context
H
hzcheng 已提交
141 142
} SRpcConn;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
143 144
// id returned by taosOpenRef() in rpcInitImp(); reset to -1 by rpcCleanup()
static int     tsRpcRefId = -1;
// incremented atomically each time rpcOpen() creates an SRpcInfo
static int32_t tsRpcNum = 0;
dengyihao's avatar
dengyihao 已提交
145
// static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
146

147
// server:0 client:1  tcp:2 udp:0
dengyihao's avatar
dengyihao 已提交
148 149 150 151
// connection type codes, used as indexes into the taosXxxConn[] dispatch tables:
// bit 0 selects server (0) / client (1), bit 1 selects UDP (0) / TCP (2)
#define RPC_CONN_UDPS 0
#define RPC_CONN_UDPC 1
#define RPC_CONN_TCPS 2
#define RPC_CONN_TCPC 3
152

J
jtao1735 已提交
153
void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
dengyihao's avatar
dengyihao 已提交
154
    taosInitUdpConnection, taosInitUdpConnection, taosInitTcpServer, taosInitTcpClient};
H
hzcheng 已提交
155

dengyihao's avatar
dengyihao 已提交
156 157
void (*taosCleanUpConn[])(void *thandle) = {taosCleanUpUdpConnection, taosCleanUpUdpConnection, taosCleanUpTcpServer,
                                            taosCleanUpTcpClient};
H
hzcheng 已提交
158

159
void (*taosStopConn[])(void *thandle) = {
dengyihao's avatar
dengyihao 已提交
160 161
    taosStopUdpConnection,
    taosStopUdpConnection,
162
    taosStopTcpServer,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
163
    taosStopTcpClient,
164 165
};

166
int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
dengyihao's avatar
dengyihao 已提交
167
    taosSendUdpData, taosSendUdpData, taosSendTcpData, taosSendTcpData};
H
hzcheng 已提交
168

J
jtao1735 已提交
169
void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = {
S
slguan 已提交
170 171 172 173
    taosOpenUdpConnection,
    taosOpenUdpConnection,
    NULL,
    taosOpenTcpClientConnection,
H
hzcheng 已提交
174 175
};

dengyihao's avatar
dengyihao 已提交
176
void (*taosCloseConn[])(void *chandle) = {NULL, NULL, taosCloseTcpConnection, taosCloseTcpConnection};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177

J
jtao1735 已提交
178
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
179
static void      rpcCloseConn(void *thandle);
180
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext);
181
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
182 183
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
H
hzcheng 已提交
184

dengyihao's avatar
dengyihao 已提交
185 186 187 188 189
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void rpcSendReqHead(SRpcConn *pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190

191
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
S
slguan 已提交
192
static void  rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext);
193 194 195
static void  rpcProcessConnError(void *param, void *id);
static void  rpcProcessRetryTimer(void *, void *);
static void  rpcProcessIdleTimer(void *param, void *tmrId);
196
static void  rpcProcessProgressTimer(void *param, void *tmrId);
H
hzcheng 已提交
197

dengyihao's avatar
dengyihao 已提交
198 199
static void      rpcFreeMsg(void *msg);
static int32_t   rpcCompressRpcMsg(char *pCont, int32_t contLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
200
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead);
dengyihao's avatar
dengyihao 已提交
201 202 203 204 205 206
static int       rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
static int       rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
static void      rpcLockConn(SRpcConn *pConn);
static void      rpcUnlockConn(SRpcConn *pConn);
static void      rpcAddRef(SRpcInfo *pRpc);
static void      rpcDecRef(SRpcInfo *pRpc);
H
hzcheng 已提交
207

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
208 209 210 211 212
// Release callback handed to taosOpenRef() in rpcInitImp(): traces the address
// and then returns the block to the heap with free().
static void rpcFree(void *p) {
  tTrace("free mem: %p", p);
  free(p);
}

213 214 215 216
static void rpcInitImp(void) {
  tsProgressTimer = tsRpcTimer / 2;
  tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsProgressTimer;
  tsRpcHeadSize = RPC_MSG_OVERHEAD;
217
  tsRpcOverhead = sizeof(SRpcReqContext);
H
hzcheng 已提交
218

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
219
  tsRpcRefId = taosOpenRef(200, rpcFree);
S
Shengliang 已提交
220 221

  tsFqdnHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
222
}
223

224 225
/*
 * Initialize the RPC module. The real work (rpcInitImp) is executed at most
 * once per process, no matter how many times this function is called.
 * Always returns 0.
 */
int32_t rpcInit(void) {
  (void)pthread_once(&tsRpcInitOnce, rpcInitImp);

  return 0;
}
228

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229 230
/*
 * Release the module-wide resources created by rpcInitImp(): the reference
 * set and the FQDN cache. Because initialization is guarded by pthread_once(),
 * a later rpcInit() call does not create them again.
 */
void rpcCleanup(void) {
  // reference set holding the request contexts
  taosCloseRef(tsRpcRefId);
  tsRpcRefId = -1;

  // FQDN -> IP cache
  taosHashClear(tsFqdnHash);
  taosHashCleanup(tsFqdnHash);
  tsFqdnHash = NULL;
}
dengyihao's avatar
dengyihao 已提交
236

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
237 238 239
void *rpcOpen(const SRpcInit *pInit) {
  SRpcInfo *pRpc;

dengyihao's avatar
dengyihao 已提交
240
  // pthread_once(&tsRpcInit, rpcInit);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
241

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
242 243
  pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
  if (pRpc == NULL) return NULL;
H
hzcheng 已提交
244

H
Haojun Liao 已提交
245
  if (pInit->label) tstrncpy(pRpc->label, pInit->label, tListLen(pInit->label));
dengyihao's avatar
dengyihao 已提交
246

247
  pRpc->connType = pInit->connType;
Y
TD-3115  
yihaoDeng 已提交
248 249
  if (pRpc->connType == TAOS_CONN_CLIENT) {
    pRpc->numOfThreads = pInit->numOfThreads;
dengyihao's avatar
dengyihao 已提交
250 251 252
    if (pRpc->numOfThreads >= 10) {
      pRpc->numOfThreads = 10;
    }
Y
TD-3115  
yihaoDeng 已提交
253
  } else {
dengyihao's avatar
dengyihao 已提交
254
    pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
Y
TD-3115  
yihaoDeng 已提交
255
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
256 257 258
  pRpc->idleTime = pInit->idleTime;
  pRpc->localPort = pInit->localPort;
  pRpc->afp = pInit->afp;
dengyihao's avatar
dengyihao 已提交
259
  pRpc->sessions = pInit->sessions + 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
260 261 262
  if (pInit->user) tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
  if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret));
  if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
263
  pRpc->spi = pInit->spi;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
264
  pRpc->cfp = pInit->cfp;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
265
  pRpc->afp = pInit->afp;
S
Shengliang Guan 已提交
266
  pRpc->parent = pInit->parent;
267
  pRpc->refCount = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
268

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
269 270
  atomic_add_fetch_32(&tsRpcNum, 1);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
271
  size_t size = sizeof(SRpcConn) * pRpc->sessions;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
272 273
  pRpc->connList = (SRpcConn *)calloc(1, size);
  if (pRpc->connList == NULL) {
S
TD-1530  
Shengliang Guan 已提交
274
    tError("%s failed to allocate memory for taos connections, size:%" PRId64, pRpc->label, (int64_t)size);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
275
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
276 277
    return NULL;
  }
H
hzcheng 已提交
278

dengyihao's avatar
dengyihao 已提交
279
  pRpc->idPool = taosInitIdPool(pRpc->sessions - 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
280 281
  if (pRpc->idPool == NULL) {
    tError("%s failed to init ID pool", pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
282
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
283
    return NULL;
H
hzcheng 已提交
284
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
285

dengyihao's avatar
dengyihao 已提交
286
  pRpc->tmrCtrl = taosTmrInit(pRpc->sessions * 2 + 1, 50, 10000, pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
287 288
  if (pRpc->tmrCtrl == NULL) {
    tError("%s failed to init timers", pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
289
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
290 291
    return NULL;
  }
H
hzcheng 已提交
292

293
  if (pRpc->connType == TAOS_CONN_SERVER) {
H
Haojun Liao 已提交
294
    pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true);
295 296 297 298 299 300
    if (pRpc->hash == NULL) {
      tError("%s failed to init string hash", pRpc->label);
      rpcClose(pRpc);
      return NULL;
    }
  } else {
dengyihao's avatar
dengyihao 已提交
301 302
    pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime * 20);
    if (pRpc->pCache == NULL) {
303 304 305 306
      tError("%s failed to init connection cache", pRpc->label);
      rpcClose(pRpc);
      return NULL;
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
307
  }
H
hzcheng 已提交
308

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
309 310
  pthread_mutex_init(&pRpc->mutex, NULL);

dengyihao's avatar
dengyihao 已提交
311 312 313 314
  pRpc->tcphandle = (*taosInitConn[pRpc->connType | RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads,
                                                                   rpcProcessMsgFromPeer, pRpc);
  pRpc->udphandle =
      (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);
315 316 317 318 319 320 321

  if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) {
    tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort);
    rpcClose(pRpc);
    return NULL;
  }

S
TD-1843  
Shengliang Guan 已提交
322
  tDebug("%s rpc is opened, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions);
H
hzcheng 已提交
323

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
324
  return pRpc;
H
hzcheng 已提交
325 326
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
327 328
void rpcClose(void *param) {
  SRpcInfo *pRpc = (SRpcInfo *)param;
H
hzcheng 已提交
329

330
  // stop connection to outside first
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
331 332
  (*taosStopConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
  (*taosStopConn[pRpc->connType])(pRpc->udphandle);
333

dengyihao's avatar
dengyihao 已提交
334
  // close all connections
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
335
  for (int i = 0; i < pRpc->sessions; ++i) {
336
    if (pRpc->connList && pRpc->connList[i].user[0]) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
337
      rpcCloseConn((void *)(pRpc->connList + i));
H
hzcheng 已提交
338 339 340
    }
  }

341
  // clean up
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
342 343
  (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
  (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);
J
Jeff Tao 已提交
344

345
  tDebug("%s rpc is closed", pRpc->label);
346
  rpcDecRef(pRpc);
H
hzcheng 已提交
347 348
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
349 350
void *rpcMallocCont(int contLen) {
  int size = contLen + RPC_MSG_OVERHEAD;
351

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
352 353
  char *start = (char *)calloc(1, (size_t)size);
  if (start == NULL) {
354 355
    tError("failed to malloc msg, size:%d", size);
    return NULL;
356
  } else {
S
TD-1762  
Shengliang Guan 已提交
357
    tTrace("malloc mem:%p size:%d", start, size);
358 359
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
360
  return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
361 362 363
}

void rpcFreeCont(void *cont) {
S
Shengliang Guan 已提交
364
  if (cont) {
365 366
    char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext);
    free(temp);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
367
    tTrace("free mem: %p", temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
368
  }
369 370
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
371 372 373 374
/*
 * Resize a content buffer so that it can hold contLen bytes of content.
 *
 *  - ptr == NULL  : behaves like rpcMallocCont(contLen)
 *  - contLen == 0 : the buffer is freed and NULL is returned
 *  - otherwise    : the underlying allocation is resized and the new content
 *                   pointer is returned
 *
 * Returns NULL when contLen is negative or the reallocation fails; in both
 * cases the original buffer is left untouched and still owned by the caller.
 */
void *rpcReallocCont(void *ptr, int contLen) {
  if (ptr == NULL) return rpcMallocCont(contLen);

  // step back from the content to the start of the allocation
  char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead);
  if (contLen == 0) {
    free(start);
    return NULL;
  }

  // a negative length would shrink the buffer below the size of its headers
  if (contLen < 0) {
    tError("failed to realloc cont, invalid contLen:%d", contLen);
    return NULL;
  }

  size_t size = (size_t)contLen + RPC_MSG_OVERHEAD;
  char * resized = realloc(start, size);
  if (resized == NULL) {
    tError("failed to realloc cont, size:%" PRId64, (int64_t)size);
    return NULL;
  }

  return resized + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}

S
Shengliang Guan 已提交
390
/*
 * Send a request to one of the end points in pEpSet.
 *
 * pMsg->pCont must come from rpcMallocCont(): the request context stored in
 * front of the content is filled in here, registered in the reference set and
 * passed to rpcSendReqToServer(). When pRid is not NULL it receives the
 * reference id of the request (usable with rpcCancelRequest()).
 */
void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
  SRpcInfo *pRpc = (SRpcInfo *)shandle;
  tmsg_t    type = pMsg->msgType;

  int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);

  // the context sits in front of the RPC head, which sits in front of the content
  char *          base = (char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext);
  SRpcReqContext *pContext = (SRpcReqContext *)base;

  pContext->pRpc = pRpc;
  pContext->ahandle = pMsg->ahandle;
  pContext->epSet = *pEpSet;
  pContext->oldInUse = pEpSet->inUse;
  pContext->pCont = pMsg->pCont;
  pContext->contLen = contLen;
  pContext->msgType = type;

  // UDP by default; TCP for large messages or when TCP is forced
  int8_t connType = RPC_CONN_UDPC;
  if (contLen > tsRpcMaxUdpSize || tsRpcForceTcp) {
    connType = RPC_CONN_TCPC;
  }

  // connection type is application specific.
  // for TDengine, all the query, show commands shall have TCP connection
  if (type == TDMT_VND_QUERY || type == TDMT_MND_SHOW_RETRIEVE || type == TDMT_VND_FETCH ||
      type == TDMT_MND_VGROUP_LIST || type == TDMT_VND_TABLES_META || type == TDMT_VND_TABLE_META ||
      type == TDMT_MND_SHOW || type == TDMT_MND_STATUS || type == TDMT_VND_ALTER_TABLE) {
    connType = RPC_CONN_TCPC;
  }
  pContext->connType = connType;

  pContext->rid = taosAddRef(tsRpcRefId, pContext);
  if (pRid != NULL) {
    *pRid = pContext->rid;
  }

  rpcSendReqToServer(pRpc, pContext);
}

J
Jeff Tao 已提交
421
void rpcSendResponse(const SRpcMsg *pRsp) {
422 423
  if (pRsp->handle == NULL) return;

dengyihao's avatar
dengyihao 已提交
424 425 426 427 428
  int       msgLen = 0;
  SRpcConn *pConn = (SRpcConn *)pRsp->handle;
  SRpcMsg   rpcMsg = *pRsp;
  SRpcMsg * pMsg = &rpcMsg;
  SRpcInfo *pRpc = pConn->pRpc;
J
Jeff Tao 已提交
429

dengyihao's avatar
dengyihao 已提交
430
  if (pMsg->pCont == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
431 432
    pMsg->pCont = rpcMallocCont(0);
    pMsg->contLen = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
433 434
  }

dengyihao's avatar
dengyihao 已提交
435 436
  SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont);
  char *    msg = (char *)pHead;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
437

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
438 439
  pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
  msgLen = rpcMsgLenFromCont(pMsg->contLen);
440

441
  rpcLockConn(pConn);
442

dengyihao's avatar
dengyihao 已提交
443
  if (pConn->inType == 0 || pConn->user[0] == 0) {
H
Haojun Liao 已提交
444
    tError("%s, connection is already released, rsp wont be sent", pConn->info);
445
    rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
446 447
    rpcFreeCont(pMsg->pCont);
    rpcDecRef(pRpc);
448 449 450
    return;
  }

451
  // set msg header
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
452
  pHead->version = 1;
dengyihao's avatar
dengyihao 已提交
453
  pHead->msgType = pConn->inType + 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
454 455
  pHead->spi = pConn->spi;
  pHead->encrypt = pConn->encrypt;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
456 457 458
  pHead->tranId = pConn->inTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
459
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
460
  pHead->port = htons(pConn->localPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
461
  pHead->code = htonl(pMsg->code);
dengyihao's avatar
dengyihao 已提交
462 463
  pHead->ahandle = (uint64_t)pConn->ahandle;

464 465
  // set pConn parameters
  pConn->inType = 0;
466 467

  // response message is released until new response is sent
dengyihao's avatar
dengyihao 已提交
468
  rpcFreeMsg(pConn->pRspMsg);
469 470
  pConn->pRspMsg = msg;
  pConn->rspMsgLen = msgLen;
471
  if (pMsg->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) pConn->inTranId--;
472

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
473
  // stop the progress timer
474
  taosTmrStopA(&pConn->pTimer);
475 476

  // set the idle timer to monitor the activity
Y
yihaoDeng 已提交
477
  taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime * 30, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
478
  rpcSendMsgToPeer(pConn, msg, msgLen);
479 480

  // if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured
dengyihao's avatar
dengyihao 已提交
481
  if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY) pConn->secured = 1;  // connection shall be secured
482 483

  if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
484 485
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
486

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
487
  rpcUnlockConn(pConn);
dengyihao's avatar
dengyihao 已提交
488
  rpcDecRef(pRpc);  // decrease the referene count
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
489

490 491 492
  return;
}

S
Shengliang Guan 已提交
493
void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
494
  SRpcMsg rpcMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
495
  memset(&rpcMsg, 0, sizeof(rpcMsg));
dengyihao's avatar
dengyihao 已提交
496

S
Shengliang Guan 已提交
497
  rpcMsg.contLen = sizeof(SEpSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
498 499
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  if (rpcMsg.pCont == NULL) return;
500

S
Shengliang Guan 已提交
501
  memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet));
502

503
  rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
504
  rpcMsg.handle = thandle;
505

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
506
  rpcSendResponse(&rpcMsg);
507 508 509 510

  return;
}

511
/*
 * Copy the peer address and the user name of a connection into *pInfo.
 * Returns 0 on success, -1 when the connection has no user name set
 * (user[0] == 0).
 */
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
  SRpcConn *pConn = (SRpcConn *)thandle;

  if (pConn->user[0] != 0) {
    pInfo->clientIp = pConn->peerIp;
    pInfo->clientPort = pConn->peerPort;
    tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
    return 0;
  }

  return -1;
}

S
Shengliang Guan 已提交
523
/*
 * Synchronous request: send pMsg and block until the semaphore stored in the
 * request context is posted. *pRsp is cleared before sending; pRsp, pEpSet
 * and the semaphore are recorded in the context for the code that completes
 * the request.
 */
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
  // the request context precedes the RPC head, which precedes the content
  char *          base = (char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext);
  SRpcReqContext *pContext = (SRpcReqContext *)base;
  tsem_t          done;

  memset(pRsp, 0, sizeof(SRpcMsg));

  tsem_init(&done, 0, 0);
  pContext->pSem = &done;
  pContext->pRsp = pRsp;
  pContext->pSet = pEpSet;

  rpcSendRequest(shandle, pEpSet, pMsg, NULL);

  // wait until the request has been completed
  tsem_wait(&done);
  tsem_destroy(&done);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
543
// this API is used by server app to keep an APP context in case connection is broken
544
int rpcReportProgress(void *handle, char *pCont, int contLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
545
  SRpcConn *pConn = (SRpcConn *)handle;
dengyihao's avatar
dengyihao 已提交
546
  int       code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
547

548
  rpcLockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
549

550 551
  if (pConn->user[0]) {
    // pReqMsg and reqMsgLen is re-used to store the context from app server
dengyihao's avatar
dengyihao 已提交
552
    pConn->pReqMsg = pCont;
553
    pConn->reqMsgLen = contLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
554
  } else {
555
    tDebug("%s, rpc connection is already released", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
556 557 558
    rpcFreeCont(pCont);
    code = -1;
  }
559

560
  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
561
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
562 563
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
564 565 566
void rpcCancelRequest(int64_t rid) {
  SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid);
  if (pContext == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
567

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
568
  rpcCloseConn(pContext->pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
569

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
570
  taosReleaseRef(tsRpcRefId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
571 572
}

573
static void rpcFreeMsg(void *msg) {
dengyihao's avatar
dengyihao 已提交
574
  if (msg) {
575 576
    char *temp = (char *)msg - sizeof(SRpcReqContext);
    free(temp);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
577
    tTrace("free mem: %p", temp);
578 579 580
  }
}

J
jtao1735 已提交
581
/*
 * Open a client connection to peerFqdn:peerPort.
 *
 * The peer address is taken from the FQDN cache when present; otherwise it is
 * resolved and, if the resolution succeeded, stored in the cache. A connection
 * object is then allocated and, when the connection type has an "open" entry
 * in taosOpenConn[], the transport-level connection is opened as well.
 *
 * Returns the connection, or NULL on failure. terrno is set to
 * TSDB_CODE_RPC_FQDN_ERROR when the name cannot be resolved and to
 * TSDB_CODE_RPC_NETWORK_UNAVAIL when the transport connection cannot be opened.
 */
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType) {
  // the cache key includes the terminating NUL
  size_t    keyLen = strlen(peerFqdn) + 1;
  uint32_t  peerIp = 0;
  uint32_t *pCached = taosHashGet(tsFqdnHash, peerFqdn, keyLen);

  if (pCached == NULL) {
    peerIp = taosGetIpv4FromFqdn(peerFqdn);
    if (peerIp != 0xFFFFFFFF) {
      taosHashPut(tsFqdnHash, peerFqdn, keyLen, &peerIp, sizeof(peerIp));
    }
  } else {
    peerIp = *pCached;
  }

  if (peerIp == 0xFFFFFFFF) {
    tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn);
    terrno = TSDB_CODE_RPC_FQDN_ERROR;
    return NULL;
  }

  SRpcConn *pConn = rpcAllocateClientConn(pRpc);
  if (pConn == NULL) {
    return NULL;
  }

  tstrncpy(pConn->peerFqdn, peerFqdn, sizeof(pConn->peerFqdn));
  pConn->peerIp = peerIp;
  pConn->peerPort = peerPort;
  tstrncpy(pConn->user, pRpc->user, sizeof(pConn->user));
  pConn->connType = connType;

  // connection types without an "open" operation are ready at this point
  if (taosOpenConn[connType] == NULL) {
    return pConn;
  }

  void *shandle = (connType & RPC_CONN_TCP) ? pRpc->tcphandle : pRpc->udphandle;
  pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort);
  if (pConn->chandle != NULL) {
    return pConn;
  }

  tError("failed to connect to:%s:%d", taosIpStr(pConn->peerIp), pConn->peerPort);
  terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  rpcCloseConn(pConn);
  return NULL;
}

626
/*
 * Tear down a connection slot: close the transport handle, stop both timers,
 * drop the role-specific state and hand the session id back to the id pool.
 *
 * A slot whose user name is already empty is treated as released and left
 * untouched. rpcCloseConn() and rpcProcessBrokenLink() call this with the
 * connection locked.
 */
static void rpcReleaseConn(SRpcConn *pConn) {
  SRpcInfo *pRpc = pConn->pRpc;
  if (pConn->user[0] == 0) return;

  // an empty user name marks the slot as free
  pConn->user[0] = 0;
  if (taosCloseConn[pConn->connType] != NULL) {
    (*taosCloseConn[pConn->connType])(pConn->chandle);
  }

  taosTmrStopA(&pConn->pTimer);
  taosTmrStopA(&pConn->pIdleTimer);

  if (pRpc->connType != TAOS_CONN_SERVER) {
    // client side: if there is an outgoing message, fail it
    if (pConn->outType && pConn->pReqMsg) {
      SRpcReqContext *pCtx = pConn->pContext;
      if (pCtx == NULL) {
        assert(0);
      } else {
        if (pCtx->pRsp) {
          // for synchronous API, post semaphore to unblock app
          pCtx->pRsp->code = TSDB_CODE_RPC_APP_ERROR;
          pCtx->pRsp->pCont = NULL;
          pCtx->pRsp->contLen = 0;
          tsem_post(pCtx->pSem);
        }
        pCtx->pConn = NULL;
        taosRemoveRef(tsRpcRefId, pCtx->rid);
      }
    }
  } else {
    // server side: the slot is indexed in pRpc->hash under ip:linkUid:peerId:connType
    char   key[40] = {0};
    size_t keyLen = snprintf(key, sizeof(key), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId,
                             pConn->connType);
    taosHashRemove(pRpc->hash, key, keyLen);

    // it may have a response msg saved, but not request msg
    rpcFreeMsg(pConn->pRspMsg);
    pConn->pRspMsg = NULL;

    // if server has ever reported progress, free content (do not use rpcFreeMsg)
    if (pConn->pReqMsg) {
      rpcFreeCont(pConn->pReqMsg);
    }
  }

  // fields are cleared one by one: memset could not be used, since lockedBy can not be reset
  pConn->inType = 0;
  pConn->outType = 0;
  pConn->inTranId = 0;
  pConn->outTranId = 0;
  pConn->secured = 0;
  pConn->peerId = 0;
  pConn->peerIp = 0;
  pConn->peerPort = 0;
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
  pConn->pContext = NULL;
  pConn->chandle = NULL;

  taosFreeId(pRpc->idPool, pConn->sid);
  tDebug("%s, rpc connection is released", pConn->info);
}

static void rpcCloseConn(void *thandle) {
  SRpcConn *pConn = (SRpcConn *)thandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
686
  if (pConn == NULL) return;
687 688 689

  rpcLockConn(pConn);

dengyihao's avatar
dengyihao 已提交
690
  if (pConn->user[0]) rpcReleaseConn(pConn);
691

692
  rpcUnlockConn(pConn);
H
hzcheng 已提交
693 694
}

695 696
/*
 * Take a free session id from the pool and initialise the matching slot of
 * pRpc->connList for client use.
 * Returns NULL with terrno = TSDB_CODE_RPC_MAX_SESSIONS when no id is left.
 */
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
  int sid = taosAllocateId(pRpc->idPool);
  if (sid <= 0) {
    tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
    terrno = TSDB_CODE_RPC_MAX_SESSIONS;
    return NULL;
  }

  SRpcConn *pConn = &pRpc->connList[sid];

  pConn->pRpc = pRpc;
  pConn->sid = sid;
  pConn->tranId = (uint16_t)(taosRand() & 0xFFFF);
  pConn->ownId = htonl(pConn->sid);
  // link id is derived from the slot address, the process id and the random transaction id
  pConn->linkUid = (uint32_t)((int64_t)pConn + taosGetPId() + (int64_t)pConn->tranId);
  pConn->spi = pRpc->spi;
  pConn->encrypt = pRpc->encrypt;
  if (pConn->spi) {
    memcpy(pConn->secret, pRpc->secret, TSDB_PASSWORD_LEN);
  }

  tDebug("%s %p client connection is allocated, uid:0x%x", pRpc->label, pConn, pConn->linkUid);
  return pConn;
}
H
hzcheng 已提交
718

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
719
/*
 * Find or create the server-side connection for a received message.
 *
 * The connection is keyed in pRpc->hash by "ip:linkUid:sourceId:connType".
 * An existing entry is returned with its secured flag cleared. Otherwise a new
 * session id is taken from the pool, the slot is initialised from the message
 * head and, if an auth callback (afp) is configured, the user is looked up.
 *
 * Returns NULL with terrno set when the message is a bare request head
 * (code != 0), no session id is left, or the auth callback fails.
 */
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
  SRpcConn *pConn = NULL;
  char      hashstr[40] = {0};
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;

  size_t size =
      snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);

  // check if it is already allocated
  SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
  if (ppConn) pConn = *ppConn;
  if (pConn) {
    pConn->secured = 0;
    return pConn;
  }

  // if code is not 0, it means it is simple reqhead, just ignore
  if (pHead->code != 0) {
    terrno = TSDB_CODE_RPC_ALREADY_PROCESSED;
    return NULL;
  }

  int sid = taosAllocateId(pRpc->idPool);
  if (sid <= 0) {
    tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
    terrno = TSDB_CODE_RPC_MAX_SESSIONS;
    return NULL;
  }

  pConn = pRpc->connList + sid;
  memcpy(pConn->user, pHead->user, tListLen(pConn->user));
  pConn->pRpc = pRpc;
  pConn->sid = sid;
  pConn->tranId = (uint16_t)(rand() & 0xFFFF);
  pConn->ownId = htonl(pConn->sid);
  pConn->linkUid = pHead->linkUid;

  if (pRpc->afp) {
    if (pConn->user[0] == 0) {
      terrno = TSDB_CODE_RPC_AUTH_REQUIRED;
    } else {
      terrno = (*pRpc->afp)(pRpc->parent, pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
    }

    if (terrno != 0) {
      // An empty user name is what rpcGetConnObj()/rpcReleaseConn() use to
      // recognise a free slot; clear it so the slot does not look live once
      // its id is back in the pool.
      pConn->user[0] = 0;
      taosFreeId(pRpc->idPool, sid);  // sid shall be released
      return NULL;
    }
  }

  if (pRecv->connType == RPC_CONN_UDPS && pRpc->numOfThreads > 1) {
    // UDP server, assign to new connection
    pRpc->index = (pRpc->index + 1) % pRpc->numOfThreads;
    pConn->localPort = (pRpc->localPort + pRpc->index);
  }

  taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
  tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s spi:%d", pRpc->label, pConn, pConn->linkUid,
         sid, hashstr, pConn->spi);

  return pConn;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
782
/*
 * Resolve the connection a received message belongs to.
 *
 * A non-zero sid selects the slot directly, provided the slot is in use
 * (non-empty user name). Otherwise a server allocates or looks up the
 * connection from the message head, while a client reports an unexpected
 * response. The link id of the message must match the connection's.
 * Returns NULL with terrno set on failure.
 */
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
  SRpcConn *pConn = NULL;

  if (sid != 0 && pRpc->connList[sid].user[0] != 0) {
    pConn = &pRpc->connList[sid];
  }

  if (pConn == NULL) {
    if (pRpc->connType != TAOS_CONN_SERVER) {
      terrno = TSDB_CODE_RPC_UNEXPECTED_RESPONSE;
      return NULL;
    }

    pConn = rpcAllocateServerConn(pRpc, pRecv);
    if (pConn == NULL) return NULL;  // terrno was set by the allocator
  }

  if (pConn->linkUid == pHead->linkUid) return pConn;

  terrno = TSDB_CODE_RPC_MISMATCHED_LINK_ID;
  tDebug("%s %p %p, linkUid:0x%x is not matched with received:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
         pConn->linkUid, pHead->linkUid);
  return NULL;
}

811
/*
 * Get a connection to the end point currently selected in the request's epSet.
 *
 * A cached connection is reused when it is still in use (non-empty user
 * name); otherwise a new one is opened. On success the retry counter is reset
 * and the connection is tagged with the request's app handle.
 * Returns NULL (terrno set by rpcOpenConn) when no connection can be set up.
 */
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
  SRpcInfo *pRpc = pContext->pRpc;
  SEpSet   *pEpSet = &pContext->epSet;

  SRpcConn *pConn =
      rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
  if (pConn == NULL || pConn->user[0] == 0) {
    pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
  }

  if (pConn == NULL) {
    tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno));
    return NULL;
  }

  pConn->tretry = 0;
  pConn->ahandle = pContext->ahandle;
  snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle);

  return pConn;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
834
/*
 * Validate the head of an incoming request against the connection state.
 *
 * Returns 0 when the request is new and shall be passed to the application;
 * in that case the transaction id and message type are recorded and, for
 * anything but a TCP server connection, the progress timer is started.
 * Returns TSDB_CODE_RPC_ALREADY_PROCESSED for a retransmission, a heart beat
 * or a duplicate, and another error code when the head is inconsistent.
 */
static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
  if (pConn->peerId == 0) {
    pConn->peerId = pHead->sourceId;
  } else if (pConn->peerId != pHead->sourceId) {
    tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, pConn->peerId, pHead->sourceId);
    return TSDB_CODE_RPC_INVALID_VALUE;
  }

  if (pConn->inTranId == pHead->tranId) {
    if (pConn->inType == pHead->msgType) {
      if (pHead->code == 0) {
        tDebug("%s, %s is retransmitted", pConn->info, TMSG_INFO(pHead->msgType));
        rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
      }
      // a non-zero code is a heart beat from the client: nothing to do
    } else if (pConn->inType == 0) {
      tDebug("%s, %s is already processed, tranId:%d", pConn->info, TMSG_INFO(pHead->msgType), pConn->inTranId);
      // A slot that has not stored a response yet has pRspMsg == NULL
      // (rpcReleaseConn() clears it together with inTranId and inType), so a
      // request carrying tranId 0 can land here with nothing to resend.
      if (pConn->pRspMsg != NULL) {
        rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen);  // resend the response
      }
    } else {
      tDebug("%s, mismatched message %s and tranId", pConn->info, TMSG_INFO(pHead->msgType));
    }

    // do not reply any message
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
  }

  if (pConn->inType != 0) {
    tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, pConn->inTranId, pHead->tranId);
    return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED;
  }

  if (rpcContLenFromMsg(pHead->msgLen) <= 0) {
    tDebug("%s, message body is empty, ignore", pConn->info);
    return TSDB_CODE_RPC_APP_ERROR;
  }

  pConn->inTranId = pHead->tranId;
  pConn->inType = pHead->msgType;

  // start the progress timer to monitor the response from server app
  if (pConn->connType != RPC_CONN_TCPS) {
    pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl);
  }

  return 0;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
883
/*
 * Validate the head of an incoming response against the outstanding request.
 *
 * Returns TSDB_CODE_SUCCESS when the response completes the request and shall
 * be passed on; pHead->code may have been rewritten (TOO_SLOW, NOT_READY,
 * NETWORK_UNAVAIL). Returns TSDB_CODE_RPC_ALREADY_PROCESSED when the request
 * was sent again (auth restart, link restart, peer still busy), and another
 * error code when the response does not match the outstanding request.
 */
static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
  SRpcInfo *pRpc = pConn->pRpc;

  pConn->peerId = pHead->sourceId;

  if (pConn->outType == 0 || pConn->pContext == NULL) return TSDB_CODE_RPC_UNEXPECTED_RESPONSE;
  if (pHead->tranId != pConn->outTranId) return TSDB_CODE_RPC_INVALID_TRAN_ID;
  if (pHead->msgType != pConn->outType + 1) return TSDB_CODE_RPC_INVALID_RESPONSE_TYPE;

  taosTmrStopA(&pConn->pTimer);
  pConn->retry = 0;

  if (pHead->code == TSDB_CODE_RPC_AUTH_REQUIRED && pRpc->spi) {
    tDebug("%s, authentication shall be restarted", pConn->info);
    pConn->secured = 0;
    rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
    if (pConn->connType != RPC_CONN_TCPC) {
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
    }
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
  }

  if (pHead->code == TSDB_CODE_RPC_MISMATCHED_LINK_ID) {
    tDebug("%s, mismatched linkUid, link shall be restarted", pConn->info);
    pConn->secured = 0;
    // the saved request is sent again with a zero destination id
    ((SRpcHead *)pConn->pReqMsg)->destId = 0;
    rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
    if (pConn->connType != RPC_CONN_TCPC) {
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
    }
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
  }

  if (pHead->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) {
    if (pConn->tretry > tsRpcMaxRetry) {
      // peer still in processing, give up
      tDebug("%s, server processing takes too long time, give up", pConn->info);
      pHead->code = TSDB_CODE_RPC_TOO_SLOW;
    } else {
      tDebug("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry);
      pConn->tretry++;
      rpcSendReqHead(pConn);
      if (pConn->connType != RPC_CONN_TCPC) {
        pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
      }
      return TSDB_CODE_RPC_ALREADY_PROCESSED;
    }
  }

  // the request is complete: detach it from the connection
  pConn->outType = 0;
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
  SRpcReqContext *pCtx = pConn->pContext;

  if (pHead->code != TSDB_CODE_RPC_REDIRECT) return TSDB_CODE_SUCCESS;

  if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SEpSet)) {
    // if EpSet is not included in the msg, treat it as NOT_READY
    pHead->code = TSDB_CODE_RPC_NOT_READY;
    return TSDB_CODE_SUCCESS;
  }

  pCtx->redirect++;
  if (pCtx->redirect > TSDB_MAX_REPLICA) {
    pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    tWarn("%s, too many redirects, quit", pConn->info);
  }

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
957
/*
 * Parse the head of a received message and bind it to a connection.
 *
 * Checks the message type, the session id range and (for requests) the
 * client version, looks up the connection, then under the connection lock
 * records the peer address, authenticates the message and runs the request or
 * response head processing. The outcome is left in terrno.
 *
 * *ppContext is set to the connection's request context for responses and to
 * NULL otherwise. Returns the connection, or NULL when none could be found.
 */
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) {
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
  int32_t   sid = htonl(pHead->destId);

  *ppContext = NULL;

  if (TMSG_INDEX(pHead->msgType) >= TDMT_MAX || TMSG_INDEX(pHead->msgType) <= 0) {
    tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
    terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE;
    return NULL;
  }

  if (sid < 0 || sid >= pRpc->sessions) {
    tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, pRpc->sessions,
           TMSG_INFO(pHead->msgType));
    terrno = TSDB_CODE_RPC_INVALID_SESSION_ID;
    return NULL;
  }

  if (rpcIsReq(pHead->msgType) && htonl(pHead->msgVer) != tsVersion >> 8) {
    tDebug("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion,
           TMSG_INFO(pHead->msgType));
    terrno = TSDB_CODE_RPC_INVALID_VERSION;
    return NULL;
  }

  SRpcConn *pConn = rpcGetConnObj(pRpc, sid, pRecv);
  if (pConn == NULL) {
    tDebug("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno));
    return NULL;
  }

  rpcLockConn(pConn);

  if (rpcIsReq(pHead->msgType)) {
    pConn->ahandle = (void *)pHead->ahandle;
    snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle);
  }

  if (pConn->chandle == NULL) {
    pConn->chandle = pRecv->chandle;
  }
  pConn->peerIp = pRecv->ip;
  // a port carried in the head takes precedence over the one the packet came from
  pConn->peerPort = pHead->port ? htons(pHead->port) : pRecv->port;

  terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);

  // code can be transformed only after authentication
  pHead->code = htonl(pHead->code);

  if (terrno == 0) {
    if (pHead->encrypt) {
      // decrypt here
    }

    if (!rpcIsReq(pHead->msgType)) {
      terrno = rpcProcessRspHead(pConn, pHead);
      *ppContext = pConn->pContext;
    } else {
      pConn->connType = pRecv->connType;
      terrno = rpcProcessReqHead(pConn, pHead);

      // stop idle timer
      taosTmrStopA(&pConn->pIdleTimer);

      // client shall send the request within tsRpcTime again for UDP, double it
      if (pConn->connType != RPC_CONN_TCPS) {
        pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer * 2, pConn, pRpc->tmrCtrl);
      }
    }
  }

  rpcUnlockConn(pConn);

  return pConn;
}

Y
TD-3409  
yihaoDeng 已提交
1036
static void doRpcReportBrokenLinkToServer(void *param, void *id) {
dengyihao's avatar
dengyihao 已提交
1037 1038 1039
  SRpcMsg * pRpcMsg = (SRpcMsg *)(param);
  SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle);
  SRpcInfo *pRpc = pConn->pRpc;
S
Shengliang Guan 已提交
1040 1041
  (*(pRpc->cfp))(pRpc->parent, pRpcMsg, NULL);
  free(pRpcMsg);
Y
TD-3409  
yihaoDeng 已提交
1042
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1043 1044
/*
 * Tell the server application that the link of a connection with a pending
 * request is gone.
 *
 * The notification is built on the heap and handed to a zero-delay timer so
 * the application callback runs from doRpcReportBrokenLinkToServer(), which
 * also frees it. Nothing is done when the connection holds no request
 * content. If the notification cannot be allocated the connection is left
 * untouched, so pReqMsg is still there for rpcReleaseConn() to free.
 */
static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
  SRpcInfo *pRpc = pConn->pRpc;
  if (pConn->pReqMsg == NULL) return;

  // allocate first, so that a failure leaves both the ref count and the connection unchanged
  SRpcMsg *rpcMsg = malloc(sizeof(*rpcMsg));
  if (rpcMsg == NULL) {
    tError("%s, failed to allocate the broken link notification", pConn->info);
    return;
  }

  // if there are pending request, notify the app
  rpcAddRef(pRpc);
  tDebug("%s, notify the server app, connection is gone", pConn->info);

  rpcMsg->pCont = pConn->pReqMsg;      // pReqMsg is re-used to store the APP context from server
  rpcMsg->contLen = pConn->reqMsgLen;  // reqMsgLen is re-used to store the APP context length
  rpcMsg->ahandle = pConn->ahandle;
  rpcMsg->handle = pConn;
  rpcMsg->msgType = pConn->inType;
  rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;

  // the content now belongs to the notification
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;

  if (pRpc->cfp) {
    taosTmrStart(doRpcReportBrokenLinkToServer, 0, rpcMsg, pRpc->tmrCtrl);
  } else {
    free(rpcMsg);
  }
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1067
/*
 * Handle a broken transport link for pConn (NULL is ignored).
 *
 * Under the connection lock: an outstanding outgoing request is failed with
 * TSDB_CODE_RPC_NETWORK_UNAVAIL through a zero-delay rpcProcessConnError
 * timer, a pending incoming request is reported to the server application,
 * and the connection is released.
 */
static void rpcProcessBrokenLink(SRpcConn *pConn) {
  if (pConn == NULL) return;

  SRpcInfo *pRpc = pConn->pRpc;
  tDebug("%s, link is broken", pConn->info);

  rpcLockConn(pConn);

  if (pConn->outType) {
    SRpcReqContext *pCtx = pConn->pContext;
    pCtx->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    pCtx->pConn = NULL;
    // detach the request so that releasing the connection does not touch the context again
    pConn->pReqMsg = NULL;
    taosTmrStart(rpcProcessConnError, 0, pCtx, pRpc->tmrCtrl);
  }

  if (pConn->inType) {
    rpcReportBrokenLinkToServer(pConn);
  }

  rpcReleaseConn(pConn);
  rpcUnlockConn(pConn);
}

1088
/*
 * Entry point for everything received from the transport layer.
 *
 * A NULL message means the link is broken. Otherwise the head is parsed and,
 * depending on the parse result left in terrno, the message is passed to the
 * application, answered with an error (requests only) or dropped. The message
 * buffer is freed here whenever parsing did not succeed.
 * Returns the connection the message was bound to, or NULL.
 */
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
  SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle;
  SRpcConn *pConn = (SRpcConn *)pRecv->thandle;

  tDump(pRecv->msg, pRecv->msgLen);

  // underlying UDP layer does not know it is server or client
  pRecv->connType = pRecv->connType | pRpc->connType;

  if (pRecv->msg == NULL) {
    rpcProcessBrokenLink(pConn);
    return NULL;
  }

  terrno = 0;
  SRpcReqContext *pContext;
  pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);

  char ipstr[24] = {0};
  taosIpPort2String(pRecv->ip, pRecv->port, ipstr);

  // only a type inside the valid range is printed by name
  if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) {
    tDebug("%s %p %p, %s received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn,
           (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), ipstr, terrno, pRecv->msgLen, pHead->sourceId,
           pHead->destId, pHead->tranId, pHead->code);
  } else {
    tDebug("%s %p %p, %d received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn,
           (void *)pHead->ahandle, pHead->msgType, ipstr, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId,
           pHead->tranId, pHead->code);
  }

  const int32_t code = terrno;

  if (code == TSDB_CODE_RPC_ALREADY_PROCESSED) {
    // handled while parsing the head; nothing to dispatch
  } else if (code == 0) {
    // msg is passed to app only parsing is ok
    rpcProcessIncomingMsg(pConn, pHead, pContext);
  } else if (rpcIsReq(pHead->msgType)) {
    // parsing error on a request: tell the peer
    rpcSendErrorMsgToPeer(pRecv, code);
    if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE) {
      rpcCloseConn(pConn);
    }

    if (TMSG_INDEX(pHead->msgType) + 1 > 1 && TMSG_INDEX(pHead->msgType) + 1 < TDMT_MAX) {
      tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
             TMSG_INFO(pHead->msgType + 1), code);
    } else {
      // NOTE(review): TMSG_INFO is applied to a type that just failed the range
      // check above - confirm the macro is safe for out-of-range values.
      tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
             TMSG_INFO(pHead->msgType), code);
    }
  }

  // parsing failed, msg shall be freed
  if (code != 0) {
    rpcFreeMsg(pRecv->msg);
  }
  return pConn;
}
H
hzcheng 已提交
1144

1145
/*
 * Hand the final result of a request to the client application and drop the
 * request context reference.
 *
 * Synchronous callers (pRsp set) get the epSet and the message copied into
 * their buffers and are woken through the semaphore. Asynchronous callers get
 * the callback invoked; the epSet is passed only when the end point in use
 * changed or a redirect happened.
 */
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
  SRpcInfo *pRpc = pContext->pRpc;

  pContext->pConn = NULL;

  if (pContext->pRsp == NULL) {
    // for asynchronous API
    int     epChanged = (pContext->epSet.inUse != pContext->oldInUse) || pContext->redirect;
    SEpSet *pEpSet = epChanged ? &pContext->epSet : NULL;

    (*pRpc->cfp)(pRpc->parent, pMsg, pEpSet);
  } else {
    // for synchronous API
    memcpy(pContext->pSet, &pContext->epSet, sizeof(SEpSet));
    memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
    tsem_post(pContext->pSem);
  }

  // free the request message
  taosRemoveRef(tsRpcRefId, pContext->rid);
}

S
slguan 已提交
1166
/*
 * Dispatch a successfully parsed message.
 *
 * Requests are passed to the server application callback with the connection
 * as handle. Responses put the connection back into the cache (or close it
 * when the peer was too slow) and are then either
 *   - followed, for a redirect: the new epSet is adopted and the request is
 *     sent again;
 *   - turned into a connection error, for the "not ready"/offline codes;
 *   - delivered to the client application.
 */
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
  SRpcInfo *pRpc = pConn->pRpc;
  SRpcMsg   rpcMsg;

  pHead = rpcDecompressRpcMsg(pHead);
  rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
  rpcMsg.pCont = pHead->content;
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.code = pHead->code;

  if (rpcIsReq(pHead->msgType)) {
    rpcMsg.ahandle = pConn->ahandle;
    rpcMsg.handle = pConn;
    rpcAddRef(pRpc);  // add the refCount for requests

    // notify the server app
    (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
    return;
  }

  // it's a response
  rpcMsg.handle = pContext;
  rpcMsg.ahandle = pContext->ahandle;
  pContext->pConn = NULL;

  // for UDP, port may be changed by server, the port in epSet shall be used for cache
  if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
    rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse],
                        pConn->connType);
  } else {
    rpcCloseConn(pConn);
  }

  if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
    pContext->numOfTry = 0;
    SEpSet *pEpSet = (SEpSet *)pHead->content;

    // numOfEps comes from the peer and drives the loop below, so it must not
    // exceed the number of entries the epSet arrays can hold.
    // NOTE(review): inUse is taken from the peer as well and is not validated here.
    int maxEps = (int)tListLen(pContext->epSet.port);
    if (pEpSet->numOfEps > maxEps) {
      tError("%s, redirect is ignored, numOfEps:%d exceeds the limit:%d", pConn->info, (int)pEpSet->numOfEps, maxEps);
    } else if (pEpSet->numOfEps > 0) {
      memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet));
      tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps,
             pContext->epSet.inUse);
      for (int i = 0; i < pContext->epSet.numOfEps; ++i) {
        // ports are byte-swapped in place after the copy
        pContext->epSet.port[i] = htons(pContext->epSet.port[i]);
        tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.fqdn[i],
               pContext->epSet.port[i]);
      }
    }
    rpcSendReqToServer(pRpc, pContext);
    rpcFreeCont(rpcMsg.pCont);
  } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY ||
             pHead->code == TSDB_CODE_DND_OFFLINE) {
    pContext->code = pHead->code;
    rpcProcessConnError(pContext, NULL);
    rpcFreeCont(rpcMsg.pCont);
  } else {
    rpcNotifyClient(pContext, &rpcMsg);
  }
}

1223
/*
 * Send a head-only response carrying `code` for the request currently being
 * processed on pConn, then mark the connection as secured.
 */
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
  char      buf[RPC_MSG_OVERHEAD];
  SRpcHead *pRsp = (SRpcHead *)buf;

  // set msg header
  memset(pRsp, 0, sizeof(*pRsp));
  pRsp->version = 1;
  pRsp->msgType = pConn->inType + 1;  // the response type is the request type plus one
  pRsp->spi = pConn->spi;
  pRsp->encrypt = 0;
  pRsp->tranId = pConn->inTranId;
  pRsp->sourceId = pConn->ownId;
  pRsp->destId = pConn->peerId;
  pRsp->linkUid = pConn->linkUid;
  pRsp->ahandle = (uint64_t)pConn->ahandle;
  memcpy(pRsp->user, pConn->user, tListLen(pRsp->user));
  pRsp->code = htonl(code);

  rpcSendMsgToPeer(pConn, buf, sizeof(SRpcHead));
  pConn->secured = 1;  // connection shall be secured
}

/*
 * Send a head-only copy of the outstanding request on pConn.
 * The head carries code 1; rpcAllocateServerConn() treats a non-zero code as
 * a bare request head.
 */
static void rpcSendReqHead(SRpcConn *pConn) {
  char      buf[RPC_MSG_OVERHEAD];
  SRpcHead *pReq = (SRpcHead *)buf;

  // set msg header
  memset(pReq, 0, sizeof(*pReq));
  pReq->version = 1;
  pReq->msgType = pConn->outType;
  pReq->msgVer = htonl(tsVersion >> 8);
  pReq->spi = pConn->spi;
  pReq->encrypt = 0;
  pReq->tranId = pConn->outTranId;
  pReq->sourceId = pConn->ownId;
  pReq->destId = pConn->peerId;
  pReq->linkUid = pConn->linkUid;
  pReq->ahandle = (uint64_t)pConn->ahandle;
  memcpy(pReq->user, pConn->user, tListLen(pReq->user));
  pReq->code = 1;

  rpcSendMsgToPeer(pConn, buf, sizeof(SRpcHead));
}
H
hjxilinx 已提交
1268

1269
/*
 * Reply to a received message with an error code, without using a connection object.
 *
 * The reply header mirrors the received one: source/dest ids are swapped, the
 * message type is the received type + 1, and version, encrypt, tranId, linkUid
 * and ahandle are copied over. spi is set to 0 in the reply. The reply goes
 * straight to the ip/port/handle recorded in pRecv.
 *
 * pRecv  receive info holding the offending message and its origin
 * code   error code to report (stored in network byte order)
 */
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
  char      reply[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t)];
  SRpcHead *pIn = (SRpcHead *)pRecv->msg;
  SRpcHead *pOut = (SRpcHead *)reply;
  int       replyLen = sizeof(SRpcHead);

  memset(reply, 0, sizeof(SRpcHead));

  pOut->version = pIn->version;
  pOut->msgType = (tmsg_t)(pIn->msgType + 1);
  pOut->spi = 0;
  pOut->encrypt = pIn->encrypt;
  pOut->tranId = pIn->tranId;

  // swap the direction of the message
  pOut->sourceId = pIn->destId;
  pOut->destId = pIn->sourceId;

  pOut->linkUid = pIn->linkUid;
  pOut->ahandle = pIn->ahandle;
  pOut->code = htonl(code);

  if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP) {
    // include a time stamp if client's time is not synchronized well
    uint32_t now = htonl(taosGetTimestampSec());
    memcpy(pOut->content, &now, sizeof(now));
    replyLen += sizeof(now);
  }

  pOut->msgLen = (int32_t)htonl((uint32_t)replyLen);
  (*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, reply, replyLen, pRecv->chandle);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1306
/*
 * Send the request described by pContext to the server.
 *
 * Counts the attempt, obtains a connection through rpcSetupConnToServer, fills
 * in the message header that precedes pContext->pCont, records the request on
 * the connection and sends it. If no connection can be set up, terrno is saved
 * in the context and rpcProcessConnError is scheduled on the rpc timer.
 */
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
  SRpcHead *pHead = rpcHeadFromCont(pContext->pCont);
  char *    reqMsg = (char *)pHead;
  int       reqLen = rpcMsgLenFromCont(pContext->contLen);
  tmsg_t    reqType = pContext->msgType;

  pContext->numOfTry++;

  SRpcConn *pConn = rpcSetupConnToServer(pContext);
  if (pConn == NULL) {
    // report the failure from the timer rather than from this call
    pContext->code = terrno;
    taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl);
    return;
  }

  pContext->pConn = pConn;
  pConn->ahandle = pContext->ahandle;
  rpcLockConn(pConn);

  // take the next transaction id, skipping the value 0
  if (++pConn->tranId == 0) ++pConn->tranId;

  // set the message header
  pHead->version = 1;
  pHead->msgVer = htonl(tsVersion >> 8);
  pHead->msgType = reqType;
  pHead->encrypt = 0;
  pHead->tranId = pConn->tranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
  pHead->port = 0;
  pHead->linkUid = pConn->linkUid;
  pHead->ahandle = (uint64_t)pConn->ahandle;
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));

  // set the connection parameters, the retry timer re-sends from pReqMsg/reqMsgLen
  pConn->outType = reqType;
  pConn->outTranId = pHead->tranId;
  pConn->pReqMsg = reqMsg;
  pConn->reqMsgLen = reqLen;
  pConn->pContext = pContext;

  rpcSendMsgToPeer(pConn, reqMsg, reqLen);

  // the retry timer is armed for every connection type except RPC_CONN_TCPC
  if (pConn->connType != RPC_CONN_TCPC) {
    taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
  }

  rpcUnlockConn(pConn);
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1352 1353

/*
 * Finalize a message and push it out through the transport of pConn.
 *
 * rpcAddAuthPart sets the header's spi/msgLen and may append an auth digest,
 * so the length actually sent can be larger than the msgLen passed in. The
 * message is logged, sent with the taosSendData entry for the connection type,
 * and dumped with tDump. A short write is only logged; nothing is returned.
 */
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
  SRpcHead *pHead = (SRpcHead *)msg;

  msgLen = rpcAddAuthPart(pConn, msg, msgLen);

  if (!rpcIsReq(pHead->msgType)) {
    if (pHead->code == 0) {
      pConn->secured = 1;  // for success response, set link as secured
    }

    char peer[40] = {0};
    taosIpPort2String(pConn->peerIp, pConn->peerPort, peer);
    tDebug("%s, %s is sent to %s, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType),
           peer, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
  } else {
    tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType),
           pConn->peerFqdn, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
  }

  int sent = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);
  if (sent != msgLen) {
    tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, sent, strerror(errno));
  }

  tDump(msg, msgLen);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1383
static void rpcProcessConnError(void *param, void *id) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1384
  SRpcReqContext *pContext = (SRpcReqContext *)param;
dengyihao's avatar
dengyihao 已提交
1385
  SRpcInfo *      pRpc = pContext->pRpc;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1386
  SRpcMsg         rpcMsg;
dengyihao's avatar
dengyihao 已提交
1387

H
hjxilinx 已提交
1388 1389 1390
  if (pRpc == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1391

1392
  tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle);
H
hzcheng 已提交
1393

H
Hongze Cheng 已提交
1394
  if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TDMT_VND_FETCH) {
dengyihao's avatar
dengyihao 已提交
1395
    rpcMsg.msgType = pContext->msgType + 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1396
    rpcMsg.ahandle = pContext->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1397 1398 1399
    rpcMsg.code = pContext->code;
    rpcMsg.pCont = NULL;
    rpcMsg.contLen = 0;
1400 1401

    rpcNotifyClient(pContext, &rpcMsg);
H
hzcheng 已提交
1402
  } else {
dengyihao's avatar
dengyihao 已提交
1403
    // move to next IP
1404 1405
    pContext->epSet.inUse++;
    pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1406
    rpcSendReqToServer(pRpc, pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1407
  }
H
hzcheng 已提交
1408 1409
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1410 1411 1412
static void rpcProcessRetryTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;
  SRpcInfo *pRpc = pConn->pRpc;
H
hzcheng 已提交
1413

1414
  rpcLockConn(pConn);
H
hzcheng 已提交
1415

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1416
  if (pConn->outType && pConn->user[0]) {
H
Hongze Cheng 已提交
1417
    tDebug("%s, expected %s is not received", pConn->info, TMSG_INFO((int)pConn->outType + 1));
H
hzcheng 已提交
1418 1419 1420
    pConn->pTimer = NULL;
    pConn->retry++;

S
slguan 已提交
1421
    if (pConn->retry < 4) {
H
Hongze Cheng 已提交
1422
      tDebug("%s, re-send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn, pConn->peerPort);
dengyihao's avatar
dengyihao 已提交
1423
      rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
1424
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
H
hzcheng 已提交
1425 1426
    } else {
      // close the connection
dengyihao's avatar
dengyihao 已提交
1427 1428
      tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn,
             pConn->peerPort);
1429 1430
      if (pConn->pContext) {
        pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1431
        pConn->pContext->pConn = NULL;
1432
        pConn->pReqMsg = NULL;
1433
        taosTmrStart(rpcProcessConnError, 1, pConn->pContext, pRpc->tmrCtrl);
1434 1435
        rpcReleaseConn(pConn);
      }
H
hzcheng 已提交
1436
    }
1437
  } else {
1438
    tDebug("%s, retry timer not processed", pConn->info);
H
hzcheng 已提交
1439 1440
  }

1441
  rpcUnlockConn(pConn);
H
hzcheng 已提交
1442 1443
}

1444 1445 1446
static void rpcProcessIdleTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;

1447 1448
  rpcLockConn(pConn);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1449
  if (pConn->user[0]) {
1450
    tDebug("%s, close the connection since no activity", pConn->info);
dengyihao's avatar
dengyihao 已提交
1451
    if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
1452
    rpcReleaseConn(pConn);
1453
  } else {
1454
    tDebug("%s, idle timer:%p not processed", pConn->info, tmrId);
1455
  }
1456 1457

  rpcUnlockConn(pConn);
1458 1459 1460 1461 1462 1463
}

static void rpcProcessProgressTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;
  SRpcInfo *pRpc = pConn->pRpc;

1464
  rpcLockConn(pConn);
1465

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1466
  if (pConn->inType && pConn->user[0]) {
1467
    tDebug("%s, progress timer expired, send progress", pConn->info);
1468
    rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
1469
    pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
1470
  } else {
1471
    tDebug("%s, progress timer:%p not processed", pConn->info, tmrId);
1472 1473
  }

1474
  rpcUnlockConn(pConn);
1475 1476
}

dengyihao's avatar
dengyihao 已提交
1477 1478 1479 1480 1481
static int32_t rpcCompressRpcMsg(char *pCont, int32_t contLen) {
  SRpcHead *pHead = rpcHeadFromCont(pCont);
  int32_t   finalLen = 0;
  int       overhead = sizeof(SRpcComp);

1482 1483 1484
  if (!NEEDTO_COMPRESSS_MSG(contLen)) {
    return contLen;
  }
dengyihao's avatar
dengyihao 已提交
1485 1486

  char *buf = malloc(contLen + overhead + 8);  // 8 extra bytes
1487
  if (buf == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1488
    tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
1489 1490
    return contLen;
  }
dengyihao's avatar
dengyihao 已提交
1491

1492
  int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
S
Shuduo Sang 已提交
1493
  tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead);
dengyihao's avatar
dengyihao 已提交
1494

1495 1496 1497 1498
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
S
TD-4100  
Shengliang Guan 已提交
1499
  if (compLen > 0 && compLen < contLen - overhead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1500
    SRpcComp *pComp = (SRpcComp *)pCont;
dengyihao's avatar
dengyihao 已提交
1501 1502
    pComp->reserved = 0;
    pComp->contLen = htonl(contLen);
1503
    memcpy(pCont + overhead, buf, compLen);
dengyihao's avatar
dengyihao 已提交
1504

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1505
    pHead->comp = 1;
S
Shuduo Sang 已提交
1506
    tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen);
1507 1508 1509 1510 1511 1512 1513 1514 1515
    finalLen = compLen + overhead;
  } else {
    finalLen = contLen;
  }

  free(buf);
  return finalLen;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1516
/*
 * Decompress an rpc message if its header has the comp flag set.
 *
 * pHead  header of the received message; its content starts with an SRpcComp
 *        prefix when comp is set
 *
 * Returns the header to use from now on. For an uncompressed message this is
 * pHead itself. For a compressed one a new buffer is allocated, the content is
 * decompressed into it, the header is copied with msgLen updated, the old
 * message is released with rpcFreeMsg and the new header is returned. If the
 * buffer cannot be allocated the error is logged and the original pHead is
 * returned untouched (still compressed).
 */
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
  int       overhead = sizeof(SRpcComp);
  SRpcHead *pNewHead = NULL;
  uint8_t * pCont = pHead->content;
  SRpcComp *pComp = (SRpcComp *)pHead->content;

  if (!pHead->comp) {
    return pHead;
  }

  // decompress the content
  assert(pComp->reserved == 0);
  int contLen = htonl(pComp->contLen);

  // prepare the temporary buffer to decompress message
  char *temp = (char *)malloc(contLen + RPC_MSG_OVERHEAD);
  if (temp == NULL) {
    // Test the allocation itself: a pointer derived from it by adding an offset
    // is non-NULL even when malloc failed.
    tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
    return pHead;
  }

  pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext));  // reserve SRpcReqContext

  int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
  int origLen = LZ4_decompress_safe((char *)(pCont + overhead), (char *)pNewHead->content, compLen, contLen);
  assert(origLen == contLen);

  memcpy(pNewHead, pHead, sizeof(SRpcHead));
  pNewHead->msgLen = rpcMsgLenFromCont(origLen);
  rpcFreeMsg(pHead);  // free the compressed message buffer
  pHead = pNewHead;
  tTrace("decomp malloc mem:%p", temp);

  return pHead;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1549
static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) {
dengyihao's avatar
dengyihao 已提交
1550
  T_MD5_CTX context;
dengyihao's avatar
dengyihao 已提交
1551
  int       ret = -1;
H
hzcheng 已提交
1552

dengyihao's avatar
dengyihao 已提交
1553
  tMD5Init(&context);
H
Haojun Liao 已提交
1554
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1555
  tMD5Update(&context, (uint8_t *)pMsg, msgLen);
H
Haojun Liao 已提交
1556
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1557
  tMD5Final(&context);
H
hzcheng 已提交
1558 1559 1560 1561 1562 1563

  if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;

  return ret;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1564
static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) {
dengyihao's avatar
dengyihao 已提交
1565
  T_MD5_CTX context;
H
hzcheng 已提交
1566

dengyihao's avatar
dengyihao 已提交
1567
  tMD5Init(&context);
H
Haojun Liao 已提交
1568
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1569
  tMD5Update(&context, (uint8_t *)pMsg, msgLen);
H
Haojun Liao 已提交
1570
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1571
  tMD5Final(&context);
H
hzcheng 已提交
1572 1573 1574

  memcpy(pAuth, context.digest, sizeof(context.digest));
}
1575 1576

/*
 * Set spi/msgLen in the header of an outgoing message and, when required,
 * append the auth digest.
 *
 * pConn   connection the message is sent on
 * msg     message buffer starting with SRpcHead; when a digest is added it is
 *         written at msg + msgLen, so the buffer must have room for an SRpcDigest
 * msgLen  current length of the message
 *
 * A digest is added only when the connection has an spi and is not yet marked
 * secured. Returns the resulting message length.
 */
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
  SRpcHead *pHead = (SRpcHead *)msg;

  if (pConn->spi == 0 || pConn->secured != 0) {
    // no auth part
    pHead->spi = 0;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    return msgLen;
  }

  // add auth part
  SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen);
  int         totalLen = msgLen + (int)sizeof(SRpcDigest);

  // spi, time stamp and msgLen are set first since they are covered by the digest
  pHead->spi = pConn->spi;
  pDigest->timeStamp = htonl(taosGetTimestampSec());
  pHead->msgLen = (int32_t)htonl((uint32_t)totalLen);

  // the digest covers everything except the trailing TSDB_AUTH_LEN bytes
  rpcBuildAuthHead(pHead, totalLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);

  return totalLen;
}

/*
 * Check the authentication of a received message.
 *
 * pConn   connection the message belongs to
 * msg     received message starting with SRpcHead
 * msgLen  number of bytes received
 *
 * Returns 0 when the message is accepted; in that case pHead->msgLen has been
 * converted in place from network byte order (and reduced by the digest size
 * when a digest was verified). Otherwise returns
 * TSDB_CODE_RPC_INVALID_TIME_STAMP, TSDB_CODE_RPC_AUTH_FAILURE or
 * TSDB_CODE_RPC_AUTH_REQUIRED and leaves msgLen unconverted.
 */
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
  SRpcHead *pHead = (SRpcHead *)msg;
  int       code = 0;

  if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
    // secured link, or no authentication
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
    // tTrace("%s, secured link, no auth is required", pConn->info);
    return 0;
  }

  if (!rpcIsReq(pHead->msgType)) {
    // for response, if code is auth failure, it shall bypass the auth process
    code = htonl(pHead->code);
    if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
        code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
        code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
      // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
      return 0;
    }
  }

  code = 0;
  if (pHead->spi == pConn->spi) {
    // The digest is read from the last sizeof(SRpcDigest) bytes of the message.
    // Reject anything too short to hold a header plus a digest, otherwise the
    // digest pointer would land inside (or before) the header.
    if (msgLen < (int)(sizeof(SRpcHead) + sizeof(SRpcDigest))) {
      tWarn("%s, msg len:%d is too short to carry an auth digest, msg discarded", pConn->info, msgLen);
      return TSDB_CODE_RPC_AUTH_FAILURE;
    }

    // authentication
    SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest));

    // The time stamp comes from the peer; do the subtraction in 64 bits so that
    // an extreme value cannot overflow a signed 32-bit difference.
    int64_t delta = (int64_t)(int32_t)htonl(pDigest->timeStamp) - (int64_t)(int32_t)taosGetTimestampSec();
    if (delta > 900 || delta < -900) {
      tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, (int32_t)delta);
      code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
    } else {
      if (rpcAuthenticateMsg(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
        tDebug("%s, authentication failed, msg discarded", pConn->info);
        code = TSDB_CODE_RPC_AUTH_FAILURE;
      } else {
        // strip the digest from the length seen by the rest of the code
        pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
        if (!rpcIsReq(pHead->msgType)) pConn->secured = 1;  // link is secured for client
        // tTrace("%s, message is authenticated", pConn->info);
      }
    }
  } else {
    tError("%s, auth spi:%d not matched with received:%d %p", pConn->info, pConn->spi, pHead->spi, pConn);
    code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
  }

  return code;
}

1647
/*
 * Acquire the per-connection spin lock.
 *
 * Spins until pConn->lockedBy can be switched atomically from 0 to the calling
 * thread's id, yielding the processor on every 1000th failed attempt.
 */
static void rpcLockConn(SRpcConn *pConn) {
  int64_t self = taosGetSelfPthreadId();

  for (int attempts = 1; atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, self) != 0; ++attempts) {
    if (attempts % 1000 == 0) {
      sched_yield();
    }
  }
}

/*
 * Release the per-connection spin lock.
 *
 * pConn->lockedBy is switched atomically from the calling thread's id back to
 * 0. Finding any other value there trips an assertion.
 */
static void rpcUnlockConn(SRpcConn *pConn) {
  int64_t self = taosGetSelfPthreadId();
  int64_t holder = atomic_val_compare_exchange_64(&(pConn->lockedBy), self, 0);

  if (holder != self) {
    assert(false);
  }
}
1663

dengyihao's avatar
dengyihao 已提交
1664
/* Atomically add one reference to the rpc instance. */
static void rpcAddRef(SRpcInfo *pRpc) {
  atomic_add_fetch_32(&pRpc->refCount, 1);
}
1665

dengyihao's avatar
dengyihao 已提交
1666
/*
 * Atomically drop one reference to the rpc instance.
 *
 * When the count reaches 0 the instance is torn down: connection cache, hash,
 * timer controller, id pool, connection list and mutex are released, the
 * instance itself is freed and the global tsRpcNum counter is decremented.
 */
static void rpcDecRef(SRpcInfo *pRpc) {
  if (atomic_sub_fetch_32(&pRpc->refCount, 1) != 0) {
    return;  // still referenced
  }

  rpcCloseConnCache(pRpc->pCache);
  taosHashCleanup(pRpc->hash);
  taosTmrCleanUp(pRpc->tmrCtrl);
  taosIdPoolCleanUp(pRpc->idPool);

  tfree(pRpc->connList);
  pthread_mutex_destroy(&pRpc->mutex);

  // log before the instance is freed, the label lives inside it
  tDebug("%s rpc resources are released", pRpc->label);
  tfree(pRpc);

  atomic_sub_fetch_32(&tsRpcNum, 1);
}
dengyihao's avatar
dengyihao 已提交
1681
#endif