rpcMain.c 53.6 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18 19 20 21
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
#include "ttimer.h"
#include "tutil.h"
H
hjxilinx 已提交
22
#include "lz4.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23
#include "tref.h"
S
slguan 已提交
24
#include "taoserror.h"
S
slguan 已提交
25
#include "tglobal.h"
H
Hongze Cheng 已提交
26
#include "tmsg.h"
S
slguan 已提交
27
#include "trpc.h"
H
Haojun Liao 已提交
28
#include "thash.h"
S
slguan 已提交
29
#include "rpcLog.h"
30 31
#include "rpcUdp.h"
#include "rpcCache.h"
J
Jeff Tao 已提交
32
#include "rpcTcp.h"
33
#include "rpcHead.h"
H
hzcheng 已提交
34

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
35
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) 
36
#define rpcHeadFromCont(cont) ((SRpcHead *) ((char*)cont - sizeof(SRpcHead)))
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
37 38 39
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
40
#define rpcIsReq(type) (type & 1U)
H
hzcheng 已提交
41 42

typedef struct {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
43 44 45
  int      sessions;     // number of sessions allowed
  int      numOfThreads; // number of threads to process incoming messages
  int      idleTime;     // milliseconds;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
46
  uint16_t localPort;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
47 48
  int8_t   connType;
  int      index;        // for UDP server only, round robin for multiple threads
49
  char     label[TSDB_LABEL_LEN];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
50

51 52 53
  char     user[TSDB_UNI_LEN];   // meter ID
  char     spi;                  // security parameter index
  char     encrypt;              // encrypt algorithm
54 55
  char     secret[TSDB_PASSWORD_LEN]; // secret for the link
  char     ckey[TSDB_PASSWORD_LEN];   // ciphering key
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
56

S
Shengliang Guan 已提交
57 58
  void   (*cfp)(void *parent, SRpcMsg *, SEpSet *);
  int    (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
59

60
  int32_t   refCount;
S
Shengliang Guan 已提交
61
  void     *parent;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
62 63
  void     *idPool;   // handle to ID pool
  void     *tmrCtrl;  // handle to timer
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
64
  SHashObj *hash;     // handle returned by hash utility
65 66
  void     *tcphandle;// returned handle from TCP initialization
  void     *udphandle;// returned handle from UDP initialization
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
67
  void     *pCache;   // connection cache
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
68
  pthread_mutex_t  mutex;
J
Jeff Tao 已提交
69
  struct SRpcConn *connList;  // connection list
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
70 71 72
} SRpcInfo;

typedef struct {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
73
  SRpcInfo *pRpc;       // associated SRpcInfo
S
Shengliang Guan 已提交
74
  SEpSet   epSet;      // ip list provided by app
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
75
  void     *ahandle;    // handle provided by app
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
76
  struct SRpcConn *pConn; // pConn allocated
77
  tmsg_t    msgType;    // message type
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
78 79
  uint8_t  *pCont;      // content provided by app
  int32_t   contLen;    // content length
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
80
  int32_t   code;       // error code
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
81
  int16_t   numOfTry;   // number of try for different servers
82
  int8_t    oldInUse;   // server EP inUse passed by app
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
83
  int8_t    redirect;   // flag to indicate redirect
84
  int8_t    connType;   // connection type
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
85
  int64_t   rid;        // refId returned by taosAddRef
86 87
  SRpcMsg  *pRsp;       // for synchronous API
  tsem_t   *pSem;       // for synchronous API
S
Shengliang Guan 已提交
88
  SEpSet   *pSet;      // for synchronous API 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
89
  char      msg[0];     // RpcHead starts from here
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
90 91
} SRpcReqContext;

J
Jeff Tao 已提交
92
typedef struct SRpcConn {
93
  char      info[48];// debug info: label + pConn + ahandle
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
94 95 96
  int       sid;     // session ID
  uint32_t  ownId;   // own link ID
  uint32_t  peerId;  // peer link ID
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
97
  char      user[TSDB_UNI_LEN]; // user ID for the link
98 99
  char      spi;     // security parameter index
  char      encrypt; // encryption, 0:1 
100 101
  char      secret[TSDB_PASSWORD_LEN]; // secret for the link
  char      ckey[TSDB_PASSWORD_LEN];   // ciphering key
102
  char      secured;              // if set to 1, no authentication
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
103
  uint16_t  localPort;      // for UDP only
104
  uint32_t  linkUid;        // connection unique ID assigned by client
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
105 106
  uint32_t  peerIp;         // peer IP
  uint16_t  peerPort;       // peer port
J
jtao1735 已提交
107
  char      peerFqdn[TSDB_FQDN_LEN]; // peer FQDN or ip string
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
108 109
  uint16_t  tranId;         // outgoing transcation ID, for build message
  uint16_t  outTranId;      // outgoing transcation ID
110
  uint16_t  inTranId;       // transcation ID for incoming msg
111 112
  tmsg_t    outType;        // message type for outgoing request
  tmsg_t    inType;         // message type for incoming request  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
113 114
  void     *chandle;  // handle passed by TCP/UDP connection layer
  void     *ahandle;  // handle provided by upper app layter
115
  int       retry;    // number of retry for sending request
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
116
  int       tretry;   // total retry
117 118 119 120 121 122 123
  void     *pTimer;   // retry timer to monitor the response
  void     *pIdleTimer; // idle timer
  char     *pRspMsg;    // response message including header
  int       rspMsgLen;  // response messag length
  char     *pReqMsg;    // request message including header
  int       reqMsgLen;  // request message length
  SRpcInfo *pRpc;       // the associated SRpcInfo
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
124
  int8_t    connType;   // connection type
125
  int64_t   lockedBy;   // lock for connection
126
  SRpcReqContext *pContext; // request context
H
hzcheng 已提交
127 128
} SRpcConn;

129 130
static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT;

S
slguan 已提交
131
int tsRpcMaxUdpSize = 15000;  // bytes
132
int tsProgressTimer = 100;
H
hzcheng 已提交
133 134 135
// not configurable
int tsRpcMaxRetry;
int tsRpcHeadSize;
136
int tsRpcOverhead;
H
hzcheng 已提交
137

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
138 139
static int     tsRpcRefId = -1;
static int32_t tsRpcNum = 0;
140
//static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
141

142 143 144 145 146 147
// server:0 client:1  tcp:2 udp:0
#define RPC_CONN_UDPS   0
#define RPC_CONN_UDPC   1
#define RPC_CONN_TCPS   2
#define RPC_CONN_TCPC   3

J
jtao1735 已提交
148
void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
149 150
    taosInitUdpConnection,
    taosInitUdpConnection,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
151 152 153
    taosInitTcpServer, 
    taosInitTcpClient
};
H
hzcheng 已提交
154

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
155
void (*taosCleanUpConn[])(void *thandle) = {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
156 157
    taosCleanUpUdpConnection, 
    taosCleanUpUdpConnection, 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
158 159 160
    taosCleanUpTcpServer,
    taosCleanUpTcpClient
};
H
hzcheng 已提交
161

162
void (*taosStopConn[])(void *thandle) = {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
163 164
    taosStopUdpConnection, 
    taosStopUdpConnection, 
165
    taosStopTcpServer,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
166
    taosStopTcpClient,
167 168
};

169
int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
170 171
    taosSendUdpData, 
    taosSendUdpData, 
J
Jeff Tao 已提交
172 173
    taosSendTcpData, 
    taosSendTcpData
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
174
};
H
hzcheng 已提交
175

J
jtao1735 已提交
176
void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = {
S
slguan 已提交
177 178 179 180
    taosOpenUdpConnection,
    taosOpenUdpConnection,
    NULL,
    taosOpenTcpClientConnection,
H
hzcheng 已提交
181 182
};

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
183 184 185
void (*taosCloseConn[])(void *chandle) = {
    NULL, 
    NULL, 
J
Jeff Tao 已提交
186 187
    taosCloseTcpConnection, 
    taosCloseTcpConnection
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
188 189
};

J
jtao1735 已提交
190
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
191
static void      rpcCloseConn(void *thandle);
192
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext);
193
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
194 195
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
H
hzcheng 已提交
196

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
197
static void  rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
198
static void  rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
199
static void  rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
200
static void  rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201
static void  rpcSendReqHead(SRpcConn *pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
202

203
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
S
slguan 已提交
204
static void  rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext);
205 206 207
static void  rpcProcessConnError(void *param, void *id);
static void  rpcProcessRetryTimer(void *, void *);
static void  rpcProcessIdleTimer(void *param, void *tmrId);
208
static void  rpcProcessProgressTimer(void *param, void *tmrId);
H
hzcheng 已提交
209

210
static void  rpcFreeMsg(void *msg);
211
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
212
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead);
213 214
static int   rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
static int   rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
215 216
static void  rpcLockConn(SRpcConn *pConn);
static void  rpcUnlockConn(SRpcConn *pConn);
217 218
static void  rpcAddRef(SRpcInfo *pRpc);
static void  rpcDecRef(SRpcInfo *pRpc);
H
hzcheng 已提交
219

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
220 221 222 223 224
static void rpcFree(void *p) {
  tTrace("free mem: %p", p);
  free(p);
}

225 226 227 228
static void rpcInitImp(void) {
  tsProgressTimer = tsRpcTimer / 2;
  tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsProgressTimer;
  tsRpcHeadSize = RPC_MSG_OVERHEAD;
229
  tsRpcOverhead = sizeof(SRpcReqContext);
H
hzcheng 已提交
230

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
231
  tsRpcRefId = taosOpenRef(200, rpcFree);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
232
}
233

234 235
int32_t rpcInit(void) {
  pthread_once(&tsRpcInitOnce, rpcInitImp);
236
  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
237
}
238

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
239 240 241 242 243
void rpcCleanup(void) {
  taosCloseRef(tsRpcRefId);
  tsRpcRefId = -1;
}
 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244 245 246
void *rpcOpen(const SRpcInit *pInit) {
  SRpcInfo *pRpc;

247
  //pthread_once(&tsRpcInit, rpcInit);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
248

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
249 250
  pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
  if (pRpc == NULL) return NULL;
H
hzcheng 已提交
251

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
252
  if(pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
253
  pRpc->connType = pInit->connType;
Y
TD-3115  
yihaoDeng 已提交
254 255 256 257 258
  if (pRpc->connType == TAOS_CONN_CLIENT) {
    pRpc->numOfThreads = pInit->numOfThreads;
  } else {
    pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
259 260 261
  pRpc->idleTime = pInit->idleTime;
  pRpc->localPort = pInit->localPort;
  pRpc->afp = pInit->afp;
J
jtao1735 已提交
262
  pRpc->sessions = pInit->sessions+1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
263 264 265
  if (pInit->user) tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
  if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret));
  if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
266
  pRpc->spi = pInit->spi;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
267
  pRpc->cfp = pInit->cfp;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
268
  pRpc->afp = pInit->afp;
S
Shengliang Guan 已提交
269
  pRpc->parent = pInit->parent;
270
  pRpc->refCount = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
271

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
272 273
  atomic_add_fetch_32(&tsRpcNum, 1);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
274
  size_t size = sizeof(SRpcConn) * pRpc->sessions;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
275 276
  pRpc->connList = (SRpcConn *)calloc(1, size);
  if (pRpc->connList == NULL) {
S
TD-1530  
Shengliang Guan 已提交
277
    tError("%s failed to allocate memory for taos connections, size:%" PRId64, pRpc->label, (int64_t)size);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
278
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
279 280
    return NULL;
  }
H
hzcheng 已提交
281

J
jtao1735 已提交
282
  pRpc->idPool = taosInitIdPool(pRpc->sessions-1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
283 284
  if (pRpc->idPool == NULL) {
    tError("%s failed to init ID pool", pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
285
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
286
    return NULL;
H
hzcheng 已提交
287
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
288

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
289
  pRpc->tmrCtrl = taosTmrInit(pRpc->sessions*2 + 1, 50, 10000, pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
290 291
  if (pRpc->tmrCtrl == NULL) {
    tError("%s failed to init timers", pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
292
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
293 294
    return NULL;
  }
H
hzcheng 已提交
295

296
  if (pRpc->connType == TAOS_CONN_SERVER) {
H
Haojun Liao 已提交
297
    pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true);
298 299 300 301 302 303
    if (pRpc->hash == NULL) {
      tError("%s failed to init string hash", pRpc->label);
      rpcClose(pRpc);
      return NULL;
    }
  } else {
dengyihao's avatar
dengyihao 已提交
304
    pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime * 20); 
305 306 307 308 309
    if ( pRpc->pCache == NULL ) {
      tError("%s failed to init connection cache", pRpc->label);
      rpcClose(pRpc);
      return NULL;
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
310
  }
H
hzcheng 已提交
311

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
312 313
  pthread_mutex_init(&pRpc->mutex, NULL);

314 315 316 317 318 319 320 321 322 323 324
  pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, 
                                                  pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);
  pRpc->udphandle = (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, 
                                                  pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);

  if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) {
    tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort);
    rpcClose(pRpc);
    return NULL;
  }

S
TD-1843  
Shengliang Guan 已提交
325
  tDebug("%s rpc is opened, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions);
H
hzcheng 已提交
326

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
327
  return pRpc;
H
hzcheng 已提交
328 329
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
330 331
void rpcClose(void *param) {
  SRpcInfo *pRpc = (SRpcInfo *)param;
H
hzcheng 已提交
332

333
  // stop connection to outside first
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
334 335
  (*taosStopConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
  (*taosStopConn[pRpc->connType])(pRpc->udphandle);
336 337

  // close all connections 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
338
  for (int i = 0; i < pRpc->sessions; ++i) {
339
    if (pRpc->connList && pRpc->connList[i].user[0]) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
340
      rpcCloseConn((void *)(pRpc->connList + i));
H
hzcheng 已提交
341 342 343
    }
  }

344
  // clean up
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
345 346
  (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
  (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);
J
Jeff Tao 已提交
347

348
  tDebug("%s rpc is closed", pRpc->label);
349
  rpcDecRef(pRpc);
H
hzcheng 已提交
350 351
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
352 353
void *rpcMallocCont(int contLen) {
  int size = contLen + RPC_MSG_OVERHEAD;
354

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
355 356
  char *start = (char *)calloc(1, (size_t)size);
  if (start == NULL) {
357 358
    tError("failed to malloc msg, size:%d", size);
    return NULL;
359
  } else {
S
TD-1762  
Shengliang Guan 已提交
360
    tTrace("malloc mem:%p size:%d", start, size);
361 362
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
363
  return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
364 365 366
}

void rpcFreeCont(void *cont) {
S
Shengliang Guan 已提交
367
  if (cont) {
368 369
    char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext);
    free(temp);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
370
    tTrace("free mem: %p", temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
371
  }
372 373
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
374 375 376 377 378 379
void *rpcReallocCont(void *ptr, int contLen) {
  if (ptr == NULL) return rpcMallocCont(contLen);

  char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead);
  if (contLen == 0 ) {
    free(start); 
J
Jeff Tao 已提交
380
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
381 382 383 384 385 386 387 388 389 390 391 392
  }

  int size = contLen + RPC_MSG_OVERHEAD;
  start = realloc(start, size);
  if (start == NULL) {
    tError("failed to realloc cont, size:%d", size);
    return NULL;
  } 

  return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}

S
Shengliang Guan 已提交
393
void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
394 395 396
  SRpcInfo       *pRpc = (SRpcInfo *)shandle;
  SRpcReqContext *pContext;

J
Jeff Tao 已提交
397
  int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
398
  pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
399
  pContext->ahandle = pMsg->ahandle;
400
  pContext->pRpc = (SRpcInfo *)shandle;
401
  pContext->epSet = *pEpSet;
J
Jeff Tao 已提交
402
  pContext->contLen = contLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
403 404
  pContext->pCont = pMsg->pCont;
  pContext->msgType = pMsg->msgType;
405
  pContext->oldInUse = pEpSet->inUse;
406

407
  pContext->connType = RPC_CONN_UDPC; 
张宏权 已提交
408
  if (contLen > tsRpcMaxUdpSize || tsRpcForceTcp ) pContext->connType = RPC_CONN_TCPC;
409 410 411

  // connection type is application specific. 
  // for TDengine, all the query, show commands shall have TCP connection
412
  tmsg_t type = pMsg->msgType;
H
Hongze Cheng 已提交
413 414 415 416
  if (type == TDMT_VND_QUERY || type == TDMT_MND_SHOW_RETRIEVE
    || type == TDMT_VND_FETCH || type == TDMT_MND_VGROUP_LIST
    || type == TDMT_VND_TABLES_META || type == TDMT_VND_TABLE_META
    || type == TDMT_MND_SHOW || type == TDMT_MND_STATUS || type == TDMT_VND_ALTER_TABLE)
417
    pContext->connType = RPC_CONN_TCPC;
张宏权 已提交
418

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
419
  pContext->rid = taosAddRef(tsRpcRefId, pContext);
420
  if (pRid) *pRid = pContext->rid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
421

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
422
  rpcSendReqToServer(pRpc, pContext);
423 424
}

J
Jeff Tao 已提交
425
void rpcSendResponse(const SRpcMsg *pRsp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
426
  int        msgLen = 0;
J
Jeff Tao 已提交
427 428 429
  SRpcConn  *pConn = (SRpcConn *)pRsp->handle;
  SRpcMsg    rpcMsg = *pRsp;
  SRpcMsg   *pMsg = &rpcMsg;
430
  SRpcInfo  *pRpc = pConn->pRpc;
J
Jeff Tao 已提交
431

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
432 433 434
  if ( pMsg->pCont == NULL ) {
    pMsg->pCont = rpcMallocCont(0);
    pMsg->contLen = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
435 436
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
437
  SRpcHead  *pHead = rpcHeadFromCont(pMsg->pCont);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
438 439
  char      *msg = (char *)pHead;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
440 441
  pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
  msgLen = rpcMsgLenFromCont(pMsg->contLen);
442

443
  rpcLockConn(pConn);
444

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
445
  if ( pConn->inType == 0 || pConn->user[0] == 0 ) {
H
Haojun Liao 已提交
446
    tError("%s, connection is already released, rsp wont be sent", pConn->info);
447
    rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
448 449
    rpcFreeCont(pMsg->pCont);
    rpcDecRef(pRpc);
450 451 452
    return;
  }

453
  // set msg header
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
454 455
  pHead->version = 1;
  pHead->msgType = pConn->inType+1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
456 457
  pHead->spi = pConn->spi;
  pHead->encrypt = pConn->encrypt;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
458 459 460
  pHead->tranId = pConn->inTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
461
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
462
  pHead->port = htons(pConn->localPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
463
  pHead->code = htonl(pMsg->code);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
464 465
  pHead->ahandle = (uint64_t) pConn->ahandle;
 
466 467
  // set pConn parameters
  pConn->inType = 0;
468 469

  // response message is released until new response is sent
470
  rpcFreeMsg(pConn->pRspMsg); 
471 472
  pConn->pRspMsg = msg;
  pConn->rspMsgLen = msgLen;
473
  if (pMsg->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) pConn->inTranId--;
474

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
475
  // stop the progress timer
476
  taosTmrStopA(&pConn->pTimer);
477 478

  // set the idle timer to monitor the activity
Y
yihaoDeng 已提交
479
  taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime * 30, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
480
  rpcSendMsgToPeer(pConn, msg, msgLen);
481 482 483 484

  // if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured
  if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY)
    pConn->secured = 1; // connection shall be secured
485 486

  if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
487 488
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
489

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
490
  rpcUnlockConn(pConn);
491
  rpcDecRef(pRpc);    // decrease the referene count
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
492

493 494 495
  return;
}

S
Shengliang Guan 已提交
496
void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
497 498
  SRpcMsg  rpcMsg; 
  memset(&rpcMsg, 0, sizeof(rpcMsg));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
499
  
S
Shengliang Guan 已提交
500
  rpcMsg.contLen = sizeof(SEpSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
501 502
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  if (rpcMsg.pCont == NULL) return;
503

S
Shengliang Guan 已提交
504
  memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet));
505

506
  rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
507
  rpcMsg.handle = thandle;
508

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
509
  rpcSendResponse(&rpcMsg);
510 511 512 513

  return;
}

514
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
515
  SRpcConn  *pConn = (SRpcConn *)thandle;
516
  if (pConn->user[0] == 0) return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
517

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
518 519
  pInfo->clientIp = pConn->peerIp;
  pInfo->clientPort = pConn->peerPort;
J
jtao1735 已提交
520
  // pInfo->serverIp = pConn->destIp;
S
Shengliang Guan 已提交
521
  
B
Bomin Zhang 已提交
522
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
523
  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
524 525
}

S
Shengliang Guan 已提交
526
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
527
  SRpcReqContext *pContext;
528
  pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
529 530 531 532 533 534 535

  memset(pRsp, 0, sizeof(SRpcMsg));
  
  tsem_t   sem;        
  tsem_init(&sem, 0, 0);
  pContext->pSem = &sem;
  pContext->pRsp = pRsp;
536
  pContext->pSet = pEpSet;
537

538
  rpcSendRequest(shandle, pEpSet, pMsg, NULL);
539 540 541 542 543 544 545

  tsem_wait(&sem);
  tsem_destroy(&sem);

  return;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
546
// this API is used by server app to keep an APP context in case connection is broken
547
int rpcReportProgress(void *handle, char *pCont, int contLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
548
  SRpcConn *pConn = (SRpcConn *)handle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
549
  int code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
550

551
  rpcLockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
552

553 554 555 556
  if (pConn->user[0]) {
    // pReqMsg and reqMsgLen is re-used to store the context from app server
    pConn->pReqMsg = pCont;     
    pConn->reqMsgLen = contLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
557
  } else {
558
    tDebug("%s, rpc connection is already released", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
559 560 561
    rpcFreeCont(pCont);
    code = -1;
  }
562

563
  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
564
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
565 566
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
567
void rpcCancelRequest(int64_t rid) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
568

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
569 570
  SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid);
  if (pContext == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
571

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
572
  rpcCloseConn(pContext->pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
573

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
574
  taosReleaseRef(tsRpcRefId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
575 576
}

577 578 579 580
static void rpcFreeMsg(void *msg) {
  if ( msg ) {
    char *temp = (char *)msg - sizeof(SRpcReqContext);
    free(temp);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
581
    tTrace("free mem: %p", temp);
582 583 584
  }
}

J
jtao1735 已提交
585
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
586
  SRpcConn *pConn;
S
slguan 已提交
587

588
  uint32_t peerIp = taosGetIpv4FromFqdn(peerFqdn);
589
  if (peerIp == 0xFFFFFFFF) {
J
jtao1735 已提交
590
    tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); 
591
    terrno = TSDB_CODE_RPC_FQDN_ERROR; 
J
jtao1735 已提交
592 593 594
    return NULL;
  }

595
  pConn = rpcAllocateClientConn(pRpc); 
H
hzcheng 已提交
596

597
  if (pConn) { 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
598
    tstrncpy(pConn->peerFqdn, peerFqdn, sizeof(pConn->peerFqdn));
J
jtao1735 已提交
599
    pConn->peerIp = peerIp;
600
    pConn->peerPort = peerPort;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
601
    tstrncpy(pConn->user, pRpc->user, sizeof(pConn->user));
602
    pConn->connType = connType;
603

604 605
    if (taosOpenConn[connType]) {
      void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle;
J
jtao1735 已提交
606
      pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
607
      if (pConn->chandle == NULL) {
H
Haojun Liao 已提交
608
        tError("failed to connect to:%s:%d", taosIpStr(pConn->peerIp), pConn->peerPort);
609

610
        terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
611 612 613
        rpcCloseConn(pConn);
        pConn = NULL;
      }
H
hzcheng 已提交
614 615 616
    }
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
617
  return pConn;
H
hzcheng 已提交
618 619
}

620
static void rpcReleaseConn(SRpcConn *pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
621
  SRpcInfo *pRpc = pConn->pRpc;
622
  if (pConn->user[0] == 0) return;
H
hzcheng 已提交
623

624 625
  pConn->user[0] = 0;
  if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle);
H
hzcheng 已提交
626

627 628 629 630 631
  taosTmrStopA(&pConn->pTimer);
  taosTmrStopA(&pConn->pIdleTimer);

  if ( pRpc->connType == TAOS_CONN_SERVER) {
    char hashstr[40] = {0};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
632
    size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType);
J
jtao1735 已提交
633
    taosHashRemove(pRpc->hash, hashstr, size);
634
    rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
635
    pConn->pRspMsg = NULL;
636 637 638
  
    // if server has ever reported progress, free content
    if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);  // do not use rpcFreeMsg
639
  } else {
640
    // if there is an outgoing message, free it
陶建辉(Jeff)'s avatar
TD-1669  
陶建辉(Jeff) 已提交
641
    if (pConn->outType && pConn->pReqMsg) {
642
      SRpcReqContext *pContext = pConn->pContext;
Y
yihaoDeng 已提交
643 644
      if (pContext) {
        if (pContext->pRsp) {   
645
        // for synchronous API, post semaphore to unblock app
Y
yihaoDeng 已提交
646 647 648 649 650 651 652 653 654
          pContext->pRsp->code = TSDB_CODE_RPC_APP_ERROR;
          pContext->pRsp->pCont = NULL;
          pContext->pRsp->contLen = 0;
          tsem_post(pContext->pSem);
        }
        pContext->pConn = NULL; 
        taosRemoveRef(tsRpcRefId, pContext->rid);
      } else {
        assert(0); 
655
      }
陶建辉(Jeff)'s avatar
TD-1669  
陶建辉(Jeff) 已提交
656
    }
657
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
658

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
659 660 661 662 663 664
  // memset could not be used, since lockeBy can not be reset
  pConn->inType = 0;
  pConn->outType = 0;
  pConn->inTranId = 0;
  pConn->outTranId = 0;
  pConn->secured = 0;
665
  pConn->peerId = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
666 667
  pConn->peerIp = 0;
  pConn->peerPort = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
668 669 670
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
  pConn->pContext = NULL;
671
  pConn->chandle = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
672 673

  taosFreeId(pRpc->idPool, pConn->sid);
674
  tDebug("%s, rpc connection is released", pConn->info);
675 676 677 678
}

static void rpcCloseConn(void *thandle) {
  SRpcConn *pConn = (SRpcConn *)thandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
679
  if (pConn == NULL) return;
680 681 682

  rpcLockConn(pConn);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
683 684
  if (pConn->user[0])
    rpcReleaseConn(pConn);
685

686
  rpcUnlockConn(pConn);
H
hzcheng 已提交
687 688
}

689 690
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
  SRpcConn *pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
691

692 693 694
  int sid = taosAllocateId(pRpc->idPool);
  if (sid <= 0) {
    tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
695
    terrno = TSDB_CODE_RPC_MAX_SESSIONS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
696
  } else {
697 698 699 700
    pConn = pRpc->connList + sid;

    pConn->pRpc = pRpc;
    pConn->sid = sid;
701
    pConn->tranId = (uint16_t)(taosRand() & 0xFFFF);
702
    pConn->ownId = htonl(pConn->sid);
S
Shengliang Guan 已提交
703
    pConn->linkUid = (uint32_t)((int64_t)pConn + taosGetPid() + (int64_t)pConn->tranId);
704 705
    pConn->spi = pRpc->spi;
    pConn->encrypt = pRpc->encrypt;
706
    if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_PASSWORD_LEN);
707
    tDebug("%s %p client connection is allocated, uid:0x%x", pRpc->label, pConn, pConn->linkUid);
708
  }
H
hzcheng 已提交
709

710 711
  return pConn;
}
H
hzcheng 已提交
712

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
713
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
714
  SRpcConn *pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
715
  char      hashstr[40] = {0};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
716 717
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
718
  size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
719
 
720
  // check if it is already allocated
J
jtao1735 已提交
721
  SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
722
  if (ppConn) pConn = *ppConn;
723 724 725 726
  if (pConn) {
    pConn->secured = 0;
    return pConn;
  }
727

728 729 730 731 732 733
  // if code is not 0, it means it is simple reqhead, just ignore
  if (pHead->code != 0) {
    terrno = TSDB_CODE_RPC_ALREADY_PROCESSED;
    return NULL;
  }

734 735 736
  int sid = taosAllocateId(pRpc->idPool);
  if (sid <= 0) {
    tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
737
    terrno = TSDB_CODE_RPC_MAX_SESSIONS;
738 739
  } else {
    pConn = pRpc->connList + sid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
740
    memcpy(pConn->user, pHead->user, tListLen(pConn->user));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
741
    pConn->pRpc = pRpc;
H
hzcheng 已提交
742 743
    pConn->sid = sid;
    pConn->tranId = (uint16_t)(rand() & 0xFFFF);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
744
    pConn->ownId = htonl(pConn->sid);
745
    pConn->linkUid = pHead->linkUid;
746
    if (pRpc->afp) {
747
      if (pConn->user[0] == 0) {
748
        terrno = TSDB_CODE_RPC_AUTH_REQUIRED;
749
      } else {
S
Shengliang Guan 已提交
750
        terrno = (*pRpc->afp)(pRpc->parent, pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
751 752
      }

753
      if (terrno != 0) {
S
Shengliang Guan 已提交
754
        taosFreeId(pRpc->idPool, sid);  // sid shall be released
755 756
        pConn = NULL;
      }
H
hzcheng 已提交
757
    }
S
Shengliang Guan 已提交
758
  }
H
hzcheng 已提交
759

760
  if (pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
761 762
    if (pRecv->connType == RPC_CONN_UDPS && pRpc->numOfThreads > 1) {
      // UDP server, assign to new connection
S
Shengliang Guan 已提交
763
      pRpc->index = (pRpc->index + 1) % pRpc->numOfThreads;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
764 765
      pConn->localPort = (pRpc->localPort + pRpc->index);
    }
S
Shengliang Guan 已提交
766

J
jtao1735 已提交
767
    taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
S
Shengliang Guan 已提交
768
    tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s", pRpc->label, pConn, pConn->linkUid, sid, hashstr);
769 770 771 772 773
  }

  return pConn;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
774
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
775
  SRpcConn *pConn = NULL;  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
776
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
777 778 779

  if (sid) {
    pConn = pRpc->connList + sid;
780
    if (pConn->user[0] == 0) pConn = NULL;
781 782
  } 

783 784 785 786
  if (pConn == NULL) { 
    if (pRpc->connType == TAOS_CONN_SERVER) {
      pConn = rpcAllocateServerConn(pRpc, pRecv);
    } else {
787
      terrno = TSDB_CODE_RPC_UNEXPECTED_RESPONSE;
788 789
    }
  }
790

791
  if (pConn) {
792
    if (pConn->linkUid != pHead->linkUid) {
793
      terrno = TSDB_CODE_RPC_MISMATCHED_LINK_ID;
S
Shengliang Guan 已提交
794 795
      tDebug("%s %p %p, linkUid:0x%x is not matched with received:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
             pConn->linkUid, pHead->linkUid);
796
      pConn = NULL;
H
hzcheng 已提交
797 798 799
    }
  }

800
  return pConn;
H
hzcheng 已提交
801 802
}

803
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
S
Shengliang Guan 已提交
804 805 806
  SRpcConn *pConn;
  SRpcInfo *pRpc = pContext->pRpc;
  SEpSet   *pEpSet = &pContext->epSet;
H
hzcheng 已提交
807

808
  pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
809
  if ( pConn == NULL || pConn->user[0] == 0) {
810
    pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
811 812 813
  } 

  if (pConn) {
S
Shengliang Guan 已提交
814
    pConn->tretry = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
815
    pConn->ahandle = pContext->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
816
    snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
817
    pConn->tretry = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
818
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
819
    tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
820
  }
H
hzcheng 已提交
821

822
  return pConn;
H
hzcheng 已提交
823 824
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
825
static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
H
hzcheng 已提交
826

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
827
    if (pConn->peerId == 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
828
      pConn->peerId = pHead->sourceId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
829
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
830
      if (pConn->peerId != pHead->sourceId) {
831
        tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
832
               pConn->peerId, pHead->sourceId);
833
        return TSDB_CODE_RPC_INVALID_VALUE;
H
hzcheng 已提交
834 835 836
      }
    }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
837 838
    if (pConn->inTranId == pHead->tranId) {
      if (pConn->inType == pHead->msgType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
839
        if (pHead->code == 0) {
H
Hongze Cheng 已提交
840
          tDebug("%s, %s is retransmitted", pConn->info, TMSG_INFO(pHead->msgType));
841
          rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
842 843 844
        } else {
          // do nothing, it is heart beat from client
        }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
845
      } else if (pConn->inType == 0) {
H
Hongze Cheng 已提交
846
        tDebug("%s, %s is already processed, tranId:%d", pConn->info, TMSG_INFO(pHead->msgType), pConn->inTranId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
847
        rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
848
      } else {
H
Hongze Cheng 已提交
849
        tDebug("%s, mismatched message %s and tranId", pConn->info, TMSG_INFO(pHead->msgType));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
850
      }
H
hzcheng 已提交
851

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
852
      // do not reply any message
853
      return TSDB_CODE_RPC_ALREADY_PROCESSED;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
854
    }
H
hzcheng 已提交
855

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
856
    if (pConn->inType != 0) {
857
      tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
858
              pConn->inTranId, pHead->tranId);
859
      return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
860
    }
H
hzcheng 已提交
861

陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
862 863 864 865 866
    if (rpcContLenFromMsg(pHead->msgLen) <= 0) {
      tDebug("%s, message body is empty, ignore", pConn->info);
      return TSDB_CODE_RPC_APP_ERROR;
    }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
867 868
    pConn->inTranId = pHead->tranId;
    pConn->inType = pHead->msgType;
H
hzcheng 已提交
869

陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
870 871 872 873
    // start the progress timer to monitor the response from server app
    if (pConn->connType != RPC_CONN_TCPS) 
      pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl);
 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
874
    return 0;
H
hzcheng 已提交
875 876
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
877
static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
878
  SRpcInfo *pRpc = pConn->pRpc;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
879
  pConn->peerId = pHead->sourceId;
H
hzcheng 已提交
880

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
881
  if (pConn->outType == 0 || pConn->pContext == NULL) {
882
    return TSDB_CODE_RPC_UNEXPECTED_RESPONSE;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
883
  }
H
hzcheng 已提交
884

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
885
  if (pHead->tranId != pConn->outTranId) {
886
    return TSDB_CODE_RPC_INVALID_TRAN_ID;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
887
  }
H
hzcheng 已提交
888

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
889
  if (pHead->msgType != pConn->outType + 1) {
890
    return TSDB_CODE_RPC_INVALID_RESPONSE_TYPE;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
891
  }
H
hzcheng 已提交
892

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
893 894 895
  taosTmrStopA(&pConn->pTimer);
  pConn->retry = 0;

896
  if (pHead->code == TSDB_CODE_RPC_AUTH_REQUIRED && pRpc->spi) {
897
    tDebug("%s, authentication shall be restarted", pConn->info);
898 899
    pConn->secured = 0;
    rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);      
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
900 901
    if (pConn->connType != RPC_CONN_TCPC)
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
902
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
903 904
  }

905 906 907 908 909 910 911 912 913 914
  if (pHead->code == TSDB_CODE_RPC_MISMATCHED_LINK_ID) {
    tDebug("%s, mismatched linkUid, link shall be restarted", pConn->info);
    pConn->secured = 0;
    ((SRpcHead *)pConn->pReqMsg)->destId = 0;
    rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);      
    if (pConn->connType != RPC_CONN_TCPC)
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
  }

915
  if (pHead->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
916
    if (pConn->tretry <= tsRpcMaxRetry) {
917
      tDebug("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
918 919
      pConn->tretry++;
      rpcSendReqHead(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
920 921
      if (pConn->connType != RPC_CONN_TCPC)
        pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
922
      return TSDB_CODE_RPC_ALREADY_PROCESSED;
H
hzcheng 已提交
923
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
924
      // peer still in processing, give up
925
      tDebug("%s, server processing takes too long time, give up", pConn->info);
926
      pHead->code = TSDB_CODE_RPC_TOO_SLOW;
H
hzcheng 已提交
927 928
    }
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
929

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
930 931
  pConn->outType = 0;
  pConn->pReqMsg = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
932
  pConn->reqMsgLen = 0;
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
933 934 935
  SRpcReqContext *pContext = pConn->pContext;

  if (pHead->code == TSDB_CODE_RPC_REDIRECT) { 
S
Shengliang Guan 已提交
936
    if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SEpSet)) {
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
937 938 939 940 941 942 943 944 945 946
      // if EpSet is not included in the msg, treat it as NOT_READY
      pHead->code = TSDB_CODE_RPC_NOT_READY; 
    } else {
      pContext->redirect++;
      if (pContext->redirect > TSDB_MAX_REPLICA) {
        pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; 
        tWarn("%s, too many redirects, quit", pConn->info);
      }
    }
  } 
947 948

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
949 950
}

S
slguan 已提交
951
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) {
952 953
  int32_t    sid;
  SRpcConn  *pConn = NULL;
H
hzcheng 已提交
954

955
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
956

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
957
  sid = htonl(pHead->destId);
S
slguan 已提交
958
  *ppContext = NULL;
H
hzcheng 已提交
959

960
  if (TMSG_INDEX(pHead->msgType) >= TDMT_MAX || TMSG_INDEX(pHead->msgType) <= 0) {
961
    tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
962
    terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE; return NULL;
H
hzcheng 已提交
963 964
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
965
  if (sid < 0 || sid >= pRpc->sessions) {
966
    tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid,
H
Hongze Cheng 已提交
967
           pRpc->sessions, TMSG_INFO(pHead->msgType));
968
    terrno = TSDB_CODE_RPC_INVALID_SESSION_ID; return NULL;
H
hzcheng 已提交
969 970
  }

971
  if (rpcIsReq(pHead->msgType) && htonl(pHead->msgVer) != tsVersion >> 8) {
H
Hongze Cheng 已提交
972
    tDebug("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion, TMSG_INFO(pHead->msgType));
973 974 975
    terrno = TSDB_CODE_RPC_INVALID_VERSION; return NULL;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
976
  pConn = rpcGetConnObj(pRpc, sid, pRecv);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
977
  if (pConn == NULL) {
978
    tDebug("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno)); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
979
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
980
  } 
H
hzcheng 已提交
981

982
  rpcLockConn(pConn);
H
hzcheng 已提交
983

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
984 985
  if (rpcIsReq(pHead->msgType)) {
    pConn->ahandle = (void *)pHead->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
986
    snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
987 988 989
  }

  sid = pConn->sid;
990
  if (pConn->chandle == NULL) pConn->chandle = pRecv->chandle;
J
jtao1735 已提交
991
  pConn->peerIp = pRecv->ip; 
992
  pConn->peerPort = pRecv->port;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
993
  if (pHead->port) pConn->peerPort = htons(pHead->port); 
H
hzcheng 已提交
994

995
  terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
996 997 998 999

  // code can be transformed only after authentication
  pHead->code = htonl(pHead->code);

1000
  if (terrno == 0) {
J
jtao1735 已提交
1001
    if (pHead->encrypt) {
1002 1003
      // decrypt here
    }
H
hzcheng 已提交
1004

1005 1006
    if ( rpcIsReq(pHead->msgType) ) {
      pConn->connType = pRecv->connType;
Y
yihaoDeng 已提交
1007
      terrno = rpcProcessReqHead(pConn, pHead);
1008

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1009 1010 1011 1012 1013 1014
      // stop idle timer
      taosTmrStopA(&pConn->pIdleTimer);  

      // client shall send the request within tsRpcTime again for UDP, double it 
      if (pConn->connType != RPC_CONN_TCPS)
        pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl);
1015 1016
    } else {
      terrno = rpcProcessRspHead(pConn, pHead);
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1017
      *ppContext = pConn->pContext;
1018
    }
H
hzcheng 已提交
1019 1020
  }

1021
  rpcUnlockConn(pConn);
H
hzcheng 已提交
1022

1023
  return pConn;
H
hzcheng 已提交
1024 1025
}

Y
TD-3409  
yihaoDeng 已提交
1026 1027 1028 1029
static void doRpcReportBrokenLinkToServer(void *param, void *id) {
   SRpcMsg *pRpcMsg = (SRpcMsg *)(param); 
   SRpcConn *pConn  = (SRpcConn *)(pRpcMsg->handle);
   SRpcInfo *pRpc   = pConn->pRpc; 
S
Shengliang Guan 已提交
1030 1031
  (*(pRpc->cfp))(pRpc->parent, pRpcMsg, NULL);
  free(pRpcMsg);
Y
TD-3409  
yihaoDeng 已提交
1032
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1033 1034
static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
  SRpcInfo *pRpc = pConn->pRpc;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1035
  if (pConn->pReqMsg == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1036 1037

  // if there are pending request, notify the app
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1038
  rpcAddRef(pRpc);
1039
  tDebug("%s, notify the server app, connection is gone", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1040

Y
TD-3409  
yihaoDeng 已提交
1041 1042 1043 1044 1045 1046 1047
  SRpcMsg *rpcMsg = malloc(sizeof(SRpcMsg));
  rpcMsg->pCont = pConn->pReqMsg;     // pReqMsg is re-used to store the APP context from server
  rpcMsg->contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length
  rpcMsg->ahandle = pConn->ahandle;
  rpcMsg->handle = pConn;
  rpcMsg->msgType = pConn->inType;
  rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1048 1049
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
Y
TD-3409  
yihaoDeng 已提交
1050 1051 1052 1053 1054
  if (pRpc->cfp) {
    taosTmrStart(doRpcReportBrokenLinkToServer, 0, rpcMsg, pRpc->tmrCtrl);
  } else {
    free(rpcMsg);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1055 1056
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1057
static void rpcProcessBrokenLink(SRpcConn *pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1058
  if (pConn == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1059
  SRpcInfo *pRpc = pConn->pRpc;
1060
  tDebug("%s, link is broken", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1061 1062

  rpcLockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1063

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1064 1065
  if (pConn->outType) {
    SRpcReqContext *pContext = pConn->pContext;
1066
    pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1067
    pContext->pConn = NULL;
1068
    pConn->pReqMsg = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1069 1070
    taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
  }
Y
TD-3409  
yihaoDeng 已提交
1071
   
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1072
  if (pConn->inType) rpcReportBrokenLinkToServer(pConn); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1073

1074
  rpcReleaseConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1075
  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1076 1077
}

1078 1079 1080 1081
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
  SRpcHead  *pHead = (SRpcHead *)pRecv->msg;
  SRpcInfo  *pRpc = (SRpcInfo *)pRecv->shandle;
  SRpcConn  *pConn = (SRpcConn *)pRecv->thandle;
H
hzcheng 已提交
1082

S
Shengliang Guan 已提交
1083
  tDump(pRecv->msg, pRecv->msgLen);
1084 1085

  // underlying UDP layer does not know it is server or client
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1086
  pRecv->connType = pRecv->connType | pRpc->connType;  
H
hzcheng 已提交
1087

B
Bomin Zhang 已提交
1088
  if (pRecv->msg == NULL) {
1089
    rpcProcessBrokenLink(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1090 1091 1092
    return NULL;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1093
  terrno = 0;
S
slguan 已提交
1094 1095
  SRpcReqContext *pContext;
  pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);
H
hzcheng 已提交
1096

1097
  if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) {
1098
    tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
H
Hongze Cheng 已提交
1099
           pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), pRecv->ip, pRecv->port, terrno, pRecv->msgLen,
1100 1101 1102 1103 1104 1105
           pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
  } else {
    tDebug("%s %p %p, %d received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
           pConn, (void *)pHead->ahandle, pHead->msgType, pRecv->ip, pRecv->port, terrno, pRecv->msgLen,
           pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
  }
H
hzcheng 已提交
1106

H
TD-34  
hzcheng 已提交
1107
  int32_t code = terrno;
1108
  if (code != TSDB_CODE_RPC_ALREADY_PROCESSED) {
H
TD-34  
hzcheng 已提交
1109
    if (code != 0) { // parsing error
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1110
      if (rpcIsReq(pHead->msgType)) {
H
TD-34  
hzcheng 已提交
1111
        rpcSendErrorMsgToPeer(pRecv, code);
B
Bomin Zhang 已提交
1112 1113 1114
        if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE) {
          rpcCloseConn(pConn);
        }
1115
        if (TMSG_INDEX(pHead->msgType) + 1 > 1 && TMSG_INDEX(pHead->msgType) + 1 < TDMT_MAX) {
H
Hongze Cheng 已提交
1116
          tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType+1), code);
Y
yihaoDeng 已提交
1117
        } else {
H
Hongze Cheng 已提交
1118
          tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), code);
1119
        }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1120
      } 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1121
    } else { // msg is passed to app only parsing is ok 
S
slguan 已提交
1122
      rpcProcessIncomingMsg(pConn, pHead, pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1123
    }
H
hzcheng 已提交
1124 1125
  }

1126
  if (code) rpcFreeMsg(pRecv->msg); // parsing failed, msg shall be freed
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1127 1128
  return pConn;
}
H
hzcheng 已提交
1129

1130 1131 1132
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
  SRpcInfo       *pRpc = pContext->pRpc;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1133
  pContext->pConn = NULL;
1134 1135
  if (pContext->pRsp) { 
    // for synchronous API
S
Shengliang Guan 已提交
1136
    memcpy(pContext->pSet, &pContext->epSet, sizeof(SEpSet));
1137
    memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1138
    tsem_post(pContext->pSem);
1139 1140
  } else {
    // for asynchronous API 
S
Shengliang Guan 已提交
1141
    SEpSet *pEpSet = NULL;
1142
    if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) 
S
Shengliang Guan 已提交
1143
      pEpSet = &pContext->epSet;
1144

S
Shengliang Guan 已提交
1145
    (*pRpc->cfp)(pRpc->parent, pMsg, pEpSet);
1146 1147 1148
  }

  // free the request message
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1149
  taosRemoveRef(tsRpcRefId, pContext->rid); 
1150 1151
}

S
slguan 已提交
1152
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
S
slguan 已提交
1153

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1154
  SRpcInfo *pRpc = pConn->pRpc;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1155
  SRpcMsg   rpcMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1156

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1157
  pHead = rpcDecompressRpcMsg(pHead);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1158 1159 1160
  rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
  rpcMsg.pCont = pHead->content;
  rpcMsg.msgType = pHead->msgType;
S
Shengliang Guan 已提交
1161 1162 1163
  rpcMsg.code = pHead->code;

  if (rpcIsReq(pHead->msgType)) {
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1164
    rpcMsg.ahandle = pConn->ahandle;
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1165 1166
    rpcMsg.handle = pConn;
    rpcAddRef(pRpc);  // add the refCount for requests
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1167

陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1168
    // notify the server app
S
Shengliang Guan 已提交
1169
    (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
H
hzcheng 已提交
1170
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1171
    // it's a response
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1172
    rpcMsg.handle = pContext;
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1173
    rpcMsg.ahandle = pContext->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1174
    pContext->pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1175

1176
    // for UDP, port may be changed by server, the port in epSet shall be used for cache
陶建辉(Jeff)'s avatar
TD-1632  
陶建辉(Jeff) 已提交
1177 1178 1179
    if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
      rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType);    
    } else {
1180 1181
      rpcCloseConn(pConn);
    }
H
hzcheng 已提交
1182

1183
    if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1184
      pContext->numOfTry = 0;
S
Shengliang Guan 已提交
1185
      SEpSet *pEpSet = (SEpSet*)pHead->content;
1186 1187
      if (pEpSet->numOfEps > 0) {
        memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet));
S
TD-1670  
Shengliang Guan 已提交
1188 1189 1190
        tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps,
               pContext->epSet.inUse);
        for (int i = 0; i < pContext->epSet.numOfEps; ++i) {
1191
          pContext->epSet.port[i] = htons(pContext->epSet.port[i]);
S
TD-1670  
Shengliang Guan 已提交
1192 1193 1194
          tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.fqdn[i],
                 pContext->epSet.port[i]);
        }
1195
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1196
      rpcSendReqToServer(pRpc, pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1197
      rpcFreeCont(rpcMsg.pCont);
1198
    } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || pHead->code == TSDB_CODE_DND_EXITING) {
1199 1200
      pContext->code = pHead->code;
      rpcProcessConnError(pContext, NULL);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1201
      rpcFreeCont(rpcMsg.pCont);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1202
    } else {
1203
      rpcNotifyClient(pContext, &rpcMsg);
H
hzcheng 已提交
1204 1205 1206 1207
    }
  }
}

1208
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1209 1210
  char       msg[RPC_MSG_OVERHEAD];
  SRpcHead  *pHead;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1211

1212
  // set msg header
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1213 1214 1215 1216
  memset(msg, 0, sizeof(SRpcHead));
  pHead = (SRpcHead *)msg;
  pHead->version = 1;
  pHead->msgType = pConn->inType+1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1217
  pHead->spi = pConn->spi;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1218 1219 1220 1221
  pHead->encrypt = 0;
  pHead->tranId = pConn->inTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
1222
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1223
  pHead->ahandle = (uint64_t)pConn->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1224
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1225
  pHead->code = htonl(code);
H
hzcheng 已提交
1226

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239
  rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
  pConn->secured = 1; // connection shall be secured
}

static void rpcSendReqHead(SRpcConn *pConn) {
  char       msg[RPC_MSG_OVERHEAD];
  SRpcHead  *pHead;

  // set msg header
  memset(msg, 0, sizeof(SRpcHead));
  pHead = (SRpcHead *)msg;
  pHead->version = 1;
  pHead->msgType = pConn->outType;
1240
  pHead->msgVer = htonl(tsVersion >> 8);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1241 1242 1243 1244 1245 1246
  pHead->spi = pConn->spi;
  pHead->encrypt = 0;
  pHead->tranId = pConn->outTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1247
  pHead->ahandle = (uint64_t)pConn->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1248 1249 1250 1251
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));
  pHead->code = 1;

  rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1252
}
H
hjxilinx 已提交
1253

1254
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1255 1256
  SRpcHead  *pRecvHead, *pReplyHead;
  char       msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t) ];
1257 1258
  uint32_t   timeStamp;
  int        msgLen;
H
hzcheng 已提交
1259

1260
  pRecvHead = (SRpcHead *)pRecv->msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1261
  pReplyHead = (SRpcHead *)msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1262

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1263 1264
  memset(msg, 0, sizeof(SRpcHead));
  pReplyHead->version = pRecvHead->version;
1265
  pReplyHead->msgType = (tmsg_t)(pRecvHead->msgType + 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1266
  pReplyHead->spi = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1267
  pReplyHead->encrypt = pRecvHead->encrypt;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1268
  pReplyHead->tranId = pRecvHead->tranId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1269
  pReplyHead->sourceId = pRecvHead->destId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1270
  pReplyHead->destId = pRecvHead->sourceId;
1271
  pReplyHead->linkUid = pRecvHead->linkUid;
1272
  pReplyHead->ahandle = pRecvHead->ahandle;
H
hzcheng 已提交
1273

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1274 1275
  pReplyHead->code = htonl(code);
  msgLen = sizeof(SRpcHead);
H
hzcheng 已提交
1276

1277
  if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP) {
1278
    // include a time stamp if client's time is not synchronized well
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1279
    uint8_t *pContent = pReplyHead->content;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1280
    timeStamp = htonl(taosGetTimestampSec());
1281 1282
    memcpy(pContent, &timeStamp, sizeof(timeStamp));
    msgLen += sizeof(timeStamp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1283
  }
H
hzcheng 已提交
1284

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1285
  pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
1286
  (*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, msg, msgLen, pRecv->chandle);
H
hzcheng 已提交
1287

1288
  return; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1289 1290
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1291
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1292 1293 1294
  SRpcHead  *pHead = rpcHeadFromCont(pContext->pCont);
  char      *msg = (char *)pHead;
  int        msgLen = rpcMsgLenFromCont(pContext->contLen);
1295
  tmsg_t     msgType = pContext->msgType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1296

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1297
  pContext->numOfTry++;
1298
  SRpcConn *pConn = rpcSetupConnToServer(pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1299 1300
  if (pConn == NULL) {
    pContext->code = terrno;
1301
    taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1302 1303 1304
    return;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1305
  pContext->pConn = pConn;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1306
  pConn->ahandle = pContext->ahandle;
1307
  rpcLockConn(pConn);
1308

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1309
  // set the message header  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1310
  pHead->version = 1;
1311
  pHead->msgVer = htonl(tsVersion >> 8);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1312 1313
  pHead->msgType = msgType;
  pHead->encrypt = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1314 1315
  pConn->tranId++;
  if ( pConn->tranId == 0 ) pConn->tranId++;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1316 1317 1318 1319
  pHead->tranId = pConn->tranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
  pHead->port = 0;
1320
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1321
  pHead->ahandle = (uint64_t)pConn->ahandle;
1322
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1323 1324 1325

  // set the connection parameters
  pConn->outType = msgType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1326
  pConn->outTranId = pHead->tranId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1327 1328
  pConn->pReqMsg = msg;
  pConn->reqMsgLen = msgLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1329
  pConn->pContext = pContext;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1330

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1331
  rpcSendMsgToPeer(pConn, msg, msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1332 1333
  if (pConn->connType != RPC_CONN_TCPC)
    taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1334 1335

  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1336
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1337 1338

static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1339 1340
  int        writtenLen = 0;
  SRpcHead  *pHead = (SRpcHead *)msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1341

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1342
  msgLen = rpcAddAuthPart(pConn, msg, msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1343

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1344
  if ( rpcIsReq(pHead->msgType)) {
1345
    tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d",
H
Hongze Cheng 已提交
1346
           pConn->info, TMSG_INFO(pHead->msgType), pConn->peerFqdn, pConn->peerPort, 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1347
           msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
1348
  } else {
J
jtao1735 已提交
1349
    if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured
1350
    tDebug("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d",
H
Hongze Cheng 已提交
1351
           pConn->info, TMSG_INFO(pHead->msgType), pConn->peerIp, pConn->peerPort, 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1352
           htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
1353
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1354

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1355
  //tTrace("connection type is: %d", pConn->connType);
1356
  writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1357

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1358
  if (writtenLen != msgLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1359
    tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
1360 1361
  }
 
S
Shengliang Guan 已提交
1362
  tDump(msg, msgLen);
H
hzcheng 已提交
1363 1364
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1365
static void rpcProcessConnError(void *param, void *id) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1366
  SRpcReqContext *pContext = (SRpcReqContext *)param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1367 1368
  SRpcInfo       *pRpc = pContext->pRpc;
  SRpcMsg         rpcMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1369
 
H
hjxilinx 已提交
1370 1371 1372 1373
  if (pRpc == NULL) {
    return;
  }
  
1374
  tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle);
H
hzcheng 已提交
1375

H
Hongze Cheng 已提交
1376
  if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TDMT_VND_FETCH) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1377
    rpcMsg.msgType = pContext->msgType+1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1378
    rpcMsg.ahandle = pContext->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1379 1380 1381
    rpcMsg.code = pContext->code;
    rpcMsg.pCont = NULL;
    rpcMsg.contLen = 0;
1382 1383

    rpcNotifyClient(pContext, &rpcMsg);
H
hzcheng 已提交
1384
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1385
    // move to next IP 
1386 1387
    pContext->epSet.inUse++;
    pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1388
    rpcSendReqToServer(pRpc, pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1389
  }
H
hzcheng 已提交
1390 1391
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1392 1393 1394
static void rpcProcessRetryTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;
  SRpcInfo *pRpc = pConn->pRpc;
H
hzcheng 已提交
1395

1396
  rpcLockConn(pConn);
H
hzcheng 已提交
1397

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1398
  if (pConn->outType && pConn->user[0]) {
H
Hongze Cheng 已提交
1399
    tDebug("%s, expected %s is not received", pConn->info, TMSG_INFO((int)pConn->outType + 1));
H
hzcheng 已提交
1400 1401 1402
    pConn->pTimer = NULL;
    pConn->retry++;

S
slguan 已提交
1403
    if (pConn->retry < 4) {
H
Hongze Cheng 已提交
1404
      tDebug("%s, re-send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn, pConn->peerPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1405
      rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);      
1406
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
H
hzcheng 已提交
1407 1408
    } else {
      // close the connection
H
Hongze Cheng 已提交
1409
      tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn, pConn->peerPort);
1410 1411
      if (pConn->pContext) {
        pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1412
        pConn->pContext->pConn = NULL;
1413
        pConn->pReqMsg = NULL;
1414
        taosTmrStart(rpcProcessConnError, 1, pConn->pContext, pRpc->tmrCtrl);
1415 1416
        rpcReleaseConn(pConn);
      }
H
hzcheng 已提交
1417
    }
1418
  } else {
1419
    tDebug("%s, retry timer not processed", pConn->info);
H
hzcheng 已提交
1420 1421
  }

1422
  rpcUnlockConn(pConn);
H
hzcheng 已提交
1423 1424
}

1425 1426 1427
static void rpcProcessIdleTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;

1428 1429
  rpcLockConn(pConn);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1430
  if (pConn->user[0]) {
1431
    tDebug("%s, close the connection since no activity", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1432
    if (pConn->inType) rpcReportBrokenLinkToServer(pConn); 
1433
    rpcReleaseConn(pConn);
1434
  } else {
1435
    tDebug("%s, idle timer:%p not processed", pConn->info, tmrId);
1436
  }
1437 1438

  rpcUnlockConn(pConn);
1439 1440 1441 1442 1443 1444
}

static void rpcProcessProgressTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;
  SRpcInfo *pRpc = pConn->pRpc;

1445
  rpcLockConn(pConn);
1446

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1447
  if (pConn->inType && pConn->user[0]) {
1448
    tDebug("%s, progress timer expired, send progress", pConn->info);
1449
    rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
1450
    pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
1451
  } else {
1452
    tDebug("%s, progress timer:%p not processed", pConn->info, tmrId);
1453 1454
  }

1455
  rpcUnlockConn(pConn);
1456 1457 1458
}

static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1459 1460 1461
  SRpcHead  *pHead = rpcHeadFromCont(pCont);
  int32_t    finalLen = 0;
  int        overhead = sizeof(SRpcComp);
1462 1463 1464 1465 1466
  
  if (!NEEDTO_COMPRESSS_MSG(contLen)) {
    return contLen;
  }
  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1467
  char *buf = malloc (contLen + overhead + 8);  // 8 extra bytes
1468
  if (buf == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1469
    tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
1470 1471 1472 1473
    return contLen;
  }
  
  int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
S
Shuduo Sang 已提交
1474
  tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead);
1475 1476 1477 1478 1479
  
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
S
TD-4100  
Shengliang Guan 已提交
1480
  if (compLen > 0 && compLen < contLen - overhead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1481 1482 1483
    SRpcComp *pComp = (SRpcComp *)pCont;
    pComp->reserved = 0; 
    pComp->contLen = htonl(contLen); 
1484 1485
    memcpy(pCont + overhead, buf, compLen);
    
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1486
    pHead->comp = 1;
S
Shuduo Sang 已提交
1487
    tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen);
1488 1489 1490 1491 1492 1493 1494 1495 1496
    finalLen = compLen + overhead;
  } else {
    finalLen = contLen;
  }

  free(buf);
  return finalLen;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1497
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1498
  int overhead = sizeof(SRpcComp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1499 1500 1501
  SRpcHead   *pNewHead = NULL;  
  uint8_t    *pCont = pHead->content;
  SRpcComp   *pComp = (SRpcComp *)pHead->content;
1502

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1503
  if (pHead->comp) {
1504
    // decompress the content
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1505 1506
    assert(pComp->reserved == 0);
    int contLen = htonl(pComp->contLen);
1507 1508
  
    // prepare the temporary buffer to decompress message
1509 1510
    char *temp = (char *)malloc(contLen + RPC_MSG_OVERHEAD);
    pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext
1511
  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1512
    if (pNewHead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1513
      int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1514 1515
      int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char *)pNewHead->content, compLen, contLen);
      assert(origLen == contLen);
1516
    
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1517
      memcpy(pNewHead, pHead, sizeof(SRpcHead));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1518
      pNewHead->msgLen = rpcMsgLenFromCont(origLen);
1519
      rpcFreeMsg(pHead); // free the compressed message buffer
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1520
      pHead = pNewHead; 
S
TD-1762  
Shengliang Guan 已提交
1521
      tTrace("decomp malloc mem:%p", temp);
1522
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1523
      tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
1524 1525 1526
    }
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1527
  return pHead;
1528 1529
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1530
static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) {
dengyihao's avatar
dengyihao 已提交
1531
  T_MD5_CTX context;
H
hzcheng 已提交
1532 1533
  int     ret = -1;

dengyihao's avatar
dengyihao 已提交
1534
  tMD5Init(&context);
H
Haojun Liao 已提交
1535
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1536
  tMD5Update(&context, (uint8_t *)pMsg, msgLen);
H
Haojun Liao 已提交
1537
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1538
  tMD5Final(&context);
H
hzcheng 已提交
1539 1540 1541 1542 1543 1544

  if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;

  return ret;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1545
static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) {
dengyihao's avatar
dengyihao 已提交
1546
  T_MD5_CTX context;
H
hzcheng 已提交
1547

dengyihao's avatar
dengyihao 已提交
1548
  tMD5Init(&context);
H
Haojun Liao 已提交
1549
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1550
  tMD5Update(&context, (uint8_t *)pMsg, msgLen);
H
Haojun Liao 已提交
1551
  tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
dengyihao's avatar
dengyihao 已提交
1552
  tMD5Final(&context);
H
hzcheng 已提交
1553 1554 1555

  memcpy(pAuth, context.digest, sizeof(context.digest));
}
1556 1557

static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1558
  SRpcHead *pHead = (SRpcHead *)msg;
1559

1560
  if (pConn->spi && pConn->secured == 0) {
1561
    // add auth part
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1562
    pHead->spi = pConn->spi;
1563 1564 1565
    SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen);
    pDigest->timeStamp = htonl(taosGetTimestampSec());
    msgLen += sizeof(SRpcDigest);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1566
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1567
    rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
1568
  } else {
1569
    pHead->spi = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1570
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
1571 1572 1573 1574 1575 1576
  }

  return msgLen;
}

static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1577
  SRpcHead *pHead = (SRpcHead *)msg;
1578
  int       code = 0;
1579

1580 1581
  if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){
    // secured link, or no authentication 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1582
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
1583
    // tTrace("%s, secured link, no auth is required", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1584 1585 1586 1587 1588 1589
    return 0;
  }

  if ( !rpcIsReq(pHead->msgType) ) {
    // for response, if code is auth failure, it shall bypass the auth process
    code = htonl(pHead->code);
1590
    if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
1591
        code == TSDB_CODE_RPC_INVALID_VERSION ||
S
Shengliang Guan 已提交
1592
        code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
1593
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
1594
      // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1595
      return 0;
1596
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1597 1598 1599
  }
 
  code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1600
  if (pHead->spi == pConn->spi) {
1601
    // authentication
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1602
    SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest));
1603 1604 1605 1606 1607

    int32_t delta;
    delta = (int32_t)htonl(pDigest->timeStamp);
    delta -= (int32_t)taosGetTimestampSec();
    if (abs(delta) > 900) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1608
      tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
1609
      code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
1610
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1611
      if (rpcAuthenticateMsg(pHead, msgLen-TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
1612
        tDebug("%s, authentication failed, msg discarded", pConn->info);
1613
        code = TSDB_CODE_RPC_AUTH_FAILURE;
1614
      } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1615
        pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
1616
        if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1;  // link is secured for client
1617
        // tTrace("%s, message is authenticated", pConn->info);
1618 1619 1620
      }
    }
  } else {
1621
    tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
1622
    code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
1623 1624 1625 1626 1627
  }

  return code;
}

1628
static void rpcLockConn(SRpcConn *pConn) {
S
TD-2616  
Shengliang Guan 已提交
1629
  int64_t tid = taosGetSelfPthreadId();
1630
  int     i = 0;
1631 1632 1633 1634 1635 1636 1637 1638
  while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) {
    if (++i % 1000 == 0) {
      sched_yield();
    }
  }
}

static void rpcUnlockConn(SRpcConn *pConn) {
S
TD-2616  
Shengliang Guan 已提交
1639
  int64_t tid = taosGetSelfPthreadId();
1640 1641 1642 1643
  if (atomic_val_compare_exchange_64(&(pConn->lockedBy), tid, 0) != tid) {
    assert(false);
  }
}
1644

1645 1646
static void rpcAddRef(SRpcInfo *pRpc)
{  
1647
   atomic_add_fetch_32(&pRpc->refCount, 1);
1648 1649 1650 1651
}

static void rpcDecRef(SRpcInfo *pRpc)
{ 
1652
  if (atomic_sub_fetch_32(&pRpc->refCount, 1) == 0) {
1653
    rpcCloseConnCache(pRpc->pCache);
dengyihao's avatar
dengyihao 已提交
1654
    taosHashCleanup(pRpc->hash);
dengyihao's avatar
dengyihao 已提交
1655
    taosTmrCleanUp(pRpc->tmrCtrl);
dengyihao's avatar
dengyihao 已提交
1656
    taosIdPoolCleanUp(pRpc->idPool);
1657

S
TD-1848  
Shengliang Guan 已提交
1658
    tfree(pRpc->connList);
1659
    pthread_mutex_destroy(&pRpc->mutex);
1660
    tDebug("%s rpc resources are released", pRpc->label);
S
TD-1848  
Shengliang Guan 已提交
1661
    tfree(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1662

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1663
    atomic_sub_fetch_32(&tsRpcNum, 1);
1664 1665 1666
  }
}