rpcMain.c 54.3 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "lz4.h"
S
slguan 已提交
17
#include "os.h"
dengyihao's avatar
dengyihao 已提交
18 19 20 21 22 23 24 25
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
H
hzcheng 已提交
26 27 28
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
H
Hongze Cheng 已提交
29
#include "tmsg.h"
dengyihao's avatar
dengyihao 已提交
30
#include "transportInt.h"
dengyihao's avatar
dengyihao 已提交
31
#include "tref.h"
S
slguan 已提交
32
#include "trpc.h"
dengyihao's avatar
dengyihao 已提交
33 34 35
#include "ttimer.h"
#include "tutil.h"

dengyihao's avatar
dengyihao 已提交
36 37 38 39 40 41 42 43 44 45 46
static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT;

int tsRpcMaxUdpSize = 15000;  // bytes
int tsProgressTimer = 100;
// not configurable
int tsRpcMaxRetry;
int tsRpcHeadSize;
int tsRpcOverhead;

#ifndef USE_UV

dengyihao's avatar
dengyihao 已提交
47 48 49 50 51 52 53 54
typedef struct {
  int      sessions;      // number of sessions allowed
  int      numOfThreads;  // number of threads to process incoming messages
  int      idleTime;      // milliseconds;
  uint16_t localPort;
  int8_t   connType;
  int      index;  // for UDP server only, round robin for multiple threads
  char     label[TSDB_LABEL_LEN];
dengyihao's avatar
dengyihao 已提交
55

dengyihao's avatar
dengyihao 已提交
56 57 58 59 60
  char user[TSDB_UNI_LEN];         // meter ID
  char spi;                        // security parameter index
  char encrypt;                    // encrypt algorithm
  char secret[TSDB_PASSWORD_LEN];  // secret for the link
  char ckey[TSDB_PASSWORD_LEN];    // ciphering key
dengyihao's avatar
dengyihao 已提交
61

dengyihao's avatar
dengyihao 已提交
62 63
  void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
  int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey);
dengyihao's avatar
dengyihao 已提交
64

dengyihao's avatar
dengyihao 已提交
65
  int32_t          refCount;
dengyihao's avatar
dengyihao 已提交
66 67 68 69 70 71 72
  void *           parent;
  void *           idPool;     // handle to ID pool
  void *           tmrCtrl;    // handle to timer
  SHashObj *       hash;       // handle returned by hash utility
  void *           tcphandle;  // returned handle from TCP initialization
  void *           udphandle;  // returned handle from UDP initialization
  void *           pCache;     // connection cache
dengyihao's avatar
dengyihao 已提交
73
  pthread_mutex_t  mutex;
dengyihao's avatar
dengyihao 已提交
74
  struct SRpcConn *connList;  // connection list
dengyihao's avatar
dengyihao 已提交
75
} SRpcInfo;
dengyihao's avatar
dengyihao 已提交
76

dengyihao's avatar
dengyihao 已提交
77
typedef struct {
dengyihao's avatar
dengyihao 已提交
78
  SRpcInfo *       pRpc;      // associated SRpcInfo
dengyihao's avatar
dengyihao 已提交
79
  SEpSet           epSet;     // ip list provided by app
dengyihao's avatar
dengyihao 已提交
80 81
  void *           ahandle;   // handle provided by app
  struct SRpcConn *pConn;     // pConn allocated
dengyihao's avatar
dengyihao 已提交
82
  tmsg_t           msgType;   // message type
dengyihao's avatar
dengyihao 已提交
83
  uint8_t *        pCont;     // content provided by app
dengyihao's avatar
dengyihao 已提交
84 85 86 87 88 89 90
  int32_t          contLen;   // content length
  int32_t          code;      // error code
  int16_t          numOfTry;  // number of try for different servers
  int8_t           oldInUse;  // server EP inUse passed by app
  int8_t           redirect;  // flag to indicate redirect
  int8_t           connType;  // connection type
  int64_t          rid;       // refId returned by taosAddRef
dengyihao's avatar
dengyihao 已提交
91 92 93
  SRpcMsg *        pRsp;      // for synchronous API
  tsem_t *         pSem;      // for synchronous API
  SEpSet *         pSet;      // for synchronous API
dengyihao's avatar
dengyihao 已提交
94 95 96
  char             msg[0];    // RpcHead starts from here
} SRpcReqContext;

dengyihao's avatar
dengyihao 已提交
97
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
dengyihao's avatar
dengyihao 已提交
98
#define rpcHeadFromCont(cont) ((SRpcHead *)((char *)cont - sizeof(SRpcHead)))
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
99 100 101
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102
#define rpcIsReq(type) (type & 1U)
H
hzcheng 已提交
103

J
Jeff Tao 已提交
104
typedef struct SRpcConn {
dengyihao's avatar
dengyihao 已提交
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
  char            info[48];                   // debug info: label + pConn + ahandle
  int             sid;                        // session ID
  uint32_t        ownId;                      // own link ID
  uint32_t        peerId;                     // peer link ID
  char            user[TSDB_UNI_LEN];         // user ID for the link
  char            spi;                        // security parameter index
  char            encrypt;                    // encryption, 0:1
  char            secret[TSDB_PASSWORD_LEN];  // secret for the link
  char            ckey[TSDB_PASSWORD_LEN];    // ciphering key
  char            secured;                    // if set to 1, no authentication
  uint16_t        localPort;                  // for UDP only
  uint32_t        linkUid;                    // connection unique ID assigned by client
  uint32_t        peerIp;                     // peer IP
  uint16_t        peerPort;                   // peer port
  char            peerFqdn[TSDB_FQDN_LEN];    // peer FQDN or ip string
  uint16_t        tranId;                     // outgoing transcation ID, for build message
  uint16_t        outTranId;                  // outgoing transcation ID
  uint16_t        inTranId;                   // transcation ID for incoming msg
  tmsg_t          outType;                    // message type for outgoing request
  tmsg_t          inType;                     // message type for incoming request
  void *          chandle;                    // handle passed by TCP/UDP connection layer
  void *          ahandle;                    // handle provided by upper app layter
  int             retry;                      // number of retry for sending request
  int             tretry;                     // total retry
  void *          pTimer;                     // retry timer to monitor the response
  void *          pIdleTimer;                 // idle timer
  char *          pRspMsg;                    // response message including header
  int             rspMsgLen;                  // response messag length
  char *          pReqMsg;                    // request message including header
  int             reqMsgLen;                  // request message length
  SRpcInfo *      pRpc;                       // the associated SRpcInfo
  int8_t          connType;                   // connection type
  int64_t         lockedBy;                   // lock for connection
  SRpcReqContext *pContext;                   // request context
H
hzcheng 已提交
139 140
} SRpcConn;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
141 142
static int     tsRpcRefId = -1;
static int32_t tsRpcNum = 0;
dengyihao's avatar
dengyihao 已提交
143
// static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
144

145
// server:0 client:1  tcp:2 udp:0
dengyihao's avatar
dengyihao 已提交
146 147 148 149
#define RPC_CONN_UDPS 0
#define RPC_CONN_UDPC 1
#define RPC_CONN_TCPS 2
#define RPC_CONN_TCPC 3
150

J
jtao1735 已提交
151
void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
dengyihao's avatar
dengyihao 已提交
152
    taosInitUdpConnection, taosInitUdpConnection, taosInitTcpServer, taosInitTcpClient};
H
hzcheng 已提交
153

dengyihao's avatar
dengyihao 已提交
154 155
void (*taosCleanUpConn[])(void *thandle) = {taosCleanUpUdpConnection, taosCleanUpUdpConnection, taosCleanUpTcpServer,
                                            taosCleanUpTcpClient};
H
hzcheng 已提交
156

157
void (*taosStopConn[])(void *thandle) = {
dengyihao's avatar
dengyihao 已提交
158 159
    taosStopUdpConnection,
    taosStopUdpConnection,
160
    taosStopTcpServer,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
161
    taosStopTcpClient,
162 163
};

164
int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
dengyihao's avatar
dengyihao 已提交
165
    taosSendUdpData, taosSendUdpData, taosSendTcpData, taosSendTcpData};
H
hzcheng 已提交
166

J
jtao1735 已提交
167
void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = {
S
slguan 已提交
168 169 170 171
    taosOpenUdpConnection,
    taosOpenUdpConnection,
    NULL,
    taosOpenTcpClientConnection,
H
hzcheng 已提交
172 173
};

dengyihao's avatar
dengyihao 已提交
174
void (*taosCloseConn[])(void *chandle) = {NULL, NULL, taosCloseTcpConnection, taosCloseTcpConnection};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
175

J
jtao1735 已提交
176
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177
static void      rpcCloseConn(void *thandle);
178
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext);
179
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
180 181
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
H
hzcheng 已提交
182

dengyihao's avatar
dengyihao 已提交
183 184 185 186 187
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void rpcSendReqHead(SRpcConn *pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
188

189
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
S
slguan 已提交
190
static void  rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext);
191 192 193
static void  rpcProcessConnError(void *param, void *id);
static void  rpcProcessRetryTimer(void *, void *);
static void  rpcProcessIdleTimer(void *param, void *tmrId);
194
static void  rpcProcessProgressTimer(void *param, void *tmrId);
H
hzcheng 已提交
195

dengyihao's avatar
dengyihao 已提交
196 197
static void      rpcFreeMsg(void *msg);
static int32_t   rpcCompressRpcMsg(char *pCont, int32_t contLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
198
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead);
dengyihao's avatar
dengyihao 已提交
199 200 201 202 203 204
static int       rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
static int       rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
static void      rpcLockConn(SRpcConn *pConn);
static void      rpcUnlockConn(SRpcConn *pConn);
static void      rpcAddRef(SRpcInfo *pRpc);
static void      rpcDecRef(SRpcInfo *pRpc);
H
hzcheng 已提交
205

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
206 207 208 209 210
static void rpcFree(void *p) {
  tTrace("free mem: %p", p);
  free(p);
}

211 212 213 214
static void rpcInitImp(void) {
  tsProgressTimer = tsRpcTimer / 2;
  tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsProgressTimer;
  tsRpcHeadSize = RPC_MSG_OVERHEAD;
215
  tsRpcOverhead = sizeof(SRpcReqContext);
H
hzcheng 已提交
216

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
217
  tsRpcRefId = taosOpenRef(200, rpcFree);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
218
}
219

220 221
int32_t rpcInit(void) {
  pthread_once(&tsRpcInitOnce, rpcInitImp);
222
  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
223
}
224

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
225 226 227 228
void rpcCleanup(void) {
  taosCloseRef(tsRpcRefId);
  tsRpcRefId = -1;
}
dengyihao's avatar
dengyihao 已提交
229

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
230 231 232
void *rpcOpen(const SRpcInit *pInit) {
  SRpcInfo *pRpc;

dengyihao's avatar
dengyihao 已提交
233
  // pthread_once(&tsRpcInit, rpcInit);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
234

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
235 236
  pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
  if (pRpc == NULL) return NULL;
H
hzcheng 已提交
237

dengyihao's avatar
dengyihao 已提交
238 239
  if (pInit->label) tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));

240
  pRpc->connType = pInit->connType;
Y
TD-3115  
yihaoDeng 已提交
241 242 243
  if (pRpc->connType == TAOS_CONN_CLIENT) {
    pRpc->numOfThreads = pInit->numOfThreads;
  } else {
dengyihao's avatar
dengyihao 已提交
244
    pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
Y
TD-3115  
yihaoDeng 已提交
245
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
246 247 248
  pRpc->idleTime = pInit->idleTime;
  pRpc->localPort = pInit->localPort;
  pRpc->afp = pInit->afp;
dengyihao's avatar
dengyihao 已提交
249
  pRpc->sessions = pInit->sessions + 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
250 251 252
  if (pInit->user) tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
  if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret));
  if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
253
  pRpc->spi = pInit->spi;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
254
  pRpc->cfp = pInit->cfp;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
255
  pRpc->afp = pInit->afp;
S
Shengliang Guan 已提交
256
  pRpc->parent = pInit->parent;
257
  pRpc->refCount = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
258

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
259 260
  atomic_add_fetch_32(&tsRpcNum, 1);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
261
  size_t size = sizeof(SRpcConn) * pRpc->sessions;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
262 263
  pRpc->connList = (SRpcConn *)calloc(1, size);
  if (pRpc->connList == NULL) {
S
TD-1530  
Shengliang Guan 已提交
264
    tError("%s failed to allocate memory for taos connections, size:%" PRId64, pRpc->label, (int64_t)size);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
265
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
266 267
    return NULL;
  }
H
hzcheng 已提交
268

dengyihao's avatar
dengyihao 已提交
269
  pRpc->idPool = taosInitIdPool(pRpc->sessions - 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
270 271
  if (pRpc->idPool == NULL) {
    tError("%s failed to init ID pool", pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
272
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
273
    return NULL;
H
hzcheng 已提交
274
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
275

dengyihao's avatar
dengyihao 已提交
276
  pRpc->tmrCtrl = taosTmrInit(pRpc->sessions * 2 + 1, 50, 10000, pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
277 278
  if (pRpc->tmrCtrl == NULL) {
    tError("%s failed to init timers", pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
279
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
280 281
    return NULL;
  }
H
hzcheng 已提交
282

283
  if (pRpc->connType == TAOS_CONN_SERVER) {
H
Haojun Liao 已提交
284
    pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true);
285 286 287 288 289 290
    if (pRpc->hash == NULL) {
      tError("%s failed to init string hash", pRpc->label);
      rpcClose(pRpc);
      return NULL;
    }
  } else {
dengyihao's avatar
dengyihao 已提交
291 292
    pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime * 20);
    if (pRpc->pCache == NULL) {
293 294 295 296
      tError("%s failed to init connection cache", pRpc->label);
      rpcClose(pRpc);
      return NULL;
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
297
  }
H
hzcheng 已提交
298

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
299 300
  pthread_mutex_init(&pRpc->mutex, NULL);

dengyihao's avatar
dengyihao 已提交
301 302 303 304
  pRpc->tcphandle = (*taosInitConn[pRpc->connType | RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads,
                                                                   rpcProcessMsgFromPeer, pRpc);
  pRpc->udphandle =
      (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);
305 306 307 308 309 310 311

  if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) {
    tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort);
    rpcClose(pRpc);
    return NULL;
  }

S
TD-1843  
Shengliang Guan 已提交
312
  tDebug("%s rpc is opened, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions);
H
hzcheng 已提交
313

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
314
  return pRpc;
H
hzcheng 已提交
315 316
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
317 318
void rpcClose(void *param) {
  SRpcInfo *pRpc = (SRpcInfo *)param;
H
hzcheng 已提交
319

320
  // stop connection to outside first
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
321 322
  (*taosStopConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
  (*taosStopConn[pRpc->connType])(pRpc->udphandle);
323

dengyihao's avatar
dengyihao 已提交
324
  // close all connections
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
325
  for (int i = 0; i < pRpc->sessions; ++i) {
326
    if (pRpc->connList && pRpc->connList[i].user[0]) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
327
      rpcCloseConn((void *)(pRpc->connList + i));
H
hzcheng 已提交
328 329 330
    }
  }

331
  // clean up
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
332 333
  (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
  (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);
J
Jeff Tao 已提交
334

335
  tDebug("%s rpc is closed", pRpc->label);
336
  rpcDecRef(pRpc);
H
hzcheng 已提交
337 338
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
339 340
void *rpcMallocCont(int contLen) {
  int size = contLen + RPC_MSG_OVERHEAD;
341

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
342 343
  char *start = (char *)calloc(1, (size_t)size);
  if (start == NULL) {
344 345
    tError("failed to malloc msg, size:%d", size);
    return NULL;
346
  } else {
S
TD-1762  
Shengliang Guan 已提交
347
    tTrace("malloc mem:%p size:%d", start, size);
348 349
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
350
  return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
351 352 353
}

void rpcFreeCont(void *cont) {
S
Shengliang Guan 已提交
354
  if (cont) {
355 356
    char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext);
    free(temp);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
357
    tTrace("free mem: %p", temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
358
  }
359 360
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
361 362 363 364
void *rpcReallocCont(void *ptr, int contLen) {
  if (ptr == NULL) return rpcMallocCont(contLen);

  char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead);
dengyihao's avatar
dengyihao 已提交
365 366
  if (contLen == 0) {
    free(start);
J
Jeff Tao 已提交
367
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
368 369 370 371 372 373 374
  }

  int size = contLen + RPC_MSG_OVERHEAD;
  start = realloc(start, size);
  if (start == NULL) {
    tError("failed to realloc cont, size:%d", size);
    return NULL;
dengyihao's avatar
dengyihao 已提交
375
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
376 377 378 379

  return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}

S
Shengliang Guan 已提交
380
void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
dengyihao's avatar
dengyihao 已提交
381
  SRpcInfo *      pRpc = (SRpcInfo *)shandle;
382 383
  SRpcReqContext *pContext;

J
Jeff Tao 已提交
384
  int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
dengyihao's avatar
dengyihao 已提交
385
  pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
386
  pContext->ahandle = pMsg->ahandle;
387
  pContext->pRpc = (SRpcInfo *)shandle;
388
  pContext->epSet = *pEpSet;
J
Jeff Tao 已提交
389
  pContext->contLen = contLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
390 391
  pContext->pCont = pMsg->pCont;
  pContext->msgType = pMsg->msgType;
392
  pContext->oldInUse = pEpSet->inUse;
393

dengyihao's avatar
dengyihao 已提交
394 395
  pContext->connType = RPC_CONN_UDPC;
  if (contLen > tsRpcMaxUdpSize || tsRpcForceTcp) pContext->connType = RPC_CONN_TCPC;
396

dengyihao's avatar
dengyihao 已提交
397
  // connection type is application specific.
398
  // for TDengine, all the query, show commands shall have TCP connection
399
  tmsg_t type = pMsg->msgType;
dengyihao's avatar
dengyihao 已提交
400 401 402
  if (type == TDMT_VND_QUERY || type == TDMT_MND_SHOW_RETRIEVE || type == TDMT_VND_FETCH ||
      type == TDMT_MND_VGROUP_LIST || type == TDMT_VND_TABLES_META || type == TDMT_VND_TABLE_META ||
      type == TDMT_MND_SHOW || type == TDMT_MND_STATUS || type == TDMT_VND_ALTER_TABLE)
403
    pContext->connType = RPC_CONN_TCPC;
张宏权 已提交
404

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
405
  pContext->rid = taosAddRef(tsRpcRefId, pContext);
406
  if (pRid) *pRid = pContext->rid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
407

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
408
  rpcSendReqToServer(pRpc, pContext);
409 410
}

J
Jeff Tao 已提交
411
void rpcSendResponse(const SRpcMsg *pRsp) {
412 413
  if (pRsp->handle == NULL) return;

dengyihao's avatar
dengyihao 已提交
414 415 416 417 418
  int       msgLen = 0;
  SRpcConn *pConn = (SRpcConn *)pRsp->handle;
  SRpcMsg   rpcMsg = *pRsp;
  SRpcMsg * pMsg = &rpcMsg;
  SRpcInfo *pRpc = pConn->pRpc;
J
Jeff Tao 已提交
419

dengyihao's avatar
dengyihao 已提交
420
  if (pMsg->pCont == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
421 422
    pMsg->pCont = rpcMallocCont(0);
    pMsg->contLen = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
423 424
  }

dengyihao's avatar
dengyihao 已提交
425 426
  SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont);
  char *    msg = (char *)pHead;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
427

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
428 429
  pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
  msgLen = rpcMsgLenFromCont(pMsg->contLen);
430

431
  rpcLockConn(pConn);
432

dengyihao's avatar
dengyihao 已提交
433
  if (pConn->inType == 0 || pConn->user[0] == 0) {
H
Haojun Liao 已提交
434
    tError("%s, connection is already released, rsp wont be sent", pConn->info);
435
    rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
436 437
    rpcFreeCont(pMsg->pCont);
    rpcDecRef(pRpc);
438 439 440
    return;
  }

441
  // set msg header
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
442
  pHead->version = 1;
dengyihao's avatar
dengyihao 已提交
443
  pHead->msgType = pConn->inType + 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
444 445
  pHead->spi = pConn->spi;
  pHead->encrypt = pConn->encrypt;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
446 447 448
  pHead->tranId = pConn->inTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
449
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
450
  pHead->port = htons(pConn->localPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
451
  pHead->code = htonl(pMsg->code);
dengyihao's avatar
dengyihao 已提交
452 453
  pHead->ahandle = (uint64_t)pConn->ahandle;

454 455
  // set pConn parameters
  pConn->inType = 0;
456 457

  // response message is released until new response is sent
dengyihao's avatar
dengyihao 已提交
458
  rpcFreeMsg(pConn->pRspMsg);
459 460
  pConn->pRspMsg = msg;
  pConn->rspMsgLen = msgLen;
461
  if (pMsg->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) pConn->inTranId--;
462

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
463
  // stop the progress timer
464
  taosTmrStopA(&pConn->pTimer);
465 466

  // set the idle timer to monitor the activity
Y
yihaoDeng 已提交
467
  taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime * 30, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
468
  rpcSendMsgToPeer(pConn, msg, msgLen);
469 470

  // if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured
dengyihao's avatar
dengyihao 已提交
471
  if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY) pConn->secured = 1;  // connection shall be secured
472 473

  if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
474 475
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
476

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
477
  rpcUnlockConn(pConn);
dengyihao's avatar
dengyihao 已提交
478
  rpcDecRef(pRpc);  // decrease the referene count
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
479

480 481 482
  return;
}

S
Shengliang Guan 已提交
483
void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) {
dengyihao's avatar
dengyihao 已提交
484
  SRpcMsg rpcMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
485
  memset(&rpcMsg, 0, sizeof(rpcMsg));
dengyihao's avatar
dengyihao 已提交
486

S
Shengliang Guan 已提交
487
  rpcMsg.contLen = sizeof(SEpSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
488 489
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  if (rpcMsg.pCont == NULL) return;
490

S
Shengliang Guan 已提交
491
  memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet));
492

493
  rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
494
  rpcMsg.handle = thandle;
495

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
496
  rpcSendResponse(&rpcMsg);
497 498 499 500

  return;
}

501
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
dengyihao's avatar
dengyihao 已提交
502
  SRpcConn *pConn = (SRpcConn *)thandle;
503
  if (pConn->user[0] == 0) return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
504

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
505 506
  pInfo->clientIp = pConn->peerIp;
  pInfo->clientPort = pConn->peerPort;
J
jtao1735 已提交
507
  // pInfo->serverIp = pConn->destIp;
dengyihao's avatar
dengyihao 已提交
508

B
Bomin Zhang 已提交
509
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
510
  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
511 512
}

S
Shengliang Guan 已提交
513
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
514
  SRpcReqContext *pContext;
dengyihao's avatar
dengyihao 已提交
515
  pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
516 517

  memset(pRsp, 0, sizeof(SRpcMsg));
dengyihao's avatar
dengyihao 已提交
518 519

  tsem_t sem;
520 521 522
  tsem_init(&sem, 0, 0);
  pContext->pSem = &sem;
  pContext->pRsp = pRsp;
523
  pContext->pSet = pEpSet;
524

525
  rpcSendRequest(shandle, pEpSet, pMsg, NULL);
526 527 528 529 530 531 532

  tsem_wait(&sem);
  tsem_destroy(&sem);

  return;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
533
// this API is used by server app to keep an APP context in case connection is broken
534
int rpcReportProgress(void *handle, char *pCont, int contLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
535
  SRpcConn *pConn = (SRpcConn *)handle;
dengyihao's avatar
dengyihao 已提交
536
  int       code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
537

538
  rpcLockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
539

540 541
  if (pConn->user[0]) {
    // pReqMsg and reqMsgLen is re-used to store the context from app server
dengyihao's avatar
dengyihao 已提交
542
    pConn->pReqMsg = pCont;
543
    pConn->reqMsgLen = contLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
544
  } else {
545
    tDebug("%s, rpc connection is already released", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
546 547 548
    rpcFreeCont(pCont);
    code = -1;
  }
549

550
  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
551
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
552 553
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
554 555 556
void rpcCancelRequest(int64_t rid) {
  SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid);
  if (pContext == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
557

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
558
  rpcCloseConn(pContext->pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
559

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
560
  taosReleaseRef(tsRpcRefId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
561 562
}

563
static void rpcFreeMsg(void *msg) {
dengyihao's avatar
dengyihao 已提交
564
  if (msg) {
565 566
    char *temp = (char *)msg - sizeof(SRpcReqContext);
    free(temp);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
567
    tTrace("free mem: %p", temp);
568 569 570
  }
}

J
jtao1735 已提交
571
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
572
  SRpcConn *pConn;
S
slguan 已提交
573

574
  uint32_t peerIp = taosGetIpv4FromFqdn(peerFqdn);
575
  if (peerIp == 0xFFFFFFFF) {
dengyihao's avatar
dengyihao 已提交
576 577
    tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn);
    terrno = TSDB_CODE_RPC_FQDN_ERROR;
J
jtao1735 已提交
578 579 580
    return NULL;
  }

dengyihao's avatar
dengyihao 已提交
581
  pConn = rpcAllocateClientConn(pRpc);
H
hzcheng 已提交
582

dengyihao's avatar
dengyihao 已提交
583
  if (pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
584
    tstrncpy(pConn->peerFqdn, peerFqdn, sizeof(pConn->peerFqdn));
J
jtao1735 已提交
585
    pConn->peerIp = peerIp;
586
    pConn->peerPort = peerPort;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
587
    tstrncpy(pConn->user, pRpc->user, sizeof(pConn->user));
588
    pConn->connType = connType;
589

590
    if (taosOpenConn[connType]) {
dengyihao's avatar
dengyihao 已提交
591
      void *shandle = (connType & RPC_CONN_TCP) ? pRpc->tcphandle : pRpc->udphandle;
J
jtao1735 已提交
592
      pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
593
      if (pConn->chandle == NULL) {
H
Haojun Liao 已提交
594
        tError("failed to connect to:%s:%d", taosIpStr(pConn->peerIp), pConn->peerPort);
595

596
        terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
597 598 599
        rpcCloseConn(pConn);
        pConn = NULL;
      }
H
hzcheng 已提交
600 601 602
    }
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
603
  return pConn;
H
hzcheng 已提交
604 605
}

606
static void rpcReleaseConn(SRpcConn *pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
607
  SRpcInfo *pRpc = pConn->pRpc;
608
  if (pConn->user[0] == 0) return;
H
hzcheng 已提交
609

610 611
  pConn->user[0] = 0;
  if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle);
H
hzcheng 已提交
612

613 614 615
  taosTmrStopA(&pConn->pTimer);
  taosTmrStopA(&pConn->pIdleTimer);

dengyihao's avatar
dengyihao 已提交
616 617 618 619
  if (pRpc->connType == TAOS_CONN_SERVER) {
    char   hashstr[40] = {0};
    size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId,
                           pConn->connType);
J
jtao1735 已提交
620
    taosHashRemove(pRpc->hash, hashstr, size);
dengyihao's avatar
dengyihao 已提交
621
    rpcFreeMsg(pConn->pRspMsg);  // it may have a response msg saved, but not request msg
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
622
    pConn->pRspMsg = NULL;
dengyihao's avatar
dengyihao 已提交
623

624 625
    // if server has ever reported progress, free content
    if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);  // do not use rpcFreeMsg
626
  } else {
627
    // if there is an outgoing message, free it
陶建辉(Jeff)'s avatar
TD-1669  
陶建辉(Jeff) 已提交
628
    if (pConn->outType && pConn->pReqMsg) {
629
      SRpcReqContext *pContext = pConn->pContext;
Y
yihaoDeng 已提交
630
      if (pContext) {
dengyihao's avatar
dengyihao 已提交
631 632
        if (pContext->pRsp) {
          // for synchronous API, post semaphore to unblock app
Y
yihaoDeng 已提交
633 634 635 636 637
          pContext->pRsp->code = TSDB_CODE_RPC_APP_ERROR;
          pContext->pRsp->pCont = NULL;
          pContext->pRsp->contLen = 0;
          tsem_post(pContext->pSem);
        }
dengyihao's avatar
dengyihao 已提交
638
        pContext->pConn = NULL;
Y
yihaoDeng 已提交
639 640
        taosRemoveRef(tsRpcRefId, pContext->rid);
      } else {
dengyihao's avatar
dengyihao 已提交
641
        assert(0);
642
      }
陶建辉(Jeff)'s avatar
TD-1669  
陶建辉(Jeff) 已提交
643
    }
644
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
645

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
646 647 648 649 650 651
  // memset could not be used, since lockeBy can not be reset
  pConn->inType = 0;
  pConn->outType = 0;
  pConn->inTranId = 0;
  pConn->outTranId = 0;
  pConn->secured = 0;
652
  pConn->peerId = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
653 654
  pConn->peerIp = 0;
  pConn->peerPort = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
655 656 657
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
  pConn->pContext = NULL;
658
  pConn->chandle = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
659 660

  taosFreeId(pRpc->idPool, pConn->sid);
661
  tDebug("%s, rpc connection is released", pConn->info);
662 663 664 665
}

static void rpcCloseConn(void *thandle) {
  SRpcConn *pConn = (SRpcConn *)thandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
666
  if (pConn == NULL) return;
667 668 669

  rpcLockConn(pConn);

dengyihao's avatar
dengyihao 已提交
670
  if (pConn->user[0]) rpcReleaseConn(pConn);
671

672
  rpcUnlockConn(pConn);
H
hzcheng 已提交
673 674
}

675 676
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
  SRpcConn *pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
677

678 679 680
  int sid = taosAllocateId(pRpc->idPool);
  if (sid <= 0) {
    tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
681
    terrno = TSDB_CODE_RPC_MAX_SESSIONS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
682
  } else {
683 684 685 686
    pConn = pRpc->connList + sid;

    pConn->pRpc = pRpc;
    pConn->sid = sid;
687
    pConn->tranId = (uint16_t)(taosRand() & 0xFFFF);
688
    pConn->ownId = htonl(pConn->sid);
689
    pConn->linkUid = (uint32_t)((int64_t)pConn + taosGetPId() + (int64_t)pConn->tranId);
690 691
    pConn->spi = pRpc->spi;
    pConn->encrypt = pRpc->encrypt;
692
    if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_PASSWORD_LEN);
693
    tDebug("%s %p client connection is allocated, uid:0x%x", pRpc->label, pConn, pConn->linkUid);
694
  }
H
hzcheng 已提交
695

696 697
  return pConn;
}
H
hzcheng 已提交
698

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
699
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
700
  SRpcConn *pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
701
  char      hashstr[40] = {0};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
702 703
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;

dengyihao's avatar
dengyihao 已提交
704 705 706
  size_t size =
      snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);

707
  // check if it is already allocated
J
jtao1735 已提交
708
  SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
709
  if (ppConn) pConn = *ppConn;
710 711 712 713
  if (pConn) {
    pConn->secured = 0;
    return pConn;
  }
714

715 716 717 718 719 720
  // if code is not 0, it means it is simple reqhead, just ignore
  if (pHead->code != 0) {
    terrno = TSDB_CODE_RPC_ALREADY_PROCESSED;
    return NULL;
  }

721 722 723
  int sid = taosAllocateId(pRpc->idPool);
  if (sid <= 0) {
    tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
724
    terrno = TSDB_CODE_RPC_MAX_SESSIONS;
725 726
  } else {
    pConn = pRpc->connList + sid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
727
    memcpy(pConn->user, pHead->user, tListLen(pConn->user));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
728
    pConn->pRpc = pRpc;
H
hzcheng 已提交
729 730
    pConn->sid = sid;
    pConn->tranId = (uint16_t)(rand() & 0xFFFF);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
731
    pConn->ownId = htonl(pConn->sid);
732
    pConn->linkUid = pHead->linkUid;
733
    if (pRpc->afp) {
734
      if (pConn->user[0] == 0) {
735
        terrno = TSDB_CODE_RPC_AUTH_REQUIRED;
736
      } else {
S
Shengliang Guan 已提交
737
        terrno = (*pRpc->afp)(pRpc->parent, pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
738 739
      }

740
      if (terrno != 0) {
S
Shengliang Guan 已提交
741
        taosFreeId(pRpc->idPool, sid);  // sid shall be released
742 743
        pConn = NULL;
      }
H
hzcheng 已提交
744
    }
S
Shengliang Guan 已提交
745
  }
H
hzcheng 已提交
746

747
  if (pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
748 749
    if (pRecv->connType == RPC_CONN_UDPS && pRpc->numOfThreads > 1) {
      // UDP server, assign to new connection
S
Shengliang Guan 已提交
750
      pRpc->index = (pRpc->index + 1) % pRpc->numOfThreads;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
751 752
      pConn->localPort = (pRpc->localPort + pRpc->index);
    }
S
Shengliang Guan 已提交
753

J
jtao1735 已提交
754
    taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
dengyihao's avatar
dengyihao 已提交
755 756
    tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s", pRpc->label, pConn, pConn->linkUid, sid,
           hashstr);
757 758 759 760 761
  }

  return pConn;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
762
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
dengyihao's avatar
dengyihao 已提交
763
  SRpcConn *pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
764
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
765 766 767

  if (sid) {
    pConn = pRpc->connList + sid;
768
    if (pConn->user[0] == 0) pConn = NULL;
dengyihao's avatar
dengyihao 已提交
769
  }
770

dengyihao's avatar
dengyihao 已提交
771
  if (pConn == NULL) {
772 773 774
    if (pRpc->connType == TAOS_CONN_SERVER) {
      pConn = rpcAllocateServerConn(pRpc, pRecv);
    } else {
775
      terrno = TSDB_CODE_RPC_UNEXPECTED_RESPONSE;
776 777
    }
  }
778

779
  if (pConn) {
780
    if (pConn->linkUid != pHead->linkUid) {
781
      terrno = TSDB_CODE_RPC_MISMATCHED_LINK_ID;
S
Shengliang Guan 已提交
782 783
      tDebug("%s %p %p, linkUid:0x%x is not matched with received:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
             pConn->linkUid, pHead->linkUid);
784
      pConn = NULL;
H
hzcheng 已提交
785 786 787
    }
  }

788
  return pConn;
H
hzcheng 已提交
789 790
}

791
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
S
Shengliang Guan 已提交
792 793
  SRpcConn *pConn;
  SRpcInfo *pRpc = pContext->pRpc;
dengyihao's avatar
dengyihao 已提交
794
  SEpSet *  pEpSet = &pContext->epSet;
H
hzcheng 已提交
795

dengyihao's avatar
dengyihao 已提交
796 797 798
  pConn =
      rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
  if (pConn == NULL || pConn->user[0] == 0) {
799
    pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
dengyihao's avatar
dengyihao 已提交
800
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
801 802

  if (pConn) {
S
Shengliang Guan 已提交
803
    pConn->tretry = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
804
    pConn->ahandle = pContext->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
805
    snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
806
    pConn->tretry = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
807
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
808
    tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
809
  }
H
hzcheng 已提交
810

811
  return pConn;
H
hzcheng 已提交
812 813
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
814
static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
dengyihao's avatar
dengyihao 已提交
815 816 817 818 819 820
  if (pConn->peerId == 0) {
    pConn->peerId = pHead->sourceId;
  } else {
    if (pConn->peerId != pHead->sourceId) {
      tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, pConn->peerId, pHead->sourceId);
      return TSDB_CODE_RPC_INVALID_VALUE;
H
hzcheng 已提交
821
    }
dengyihao's avatar
dengyihao 已提交
822
  }
H
hzcheng 已提交
823

dengyihao's avatar
dengyihao 已提交
824 825 826 827 828
  if (pConn->inTranId == pHead->tranId) {
    if (pConn->inType == pHead->msgType) {
      if (pHead->code == 0) {
        tDebug("%s, %s is retransmitted", pConn->info, TMSG_INFO(pHead->msgType));
        rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
829
      } else {
dengyihao's avatar
dengyihao 已提交
830
        // do nothing, it is heart beat from client
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
831
      }
dengyihao's avatar
dengyihao 已提交
832 833 834 835 836
    } else if (pConn->inType == 0) {
      tDebug("%s, %s is already processed, tranId:%d", pConn->info, TMSG_INFO(pHead->msgType), pConn->inTranId);
      rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen);  // resend the response
    } else {
      tDebug("%s, mismatched message %s and tranId", pConn->info, TMSG_INFO(pHead->msgType));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
837
    }
H
hzcheng 已提交
838

dengyihao's avatar
dengyihao 已提交
839 840 841
    // do not reply any message
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
  }
H
hzcheng 已提交
842

dengyihao's avatar
dengyihao 已提交
843 844 845 846
  if (pConn->inType != 0) {
    tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, pConn->inTranId, pHead->tranId);
    return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED;
  }
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
847

dengyihao's avatar
dengyihao 已提交
848 849 850 851
  if (rpcContLenFromMsg(pHead->msgLen) <= 0) {
    tDebug("%s, message body is empty, ignore", pConn->info);
    return TSDB_CODE_RPC_APP_ERROR;
  }
H
hzcheng 已提交
852

dengyihao's avatar
dengyihao 已提交
853 854 855 856 857 858 859 860
  pConn->inTranId = pHead->tranId;
  pConn->inType = pHead->msgType;

  // start the progress timer to monitor the response from server app
  if (pConn->connType != RPC_CONN_TCPS)
    pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl);

  return 0;
H
hzcheng 已提交
861 862
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
863
static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
864
  SRpcInfo *pRpc = pConn->pRpc;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
865
  pConn->peerId = pHead->sourceId;
H
hzcheng 已提交
866

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
867
  if (pConn->outType == 0 || pConn->pContext == NULL) {
868
    return TSDB_CODE_RPC_UNEXPECTED_RESPONSE;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
869
  }
H
hzcheng 已提交
870

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
871
  if (pHead->tranId != pConn->outTranId) {
872
    return TSDB_CODE_RPC_INVALID_TRAN_ID;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
873
  }
H
hzcheng 已提交
874

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
875
  if (pHead->msgType != pConn->outType + 1) {
876
    return TSDB_CODE_RPC_INVALID_RESPONSE_TYPE;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
877
  }
H
hzcheng 已提交
878

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
879 880 881
  taosTmrStopA(&pConn->pTimer);
  pConn->retry = 0;

882
  if (pHead->code == TSDB_CODE_RPC_AUTH_REQUIRED && pRpc->spi) {
883
    tDebug("%s, authentication shall be restarted", pConn->info);
884
    pConn->secured = 0;
dengyihao's avatar
dengyihao 已提交
885
    rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
886 887
    if (pConn->connType != RPC_CONN_TCPC)
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
888
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
889 890
  }

891 892 893 894
  if (pHead->code == TSDB_CODE_RPC_MISMATCHED_LINK_ID) {
    tDebug("%s, mismatched linkUid, link shall be restarted", pConn->info);
    pConn->secured = 0;
    ((SRpcHead *)pConn->pReqMsg)->destId = 0;
dengyihao's avatar
dengyihao 已提交
895
    rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
896 897 898 899 900
    if (pConn->connType != RPC_CONN_TCPC)
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
  }

901
  if (pHead->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
902
    if (pConn->tretry <= tsRpcMaxRetry) {
903
      tDebug("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
904 905
      pConn->tretry++;
      rpcSendReqHead(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
906 907
      if (pConn->connType != RPC_CONN_TCPC)
        pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
908
      return TSDB_CODE_RPC_ALREADY_PROCESSED;
H
hzcheng 已提交
909
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
910
      // peer still in processing, give up
911
      tDebug("%s, server processing takes too long time, give up", pConn->info);
912
      pHead->code = TSDB_CODE_RPC_TOO_SLOW;
H
hzcheng 已提交
913 914
    }
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
915

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
916 917
  pConn->outType = 0;
  pConn->pReqMsg = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
918
  pConn->reqMsgLen = 0;
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
919 920
  SRpcReqContext *pContext = pConn->pContext;

dengyihao's avatar
dengyihao 已提交
921
  if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
S
Shengliang Guan 已提交
922
    if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SEpSet)) {
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
923
      // if EpSet is not included in the msg, treat it as NOT_READY
dengyihao's avatar
dengyihao 已提交
924
      pHead->code = TSDB_CODE_RPC_NOT_READY;
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
925 926 927
    } else {
      pContext->redirect++;
      if (pContext->redirect > TSDB_MAX_REPLICA) {
dengyihao's avatar
dengyihao 已提交
928
        pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
929 930 931
        tWarn("%s, too many redirects, quit", pConn->info);
      }
    }
dengyihao's avatar
dengyihao 已提交
932
  }
933 934

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
935 936
}

S
slguan 已提交
937
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) {
dengyihao's avatar
dengyihao 已提交
938 939
  int32_t   sid;
  SRpcConn *pConn = NULL;
H
hzcheng 已提交
940

941
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
942

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
943
  sid = htonl(pHead->destId);
S
slguan 已提交
944
  *ppContext = NULL;
H
hzcheng 已提交
945

946
  if (TMSG_INDEX(pHead->msgType) >= TDMT_MAX || TMSG_INDEX(pHead->msgType) <= 0) {
947
    tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
dengyihao's avatar
dengyihao 已提交
948 949
    terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE;
    return NULL;
H
hzcheng 已提交
950 951
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
952
  if (sid < 0 || sid >= pRpc->sessions) {
dengyihao's avatar
dengyihao 已提交
953 954 955 956
    tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, pRpc->sessions,
           TMSG_INFO(pHead->msgType));
    terrno = TSDB_CODE_RPC_INVALID_SESSION_ID;
    return NULL;
H
hzcheng 已提交
957 958
  }

959
  if (rpcIsReq(pHead->msgType) && htonl(pHead->msgVer) != tsVersion >> 8) {
dengyihao's avatar
dengyihao 已提交
960 961 962 963
    tDebug("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion,
           TMSG_INFO(pHead->msgType));
    terrno = TSDB_CODE_RPC_INVALID_VERSION;
    return NULL;
964 965
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
966
  pConn = rpcGetConnObj(pRpc, sid, pRecv);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
967
  if (pConn == NULL) {
dengyihao's avatar
dengyihao 已提交
968
    tDebug("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
969
    return NULL;
dengyihao's avatar
dengyihao 已提交
970
  }
H
hzcheng 已提交
971

972
  rpcLockConn(pConn);
H
hzcheng 已提交
973

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
974 975
  if (rpcIsReq(pHead->msgType)) {
    pConn->ahandle = (void *)pHead->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
976
    snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
977 978 979
  }

  sid = pConn->sid;
980
  if (pConn->chandle == NULL) pConn->chandle = pRecv->chandle;
dengyihao's avatar
dengyihao 已提交
981
  pConn->peerIp = pRecv->ip;
982
  pConn->peerPort = pRecv->port;
dengyihao's avatar
dengyihao 已提交
983
  if (pHead->port) pConn->peerPort = htons(pHead->port);
H
hzcheng 已提交
984

985
  terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
986 987 988 989

  // code can be transformed only after authentication
  pHead->code = htonl(pHead->code);

990
  if (terrno == 0) {
J
jtao1735 已提交
991
    if (pHead->encrypt) {
992 993
      // decrypt here
    }
H
hzcheng 已提交
994

dengyihao's avatar
dengyihao 已提交
995
    if (rpcIsReq(pHead->msgType)) {
996
      pConn->connType = pRecv->connType;
Y
yihaoDeng 已提交
997
      terrno = rpcProcessReqHead(pConn, pHead);
998

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
999
      // stop idle timer
dengyihao's avatar
dengyihao 已提交
1000
      taosTmrStopA(&pConn->pIdleTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1001

dengyihao's avatar
dengyihao 已提交
1002
      // client shall send the request within tsRpcTime again for UDP, double it
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1003
      if (pConn->connType != RPC_CONN_TCPS)
dengyihao's avatar
dengyihao 已提交
1004
        pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer * 2, pConn, pRpc->tmrCtrl);
1005 1006
    } else {
      terrno = rpcProcessRspHead(pConn, pHead);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1007
      *ppContext = pConn->pContext;
1008
    }
H
hzcheng 已提交
1009 1010
  }

1011
  rpcUnlockConn(pConn);
H
hzcheng 已提交
1012

1013
  return pConn;
H
hzcheng 已提交
1014 1015
}

Y
TD-3409  
yihaoDeng 已提交
1016
static void doRpcReportBrokenLinkToServer(void *param, void *id) {
dengyihao's avatar
dengyihao 已提交
1017 1018 1019
  SRpcMsg * pRpcMsg = (SRpcMsg *)(param);
  SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle);
  SRpcInfo *pRpc = pConn->pRpc;
S
Shengliang Guan 已提交
1020 1021
  (*(pRpc->cfp))(pRpc->parent, pRpcMsg, NULL);
  free(pRpcMsg);
Y
TD-3409  
yihaoDeng 已提交
1022
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1023 1024
static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
  SRpcInfo *pRpc = pConn->pRpc;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1025
  if (pConn->pReqMsg == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1026 1027

  // if there are pending request, notify the app
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1028
  rpcAddRef(pRpc);
1029
  tDebug("%s, notify the server app, connection is gone", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1030

Y
TD-3409  
yihaoDeng 已提交
1031
  SRpcMsg *rpcMsg = malloc(sizeof(SRpcMsg));
dengyihao's avatar
dengyihao 已提交
1032 1033
  rpcMsg->pCont = pConn->pReqMsg;      // pReqMsg is re-used to store the APP context from server
  rpcMsg->contLen = pConn->reqMsgLen;  // reqMsgLen is re-used to store the APP context length
Y
TD-3409  
yihaoDeng 已提交
1034 1035 1036
  rpcMsg->ahandle = pConn->ahandle;
  rpcMsg->handle = pConn;
  rpcMsg->msgType = pConn->inType;
dengyihao's avatar
dengyihao 已提交
1037
  rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1038 1039
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
Y
TD-3409  
yihaoDeng 已提交
1040 1041 1042 1043 1044
  if (pRpc->cfp) {
    taosTmrStart(doRpcReportBrokenLinkToServer, 0, rpcMsg, pRpc->tmrCtrl);
  } else {
    free(rpcMsg);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1045 1046
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1047
static void rpcProcessBrokenLink(SRpcConn *pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1048
  if (pConn == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1049
  SRpcInfo *pRpc = pConn->pRpc;
1050
  tDebug("%s, link is broken", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1051 1052

  rpcLockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1053

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1054 1055
  if (pConn->outType) {
    SRpcReqContext *pContext = pConn->pContext;
1056
    pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1057
    pContext->pConn = NULL;
1058
    pConn->pReqMsg = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1059 1060
    taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
  }
dengyihao's avatar
dengyihao 已提交
1061 1062

  if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1063

1064
  rpcReleaseConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1065
  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1066 1067
}

1068
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
dengyihao's avatar
dengyihao 已提交
1069 1070 1071
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
  SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle;
  SRpcConn *pConn = (SRpcConn *)pRecv->thandle;
H
hzcheng 已提交
1072

S
Shengliang Guan 已提交
1073
  tDump(pRecv->msg, pRecv->msgLen);
1074 1075

  // underlying UDP layer does not know it is server or client
dengyihao's avatar
dengyihao 已提交
1076
  pRecv->connType = pRecv->connType | pRpc->connType;
H
hzcheng 已提交
1077

B
Bomin Zhang 已提交
1078
  if (pRecv->msg == NULL) {
1079
    rpcProcessBrokenLink(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1080 1081 1082
    return NULL;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1083
  terrno = 0;
S
slguan 已提交
1084 1085
  SRpcReqContext *pContext;
  pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);
H
hzcheng 已提交
1086

1087 1088 1089
  char ipstr[24] = {0};
  taosIpPort2String(pRecv->ip, pRecv->port, ipstr);

1090
  if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) {
dengyihao's avatar
dengyihao 已提交
1091 1092 1093
    tDebug("%s %p %p, %s received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn,
           (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), ipstr, terrno, pRecv->msgLen, pHead->sourceId,
           pHead->destId, pHead->tranId, pHead->code);
1094
  } else {
dengyihao's avatar
dengyihao 已提交
1095 1096 1097
    tDebug("%s %p %p, %d received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn,
           (void *)pHead->ahandle, pHead->msgType, ipstr, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId,
           pHead->tranId, pHead->code);
1098
  }
H
hzcheng 已提交
1099

H
TD-34  
hzcheng 已提交
1100
  int32_t code = terrno;
1101
  if (code != TSDB_CODE_RPC_ALREADY_PROCESSED) {
dengyihao's avatar
dengyihao 已提交
1102
    if (code != 0) {  // parsing error
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1103
      if (rpcIsReq(pHead->msgType)) {
H
TD-34  
hzcheng 已提交
1104
        rpcSendErrorMsgToPeer(pRecv, code);
B
Bomin Zhang 已提交
1105 1106 1107
        if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE) {
          rpcCloseConn(pConn);
        }
1108
        if (TMSG_INDEX(pHead->msgType) + 1 > 1 && TMSG_INDEX(pHead->msgType) + 1 < TDMT_MAX) {
dengyihao's avatar
dengyihao 已提交
1109 1110
          tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
                 TMSG_INFO(pHead->msgType + 1), code);
Y
yihaoDeng 已提交
1111
        } else {
dengyihao's avatar
dengyihao 已提交
1112 1113
          tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
                 TMSG_INFO(pHead->msgType), code);
1114
        }
dengyihao's avatar
dengyihao 已提交
1115 1116
      }
    } else {  // msg is passed to app only parsing is ok
S
slguan 已提交
1117
      rpcProcessIncomingMsg(pConn, pHead, pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1118
    }
H
hzcheng 已提交
1119 1120
  }

dengyihao's avatar
dengyihao 已提交
1121
  if (code) rpcFreeMsg(pRecv->msg);  // parsing failed, msg shall be freed
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1122 1123
  return pConn;
}
H
hzcheng 已提交
1124

1125
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
dengyihao's avatar
dengyihao 已提交
1126
  SRpcInfo *pRpc = pContext->pRpc;
1127

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1128
  pContext->pConn = NULL;
dengyihao's avatar
dengyihao 已提交
1129
  if (pContext->pRsp) {
1130
    // for synchronous API
S
Shengliang Guan 已提交
1131
    memcpy(pContext->pSet, &pContext->epSet, sizeof(SEpSet));
1132
    memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1133
    tsem_post(pContext->pSem);
1134
  } else {
dengyihao's avatar
dengyihao 已提交
1135
    // for asynchronous API
S
Shengliang Guan 已提交
1136
    SEpSet *pEpSet = NULL;
dengyihao's avatar
dengyihao 已提交
1137
    if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) pEpSet = &pContext->epSet;
1138

S
Shengliang Guan 已提交
1139
    (*pRpc->cfp)(pRpc->parent, pMsg, pEpSet);
1140 1141 1142
  }

  // free the request message
dengyihao's avatar
dengyihao 已提交
1143
  taosRemoveRef(tsRpcRefId, pContext->rid);
1144 1145
}

S
slguan 已提交
1146
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1147
  SRpcInfo *pRpc = pConn->pRpc;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1148
  SRpcMsg   rpcMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1149

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1150
  pHead = rpcDecompressRpcMsg(pHead);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1151 1152 1153
  rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
  rpcMsg.pCont = pHead->content;
  rpcMsg.msgType = pHead->msgType;
S
Shengliang Guan 已提交
1154 1155 1156
  rpcMsg.code = pHead->code;

  if (rpcIsReq(pHead->msgType)) {
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1157
    rpcMsg.ahandle = pConn->ahandle;
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1158 1159
    rpcMsg.handle = pConn;
    rpcAddRef(pRpc);  // add the refCount for requests
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1160

陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1161
    // notify the server app
S
Shengliang Guan 已提交
1162
    (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
H
hzcheng 已提交
1163
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1164
    // it's a response
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1165
    rpcMsg.handle = pContext;
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1166
    rpcMsg.ahandle = pContext->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1167
    pContext->pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1168

1169
    // for UDP, port may be changed by server, the port in epSet shall be used for cache
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1170
    if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
dengyihao's avatar
dengyihao 已提交
1171 1172
      rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse],
                          pConn->connType);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1173
    } else {
1174 1175
      rpcCloseConn(pConn);
    }
H
hzcheng 已提交
1176

1177
    if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1178
      pContext->numOfTry = 0;
dengyihao's avatar
dengyihao 已提交
1179
      SEpSet *pEpSet = (SEpSet *)pHead->content;
1180 1181
      if (pEpSet->numOfEps > 0) {
        memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet));
S
TD-1670  
Shengliang Guan 已提交
1182 1183 1184
        tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps,
               pContext->epSet.inUse);
        for (int i = 0; i < pContext->epSet.numOfEps; ++i) {
1185
          pContext->epSet.port[i] = htons(pContext->epSet.port[i]);
S
TD-1670  
Shengliang Guan 已提交
1186 1187 1188
          tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.fqdn[i],
                 pContext->epSet.port[i]);
        }
1189
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1190
      rpcSendReqToServer(pRpc, pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1191
      rpcFreeCont(rpcMsg.pCont);
dengyihao's avatar
dengyihao 已提交
1192 1193
    } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY ||
               pHead->code == TSDB_CODE_DND_OFFLINE) {
1194 1195
      pContext->code = pHead->code;
      rpcProcessConnError(pContext, NULL);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1196
      rpcFreeCont(rpcMsg.pCont);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1197
    } else {
1198
      rpcNotifyClient(pContext, &rpcMsg);
H
hzcheng 已提交
1199 1200 1201 1202
    }
  }
}

1203
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
dengyihao's avatar
dengyihao 已提交
1204 1205
  char      msg[RPC_MSG_OVERHEAD];
  SRpcHead *pHead;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1206

1207
  // set msg header
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1208 1209 1210
  memset(msg, 0, sizeof(SRpcHead));
  pHead = (SRpcHead *)msg;
  pHead->version = 1;
dengyihao's avatar
dengyihao 已提交
1211
  pHead->msgType = pConn->inType + 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1212
  pHead->spi = pConn->spi;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1213 1214 1215 1216
  pHead->encrypt = 0;
  pHead->tranId = pConn->inTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
1217
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1218
  pHead->ahandle = (uint64_t)pConn->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1219
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1220
  pHead->code = htonl(code);
H
hzcheng 已提交
1221

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1222
  rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
dengyihao's avatar
dengyihao 已提交
1223
  pConn->secured = 1;  // connection shall be secured
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1224 1225 1226
}

static void rpcSendReqHead(SRpcConn *pConn) {
dengyihao's avatar
dengyihao 已提交
1227 1228
  char      msg[RPC_MSG_OVERHEAD];
  SRpcHead *pHead;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1229 1230 1231 1232 1233 1234

  // set msg header
  memset(msg, 0, sizeof(SRpcHead));
  pHead = (SRpcHead *)msg;
  pHead->version = 1;
  pHead->msgType = pConn->outType;
1235
  pHead->msgVer = htonl(tsVersion >> 8);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1236 1237 1238 1239 1240 1241
  pHead->spi = pConn->spi;
  pHead->encrypt = 0;
  pHead->tranId = pConn->outTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1242
  pHead->ahandle = (uint64_t)pConn->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1243 1244 1245 1246
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));
  pHead->code = 1;

  rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1247
}
H
hjxilinx 已提交
1248

1249
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
dengyihao's avatar
dengyihao 已提交
1250 1251 1252 1253
  SRpcHead *pRecvHead, *pReplyHead;
  char      msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t)];
  uint32_t  timeStamp;
  int       msgLen;
H
hzcheng 已提交
1254

1255
  pRecvHead = (SRpcHead *)pRecv->msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1256
  pReplyHead = (SRpcHead *)msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1257

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1258 1259
  memset(msg, 0, sizeof(SRpcHead));
  pReplyHead->version = pRecvHead->version;
1260
  pReplyHead->msgType = (tmsg_t)(pRecvHead->msgType + 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1261
  pReplyHead->spi = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1262
  pReplyHead->encrypt = pRecvHead->encrypt;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1263
  pReplyHead->tranId = pRecvHead->tranId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1264
  pReplyHead->sourceId = pRecvHead->destId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1265
  pReplyHead->destId = pRecvHead->sourceId;
1266
  pReplyHead->linkUid = pRecvHead->linkUid;
1267
  pReplyHead->ahandle = pRecvHead->ahandle;
H
hzcheng 已提交
1268

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1269 1270
  pReplyHead->code = htonl(code);
  msgLen = sizeof(SRpcHead);
H
hzcheng 已提交
1271

1272
  if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP) {
1273
    // include a time stamp if client's time is not synchronized well
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1274
    uint8_t *pContent = pReplyHead->content;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1275
    timeStamp = htonl(taosGetTimestampSec());
1276 1277
    memcpy(pContent, &timeStamp, sizeof(timeStamp));
    msgLen += sizeof(timeStamp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1278
  }
H
hzcheng 已提交
1279

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1280
  pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
1281
  (*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, msg, msgLen, pRecv->chandle);
H
hzcheng 已提交
1282

dengyihao's avatar
dengyihao 已提交
1283
  return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1284 1285
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1286
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
dengyihao's avatar
dengyihao 已提交
1287 1288 1289 1290
  SRpcHead *pHead = rpcHeadFromCont(pContext->pCont);
  char *    msg = (char *)pHead;
  int       msgLen = rpcMsgLenFromCont(pContext->contLen);
  tmsg_t    msgType = pContext->msgType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1291

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1292
  pContext->numOfTry++;
1293
  SRpcConn *pConn = rpcSetupConnToServer(pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1294 1295
  if (pConn == NULL) {
    pContext->code = terrno;
1296
    taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1297 1298 1299
    return;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1300
  pContext->pConn = pConn;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1301
  pConn->ahandle = pContext->ahandle;
1302
  rpcLockConn(pConn);
1303

dengyihao's avatar
dengyihao 已提交
1304
  // set the message header
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1305
  pHead->version = 1;
1306
  pHead->msgVer = htonl(tsVersion >> 8);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1307 1308
  pHead->msgType = msgType;
  pHead->encrypt = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1309
  pConn->tranId++;
dengyihao's avatar
dengyihao 已提交
1310
  if (pConn->tranId == 0) pConn->tranId++;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1311 1312 1313 1314
  pHead->tranId = pConn->tranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
  pHead->port = 0;
1315
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1316
  pHead->ahandle = (uint64_t)pConn->ahandle;
1317
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1318 1319 1320

  // set the connection parameters
  pConn->outType = msgType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1321
  pConn->outTranId = pHead->tranId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1322 1323
  pConn->pReqMsg = msg;
  pConn->reqMsgLen = msgLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1324
  pConn->pContext = pContext;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1325

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1326
  rpcSendMsgToPeer(pConn, msg, msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1327 1328
  if (pConn->connType != RPC_CONN_TCPC)
    taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1329 1330

  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1331
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1332 1333

static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
dengyihao's avatar
dengyihao 已提交
1334 1335
  int       writtenLen = 0;
  SRpcHead *pHead = (SRpcHead *)msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1336

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1337
  msgLen = rpcAddAuthPart(pConn, msg, msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1338

dengyihao's avatar
dengyihao 已提交
1339 1340 1341
  if (rpcIsReq(pHead->msgType)) {
    tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType),
           pConn->peerFqdn, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
1342
  } else {
dengyihao's avatar
dengyihao 已提交
1343 1344 1345
    if (pHead->code == 0) pConn->secured = 1;  // for success response, set link as secured
    tDebug("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType),
           pConn->peerIp, pConn->peerPort, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
1346
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1347

dengyihao's avatar
dengyihao 已提交
1348
  // tTrace("connection type is: %d", pConn->connType);
1349
  writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1350

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1351
  if (writtenLen != msgLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1352
    tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
1353
  }
dengyihao's avatar
dengyihao 已提交
1354

S
Shengliang Guan 已提交
1355
  tDump(msg, msgLen);
H
hzcheng 已提交
1356 1357
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1358
static void rpcProcessConnError(void *param, void *id) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1359
  SRpcReqContext *pContext = (SRpcReqContext *)param;
dengyihao's avatar
dengyihao 已提交
1360
  SRpcInfo *      pRpc = pContext->pRpc;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1361
  SRpcMsg         rpcMsg;
dengyihao's avatar
dengyihao 已提交
1362

H
hjxilinx 已提交
1363 1364 1365
  if (pRpc == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1366

1367
  tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle);
H
hzcheng 已提交
1368

H
Hongze Cheng 已提交
1369
  if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TDMT_VND_FETCH) {
dengyihao's avatar
dengyihao 已提交
1370
    rpcMsg.msgType = pContext->msgType + 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1371
    rpcMsg.ahandle = pContext->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1372 1373 1374
    rpcMsg.code = pContext->code;
    rpcMsg.pCont = NULL;
    rpcMsg.contLen = 0;
1375 1376

    rpcNotifyClient(pContext, &rpcMsg);
H
hzcheng 已提交
1377
  } else {
dengyihao's avatar
dengyihao 已提交
1378
    // move to next IP
1379 1380
    pContext->epSet.inUse++;
    pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1381
    rpcSendReqToServer(pRpc, pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1382
  }
H
hzcheng 已提交
1383 1384
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1385 1386 1387
static void rpcProcessRetryTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;
  SRpcInfo *pRpc = pConn->pRpc;
H
hzcheng 已提交
1388

1389
  rpcLockConn(pConn);
H
hzcheng 已提交
1390

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1391
  if (pConn->outType && pConn->user[0]) {
H
Hongze Cheng 已提交
1392
    tDebug("%s, expected %s is not received", pConn->info, TMSG_INFO((int)pConn->outType + 1));
H
hzcheng 已提交
1393 1394 1395
    pConn->pTimer = NULL;
    pConn->retry++;

S
slguan 已提交
1396
    if (pConn->retry < 4) {
H
Hongze Cheng 已提交
1397
      tDebug("%s, re-send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn, pConn->peerPort);
dengyihao's avatar
dengyihao 已提交
1398
      rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
1399
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
H
hzcheng 已提交
1400 1401
    } else {
      // close the connection
dengyihao's avatar
dengyihao 已提交
1402 1403
      tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn,
             pConn->peerPort);
1404 1405
      if (pConn->pContext) {
        pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1406
        pConn->pContext->pConn = NULL;
1407
        pConn->pReqMsg = NULL;
1408
        taosTmrStart(rpcProcessConnError, 1, pConn->pContext, pRpc->tmrCtrl);
1409 1410
        rpcReleaseConn(pConn);
      }
H
hzcheng 已提交
1411
    }
1412
  } else {
1413
    tDebug("%s, retry timer not processed", pConn->info);
H
hzcheng 已提交
1414 1415
  }

1416
  rpcUnlockConn(pConn);
H
hzcheng 已提交
1417 1418
}

1419 1420 1421
static void rpcProcessIdleTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;

1422 1423
  rpcLockConn(pConn);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1424
  if (pConn->user[0]) {
1425
    tDebug("%s, close the connection since no activity", pConn->info);
dengyihao's avatar
dengyihao 已提交
1426
    if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
1427
    rpcReleaseConn(pConn);
1428
  } else {
1429
    tDebug("%s, idle timer:%p not processed", pConn->info, tmrId);
1430
  }
1431 1432

  rpcUnlockConn(pConn);
1433 1434 1435 1436 1437 1438
}

static void rpcProcessProgressTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;
  SRpcInfo *pRpc = pConn->pRpc;

1439
  rpcLockConn(pConn);
1440

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1441
  if (pConn->inType && pConn->user[0]) {
1442
    tDebug("%s, progress timer expired, send progress", pConn->info);
1443
    rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
1444
    pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
1445
  } else {
1446
    tDebug("%s, progress timer:%p not processed", pConn->info, tmrId);
1447 1448
  }

1449
  rpcUnlockConn(pConn);
1450 1451
}

dengyihao's avatar
dengyihao 已提交
1452 1453 1454 1455 1456
static int32_t rpcCompressRpcMsg(char *pCont, int32_t contLen) {
  SRpcHead *pHead = rpcHeadFromCont(pCont);
  int32_t   finalLen = 0;
  int       overhead = sizeof(SRpcComp);

1457 1458 1459
  if (!NEEDTO_COMPRESSS_MSG(contLen)) {
    return contLen;
  }
dengyihao's avatar
dengyihao 已提交
1460 1461

  char *buf = malloc(contLen + overhead + 8);  // 8 extra bytes
1462
  if (buf == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1463
    tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
1464 1465
    return contLen;
  }
dengyihao's avatar
dengyihao 已提交
1466

1467
  int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
S
Shuduo Sang 已提交
1468
  tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead);
dengyihao's avatar
dengyihao 已提交
1469

1470 1471 1472 1473
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
S
TD-4100  
Shengliang Guan 已提交
1474
  if (compLen > 0 && compLen < contLen - overhead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1475
    SRpcComp *pComp = (SRpcComp *)pCont;
dengyihao's avatar
dengyihao 已提交
1476 1477
    pComp->reserved = 0;
    pComp->contLen = htonl(contLen);
1478
    memcpy(pCont + overhead, buf, compLen);
dengyihao's avatar
dengyihao 已提交
1479

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1480
    pHead->comp = 1;
S
Shuduo Sang 已提交
1481
    tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen);
1482 1483 1484 1485 1486 1487 1488 1489 1490
    finalLen = compLen + overhead;
  } else {
    finalLen = contLen;
  }

  free(buf);
  return finalLen;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1491
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
dengyihao's avatar
dengyihao 已提交
1492 1493 1494 1495
  int       overhead = sizeof(SRpcComp);
  SRpcHead *pNewHead = NULL;
  uint8_t * pCont = pHead->content;
  SRpcComp *pComp = (SRpcComp *)pHead->content;
1496

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1497
  if (pHead->comp) {
1498
    // decompress the content
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1499 1500
    assert(pComp->reserved == 0);
    int contLen = htonl(pComp->contLen);
dengyihao's avatar
dengyihao 已提交
1501

1502
    // prepare the temporary buffer to decompress message
1503
    char *temp = (char *)malloc(contLen + RPC_MSG_OVERHEAD);
dengyihao's avatar
dengyihao 已提交
1504 1505
    pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext));  // reserve SRpcReqContext

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1506
    if (pNewHead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1507
      int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
dengyihao's avatar
dengyihao 已提交
1508
      int origLen = LZ4_decompress_safe((char *)(pCont + overhead), (char *)pNewHead->content, compLen, contLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1509
      assert(origLen == contLen);
dengyihao's avatar
dengyihao 已提交
1510

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1511
      memcpy(pNewHead, pHead, sizeof(SRpcHead));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1512
      pNewHead->msgLen = rpcMsgLenFromCont(origLen);
dengyihao's avatar
dengyihao 已提交
1513 1514
      rpcFreeMsg(pHead);  // free the compressed message buffer
      pHead = pNewHead;
S
TD-1762  
Shengliang Guan 已提交
1515
      tTrace("decomp malloc mem:%p", temp);
1516
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1517
      tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
1518 1519 1520
    }
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1521
  return pHead;
1522 1523
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1524
static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) {
dengyihao's avatar
dengyihao 已提交
1525
  T_MD5_CTX context;
dengyihao's avatar
dengyihao 已提交
1526
  int       ret = -1;
H
hzcheng 已提交
1527

dengyihao's avatar
dengyihao 已提交
1528
  tMD5Init(&context);
H
Haojun Liao 已提交
1529
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1530
  tMD5Update(&context, (uint8_t *)pMsg, msgLen);
H
Haojun Liao 已提交
1531
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1532
  tMD5Final(&context);
H
hzcheng 已提交
1533 1534 1535 1536 1537 1538

  if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;

  return ret;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1539
static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) {
dengyihao's avatar
dengyihao 已提交
1540
  T_MD5_CTX context;
H
hzcheng 已提交
1541

dengyihao's avatar
dengyihao 已提交
1542
  tMD5Init(&context);
H
Haojun Liao 已提交
1543
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1544
  tMD5Update(&context, (uint8_t *)pMsg, msgLen);
H
Haojun Liao 已提交
1545
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1546
  tMD5Final(&context);
H
hzcheng 已提交
1547 1548 1549

  memcpy(pAuth, context.digest, sizeof(context.digest));
}
1550 1551

static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1552
  SRpcHead *pHead = (SRpcHead *)msg;
1553

1554
  if (pConn->spi && pConn->secured == 0) {
1555
    // add auth part
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1556
    pHead->spi = pConn->spi;
1557 1558 1559
    SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen);
    pDigest->timeStamp = htonl(taosGetTimestampSec());
    msgLen += sizeof(SRpcDigest);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1560
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1561
    rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
1562
  } else {
1563
    pHead->spi = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1564
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
1565 1566 1567 1568 1569 1570
  }

  return msgLen;
}

static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1571
  SRpcHead *pHead = (SRpcHead *)msg;
1572
  int       code = 0;
1573

dengyihao's avatar
dengyihao 已提交
1574 1575
  if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
    // secured link, or no authentication
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1576
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
1577
    // tTrace("%s, secured link, no auth is required", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1578 1579 1580
    return 0;
  }

dengyihao's avatar
dengyihao 已提交
1581
  if (!rpcIsReq(pHead->msgType)) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1582 1583
    // for response, if code is auth failure, it shall bypass the auth process
    code = htonl(pHead->code);
1584
    if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
dengyihao's avatar
dengyihao 已提交
1585 1586
        code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
        code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
1587
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
1588
      // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1589
      return 0;
1590
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1591
  }
dengyihao's avatar
dengyihao 已提交
1592

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1593
  code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1594
  if (pHead->spi == pConn->spi) {
1595
    // authentication
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1596
    SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest));
1597 1598 1599 1600 1601

    int32_t delta;
    delta = (int32_t)htonl(pDigest->timeStamp);
    delta -= (int32_t)taosGetTimestampSec();
    if (abs(delta) > 900) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1602
      tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
1603
      code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
1604
    } else {
dengyihao's avatar
dengyihao 已提交
1605
      if (rpcAuthenticateMsg(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
1606
        tDebug("%s, authentication failed, msg discarded", pConn->info);
1607
        code = TSDB_CODE_RPC_AUTH_FAILURE;
1608
      } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1609
        pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
dengyihao's avatar
dengyihao 已提交
1610
        if (!rpcIsReq(pHead->msgType)) pConn->secured = 1;  // link is secured for client
1611
        // tTrace("%s, message is authenticated", pConn->info);
1612 1613 1614
      }
    }
  } else {
1615
    tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
1616
    code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
1617 1618 1619 1620 1621
  }

  return code;
}

1622
static void rpcLockConn(SRpcConn *pConn) {
S
TD-2616  
Shengliang Guan 已提交
1623
  int64_t tid = taosGetSelfPthreadId();
1624
  int     i = 0;
1625 1626 1627 1628 1629 1630 1631 1632
  while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) {
    if (++i % 1000 == 0) {
      sched_yield();
    }
  }
}

static void rpcUnlockConn(SRpcConn *pConn) {
S
TD-2616  
Shengliang Guan 已提交
1633
  int64_t tid = taosGetSelfPthreadId();
1634 1635 1636 1637
  if (atomic_val_compare_exchange_64(&(pConn->lockedBy), tid, 0) != tid) {
    assert(false);
  }
}
1638

dengyihao's avatar
dengyihao 已提交
1639
static void rpcAddRef(SRpcInfo *pRpc) { atomic_add_fetch_32(&pRpc->refCount, 1); }
1640

dengyihao's avatar
dengyihao 已提交
1641
static void rpcDecRef(SRpcInfo *pRpc) {
1642
  if (atomic_sub_fetch_32(&pRpc->refCount, 1) == 0) {
1643
    rpcCloseConnCache(pRpc->pCache);
dengyihao's avatar
dengyihao 已提交
1644
    taosHashCleanup(pRpc->hash);
dengyihao's avatar
dengyihao 已提交
1645
    taosTmrCleanUp(pRpc->tmrCtrl);
dengyihao's avatar
dengyihao 已提交
1646
    taosIdPoolCleanUp(pRpc->idPool);
1647

S
TD-1848  
Shengliang Guan 已提交
1648
    tfree(pRpc->connList);
1649
    pthread_mutex_destroy(&pRpc->mutex);
1650
    tDebug("%s rpc resources are released", pRpc->label);
S
TD-1848  
Shengliang Guan 已提交
1651
    tfree(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1652

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1653
    atomic_sub_fetch_32(&tsRpcNum, 1);
1654 1655
  }
}
dengyihao's avatar
dengyihao 已提交
1656
#endif