rpcMain.c 47.1 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18 19 20 21 22
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
H
hjxilinx 已提交
23
#include "lz4.h"
S
slguan 已提交
24
#include "taoserror.h"
25
#include "tsocket.h"
S
slguan 已提交
26
#include "tglobal.h"
27
#include "taosmsg.h"
S
slguan 已提交
28 29 30
#include "trpc.h"
#include "hash.h"
#include "rpcLog.h"
31 32
#include "rpcUdp.h"
#include "rpcCache.h"
J
Jeff Tao 已提交
33
#include "rpcTcp.h"
34
#include "rpcHead.h"
H
hzcheng 已提交
35

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
36 37 38 39 40
// Extra bytes allocated around every message body: the request context and the
// RPC head in front of it, plus sizeof(SRpcDigest) spare bytes behind it.
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))

// Conversions between a message body ("cont") and the RPC head placed directly
// in front of it. Every argument is parenthesized so that compound expressions
// (e.g. `a | b`, `p + n`) expand with the intended precedence.
#define rpcHeadFromCont(cont) ((SRpcHead *)((char *)(cont) - sizeof(SRpcHead)))
#define rpcContFromHead(msg) ((msg) + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) ((contLen) + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) ((msgLen) - sizeof(SRpcHead))

// Non-zero when the lowest bit of the message type is set. rpcSendResponse()
// builds the response type as request type + 1, so requests carry odd types.
#define rpcIsReq(type) ((type) & 1U)
H
hzcheng 已提交
42 43

typedef struct {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
44 45 46
  int      sessions;     // number of sessions allowed
  int      numOfThreads; // number of threads to process incoming messages
  int      idleTime;     // milliseconds;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
47
  uint16_t localPort;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
48 49
  int8_t   connType;
  int      index;        // for UDP server only, round robin for multiple threads
50
  char     label[TSDB_LABEL_LEN];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
51

52 53 54
  char     user[TSDB_UNI_LEN];   // meter ID
  char     spi;                  // security parameter index
  char     encrypt;              // encrypt algorithm
55 56
  char     secret[TSDB_KEY_LEN]; // secret for the link
  char     ckey[TSDB_KEY_LEN];   // ciphering key 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
57

J
jtao1735 已提交
58
  void   (*cfp)(SRpcMsg *, SRpcIpSet *);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
59
  int    (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
60

61
  int      refCount;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
62 63
  void     *idPool;   // handle to ID pool
  void     *tmrCtrl;  // handle to timer
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
64
  SHashObj *hash;     // handle returned by hash utility
65 66
  void     *tcphandle;// returned handle from TCP initialization
  void     *udphandle;// returned handle from UDP initialization
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
67
  void     *pCache;   // connection cache
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
68
  pthread_mutex_t  mutex;
J
Jeff Tao 已提交
69
  struct SRpcConn *connList;  // connection list
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
70 71 72
} SRpcInfo;

typedef struct {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
73
  SRpcInfo *pRpc;       // associated SRpcInfo
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
74 75
  SRpcIpSet ipSet;      // ip list provided by app
  void     *ahandle;    // handle provided by app
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
76
  struct SRpcConn *pConn; // pConn allocated
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
77
  char      msgType;    // message type
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
78 79
  uint8_t  *pCont;      // content provided by app
  int32_t   contLen;    // content length
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
80
  int32_t   code;       // error code
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
81
  int16_t   numOfTry;   // number of try for different servers
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
82
  int8_t    oldInUse;   // server IP inUse passed by app
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
83
  int8_t    redirect;   // flag to indicate redirect
84
  int8_t    connType;   // connection type
85 86 87
  SRpcMsg  *pRsp;       // for synchronous API
  tsem_t   *pSem;       // for synchronous API
  SRpcIpSet *pSet;      // for synchronous API 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
88
  char      msg[0];     // RpcHead starts from here
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
89 90
} SRpcReqContext;

J
Jeff Tao 已提交
91
typedef struct SRpcConn {
92
  char      info[48];// debug info: label + pConn + ahandle
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
93 94 95
  int       sid;     // session ID
  uint32_t  ownId;   // own link ID
  uint32_t  peerId;  // peer link ID
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
96
  char      user[TSDB_UNI_LEN]; // user ID for the link
97 98
  char      spi;     // security parameter index
  char      encrypt; // encryption, 0:1 
99 100
  char      secret[TSDB_KEY_LEN]; // secret for the link
  char      ckey[TSDB_KEY_LEN];   // ciphering key 
101
  char      secured;              // if set to 1, no authentication
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102
  uint16_t  localPort;      // for UDP only
103
  uint32_t  linkUid;        // connection unique ID assigned by client
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
104 105
  uint32_t  peerIp;         // peer IP
  uint16_t  peerPort;       // peer port
J
jtao1735 已提交
106
  char      peerFqdn[TSDB_FQDN_LEN]; // peer FQDN or ip string
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
107 108
  uint16_t  tranId;         // outgoing transcation ID, for build message
  uint16_t  outTranId;      // outgoing transcation ID
109 110 111
  uint16_t  inTranId;       // transcation ID for incoming msg
  uint8_t   outType;        // message type for outgoing request
  char      inType;         // message type for incoming request  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
112 113
  void     *chandle;  // handle passed by TCP/UDP connection layer
  void     *ahandle;  // handle provided by upper app layter
114
  int       retry;    // number of retry for sending request
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
115
  int       tretry;   // total retry
116 117 118 119 120 121 122
  void     *pTimer;   // retry timer to monitor the response
  void     *pIdleTimer; // idle timer
  char     *pRspMsg;    // response message including header
  int       rspMsgLen;  // response messag length
  char     *pReqMsg;    // request message including header
  int       reqMsgLen;  // request message length
  SRpcInfo *pRpc;       // the associated SRpcInfo
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
123
  int8_t    connType;   // connection type
124
  int64_t   lockedBy;   // lock for connection
125
  SRpcReqContext *pContext; // request context
H
hzcheng 已提交
126 127
} SRpcConn;

S
slguan 已提交
128
// Requests whose content is longer than this many bytes are sent over TCP.
int tsRpcMaxUdpSize = 15000;
// Progress timer; rpcOpen() recomputes it from tsRpcTimer.
int tsProgressTimer = 100;

// Derived values (not configurable), filled in by rpcOpen().
int tsRpcMaxRetry;
int tsRpcHeadSize;
int tsRpcOverhead;

// Connection types. Bit 0: server(0) / client(1). Bit 1: UDP(0) / TCP(2).
// The values index the taos*Conn dispatch tables.
#define RPC_CONN_UDPS   0
#define RPC_CONN_UDPC   1
#define RPC_CONN_TCPS   2
#define RPC_CONN_TCPC   3
#define RPC_CONN_TCP    2

J
jtao1735 已提交
142
void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
143 144
    taosInitUdpConnection,
    taosInitUdpConnection,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
145 146 147
    taosInitTcpServer, 
    taosInitTcpClient
};
H
hzcheng 已提交
148

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
149 150 151 152 153 154
void (*taosCleanUpConn[])(void *thandle) = {
    taosCleanUpUdpConnection, 
    taosCleanUpUdpConnection, 
    taosCleanUpTcpServer,
    taosCleanUpTcpClient
};
H
hzcheng 已提交
155

156
int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
157 158
    taosSendUdpData, 
    taosSendUdpData, 
J
Jeff Tao 已提交
159 160
    taosSendTcpData, 
    taosSendTcpData
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
161
};
H
hzcheng 已提交
162

J
jtao1735 已提交
163
void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = {
S
slguan 已提交
164 165 166 167
    taosOpenUdpConnection,
    taosOpenUdpConnection,
    NULL,
    taosOpenTcpClientConnection,
H
hzcheng 已提交
168 169
};

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
170 171 172
void (*taosCloseConn[])(void *chandle) = {
    NULL, 
    NULL, 
J
Jeff Tao 已提交
173 174
    taosCloseTcpConnection, 
    taosCloseTcpConnection
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
175 176
};

J
jtao1735 已提交
177
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
178
static void      rpcCloseConn(void *thandle);
179
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext);
180
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
181 182
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
H
hzcheng 已提交
183

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
184
static void  rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
185
static void  rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
186
static void  rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
187
static void  rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
188
static void  rpcSendReqHead(SRpcConn *pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
189

190
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
191
static void  rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead);
192 193 194
static void  rpcProcessConnError(void *param, void *id);
static void  rpcProcessRetryTimer(void *, void *);
static void  rpcProcessIdleTimer(void *param, void *tmrId);
195
static void  rpcProcessProgressTimer(void *param, void *tmrId);
H
hzcheng 已提交
196

197
static void  rpcFreeMsg(void *msg);
198
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
199
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead);
200 201
static int   rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
static int   rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
202 203
static void  rpcLockConn(SRpcConn *pConn);
static void  rpcUnlockConn(SRpcConn *pConn);
204 205
static void  rpcAddRef(SRpcInfo *pRpc);
static void  rpcDecRef(SRpcInfo *pRpc);
H
hzcheng 已提交
206

J
Jeff Tao 已提交
207
void *rpcOpen(const SRpcInit *pInit) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
208
  SRpcInfo *pRpc;
H
hzcheng 已提交
209

210 211
  tsProgressTimer = tsRpcTimer/2; 
  tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
212
  tsRpcHeadSize = RPC_MSG_OVERHEAD; 
213
  tsRpcOverhead = sizeof(SRpcReqContext);
H
hzcheng 已提交
214

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
215 216
  pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
  if (pRpc == NULL) return NULL;
H
hzcheng 已提交
217

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
218
  if(pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
219
  pRpc->connType = pInit->connType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
220
  pRpc->idleTime = pInit->idleTime;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
221
  pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
222 223
  pRpc->localPort = pInit->localPort;
  pRpc->afp = pInit->afp;
J
jtao1735 已提交
224
  pRpc->sessions = pInit->sessions+1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
225 226 227
  if (pInit->user) tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
  if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret));
  if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
228
  pRpc->spi = pInit->spi;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229
  pRpc->cfp = pInit->cfp;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
230
  pRpc->afp = pInit->afp;
231
  pRpc->refCount = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
232

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
233
  size_t size = sizeof(SRpcConn) * pRpc->sessions;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
234 235
  pRpc->connList = (SRpcConn *)calloc(1, size);
  if (pRpc->connList == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
236
    tError("%s failed to allocate memory for taos connections, size:%ld", pRpc->label, size);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
237
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
238 239
    return NULL;
  }
H
hzcheng 已提交
240

J
jtao1735 已提交
241
  pRpc->idPool = taosInitIdPool(pRpc->sessions-1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
242 243
  if (pRpc->idPool == NULL) {
    tError("%s failed to init ID pool", pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
245
    return NULL;
H
hzcheng 已提交
246
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
247

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
248
  pRpc->tmrCtrl = taosTmrInit(pRpc->sessions*2 + 1, 50, 10000, pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
249 250
  if (pRpc->tmrCtrl == NULL) {
    tError("%s failed to init timers", pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
251
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
252 253
    return NULL;
  }
H
hzcheng 已提交
254

255
  if (pRpc->connType == TAOS_CONN_SERVER) {
J
jtao1735 已提交
256
    pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
257 258 259 260 261 262
    if (pRpc->hash == NULL) {
      tError("%s failed to init string hash", pRpc->label);
      rpcClose(pRpc);
      return NULL;
    }
  } else {
263
    pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime); 
264 265 266 267 268
    if ( pRpc->pCache == NULL ) {
      tError("%s failed to init connection cache", pRpc->label);
      rpcClose(pRpc);
      return NULL;
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
269
  }
H
hzcheng 已提交
270

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
271 272
  pthread_mutex_init(&pRpc->mutex, NULL);

273 274 275 276 277 278 279 280 281 282 283
  pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, 
                                                  pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);
  pRpc->udphandle = (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, 
                                                  pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);

  if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) {
    tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort);
    rpcClose(pRpc);
    return NULL;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
284
  tTrace("%s rpc is openned, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions);
H
hzcheng 已提交
285

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
286
  return pRpc;
H
hzcheng 已提交
287 288
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
289 290
void rpcClose(void *param) {
  SRpcInfo *pRpc = (SRpcInfo *)param;
H
hzcheng 已提交
291

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
292
  for (int i = 0; i < pRpc->sessions; ++i) {
293
    if (pRpc->connList && pRpc->connList[i].user[0]) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
294
      rpcCloseConn((void *)(pRpc->connList + i));
H
hzcheng 已提交
295 296 297
    }
  }

J
Jeff Tao 已提交
298 299 300
  (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
  (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
301
  tTrace("%s rpc is closed", pRpc->label);
302
  rpcDecRef(pRpc);
H
hzcheng 已提交
303 304
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
305 306
void *rpcMallocCont(int contLen) {
  int size = contLen + RPC_MSG_OVERHEAD;
307

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
308 309
  char *start = (char *)calloc(1, (size_t)size);
  if (start == NULL) {
310 311 312 313
    tError("failed to malloc msg, size:%d", size);
    return NULL;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
314
  return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
315 316 317
}

void rpcFreeCont(void *cont) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
318
  if ( cont ) {
319 320
    char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext);
    free(temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
321
  }
322 323
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
324 325 326 327 328 329
/*
 * realloc() counterpart for buffers from rpcMallocCont().
 *
 * ptr == NULL behaves like rpcMallocCont(contLen); contLen == 0 frees the
 * buffer and returns NULL. Otherwise the buffer is resized and the new content
 * pointer is returned, or NULL if the resize failed.
 */
void *rpcReallocCont(void *ptr, int contLen) {
  if (ptr == NULL) return rpcMallocCont(contLen);

  // start of the underlying allocation
  char *base = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead);

  if (contLen == 0) {
    free(base);
    return NULL;
  }

  int size = contLen + RPC_MSG_OVERHEAD;
  char *resized = realloc(base, size);
  if (resized == NULL) {
    tError("failed to realloc cont, size:%d", size);
    return NULL;
  }

  return resized + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
343
void *rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) {
344 345 346
  SRpcInfo       *pRpc = (SRpcInfo *)shandle;
  SRpcReqContext *pContext;

J
Jeff Tao 已提交
347
  int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
348 349
  pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
  pContext->ahandle = pMsg->handle;
350
  pContext->pRpc = (SRpcInfo *)shandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
351
  pContext->ipSet = *pIpSet;
J
Jeff Tao 已提交
352
  pContext->contLen = contLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
353 354
  pContext->pCont = pMsg->pCont;
  pContext->msgType = pMsg->msgType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
355
  pContext->oldInUse = pIpSet->inUse;
356

357
  pContext->connType = RPC_CONN_UDPC; 
J
Jeff Tao 已提交
358
  if (contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC;
359 360 361

  // connection type is application specific. 
  // for TDengine, all the query, show commands shall have TCP connection
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
362
  char type = pMsg->msgType;
B
Bomin Zhang 已提交
363 364 365 366
  if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE
    || type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP
    || type == TSDB_MSG_TYPE_CM_TABLES_META || type == TSDB_MSG_TYPE_CM_TABLE_META
    || type == TSDB_MSG_TYPE_CM_SHOW )
367 368
    pContext->connType = RPC_CONN_TCPC;
  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
369
  rpcSendReqToServer(pRpc, pContext);
370

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
371
  return pContext;
372 373
}

J
Jeff Tao 已提交
374
void rpcSendResponse(const SRpcMsg *pRsp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
375
  int        msgLen = 0;
J
Jeff Tao 已提交
376 377 378
  SRpcConn  *pConn = (SRpcConn *)pRsp->handle;
  SRpcMsg    rpcMsg = *pRsp;
  SRpcMsg   *pMsg = &rpcMsg;
379
  SRpcInfo  *pRpc = pConn->pRpc;
J
Jeff Tao 已提交
380

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
381 382 383
  if ( pMsg->pCont == NULL ) {
    pMsg->pCont = rpcMallocCont(0);
    pMsg->contLen = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
384 385
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
386
  SRpcHead  *pHead = rpcHeadFromCont(pMsg->pCont);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
387 388
  char      *msg = (char *)pHead;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
389 390
  pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
  msgLen = rpcMsgLenFromCont(pMsg->contLen);
391

392
  rpcLockConn(pConn);
393

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
394
  if ( pConn->inType == 0 || pConn->user[0] == 0 ) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
395
    tTrace("%s, connection is already released, rsp wont be sent", pConn->info);
396
    rpcUnlockConn(pConn);
397 398 399
    return;
  }

400
  // set msg header
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
401 402
  pHead->version = 1;
  pHead->msgType = pConn->inType+1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
403 404
  pHead->spi = pConn->spi;
  pHead->encrypt = pConn->encrypt;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
405 406 407
  pHead->tranId = pConn->inTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
408
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
409
  pHead->port = htons(pConn->localPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
410
  pHead->code = htonl(pMsg->code);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
411 412
  pHead->ahandle = (uint64_t) pConn->ahandle;
 
413 414
  // set pConn parameters
  pConn->inType = 0;
415 416

  // response message is released until new response is sent
417
  rpcFreeMsg(pConn->pRspMsg); 
418 419
  pConn->pRspMsg = msg;
  pConn->rspMsgLen = msgLen;
420
  if (pMsg->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) pConn->inTranId--;
421

422
  taosTmrStopA(&pConn->pTimer);
423 424 425

  // set the idle timer to monitor the activity
  taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
426
  rpcSendMsgToPeer(pConn, msg, msgLen);
427
  pConn->secured = 1; // connection shall be secured
428 429

  if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
430 431
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
432

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
433
  rpcUnlockConn(pConn);
434
  rpcDecRef(pRpc);    // decrease the referene count
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
435

436 437 438
  return;
}

J
Jeff Tao 已提交
439
void rpcSendRedirectRsp(void *thandle, const SRpcIpSet *pIpSet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
440 441
  SRpcMsg  rpcMsg; 
  memset(&rpcMsg, 0, sizeof(rpcMsg));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
442 443 444 445
  
  rpcMsg.contLen = sizeof(SRpcIpSet);
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  if (rpcMsg.pCont == NULL) return;
446

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
447
  memcpy(rpcMsg.pCont, pIpSet, sizeof(SRpcIpSet));
448

449
  rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
450
  rpcMsg.handle = thandle;
451

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
452
  rpcSendResponse(&rpcMsg);
453 454 455 456

  return;
}

457
/*
 * Copy peer IP, peer port and user name of a connection into *pInfo.
 * Returns 0 on success, -1 if the connection slot is no longer in use.
 */
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
  SRpcConn *pConn = (SRpcConn *)thandle;

  // an empty user name marks a released slot
  if (pConn->user[0] == 0) {
    return -1;
  }

  pInfo->clientIp   = pConn->peerIp;
  pInfo->clientPort = pConn->peerPort;
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));

  return 0;
}

J
Jeff Tao 已提交
469
/*
 * Synchronous variant of rpcSendRequest(): send pMsg and block until the
 * semaphore stored in the request context is posted. *pRsp is zeroed before
 * the request goes out; pRsp and pIpSet are recorded in the context for the
 * code that completes the request.
 */
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pMsg, SRpcMsg *pRsp) {
  tsem_t done;
  SRpcReqContext *pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));

  memset(pRsp, 0, sizeof(SRpcMsg));

  tsem_init(&done, 0, 0);
  pContext->pSem = &done;
  pContext->pRsp = pRsp;
  pContext->pSet = pIpSet;

  rpcSendRequest(shandle, pIpSet, pMsg);

  tsem_wait(&done);
  tsem_destroy(&done);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
489
// this API is used by server app to keep an APP context in case connection is broken
490
int rpcReportProgress(void *handle, char *pCont, int contLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
491
  SRpcConn *pConn = (SRpcConn *)handle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
492
  int code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
493

494
  rpcLockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
495

496 497 498 499
  if (pConn->user[0]) {
    // pReqMsg and reqMsgLen is re-used to store the context from app server
    pConn->pReqMsg = pCont;     
    pConn->reqMsgLen = contLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
500 501 502 503 504
  } else {
    tTrace("%s, rpc connection is already released", pConn->info);
    rpcFreeCont(pCont);
    code = -1;
  }
505

506
  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
507
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
508 509
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
510 511 512 513 514 515
/* todo: cancel process may have race condition, pContext may have been released 
   just before app calls the rpcCancelRequest */
void rpcCancelRequest(void *handle) {
  SRpcReqContext *pContext = handle;

  if (pContext->pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
516
    tTrace("%s, app trys to cancel request", pContext->pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
517 518 519 520 521 522
    rpcCloseConn(pContext->pConn);
    pContext->pConn = NULL;
    rpcFreeCont(pContext->pCont);
  }
}

523 524 525 526 527 528 529
/*
 * Release a message given the pointer to its RPC head (as opposed to
 * rpcFreeCont(), which takes the content pointer). NULL is ignored.
 */
static void rpcFreeMsg(void *msg) {
  if (msg == NULL) return;

  // the request context precedes the head and marks the start of the allocation
  free((char *)msg - sizeof(SRpcReqContext));
}

J
jtao1735 已提交
530
/*
 * Allocate a client connection to peerFqdn:peerPort and open the transport
 * for it. Returns NULL with terrno set when the FQDN cannot be resolved, no
 * session is available, or the transport cannot be opened.
 */
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType) {
  uint32_t peerIp = taosGetIpFromFqdn(peerFqdn);
  if (peerIp == 0xFFFFFFFF) {
    tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn);
    terrno = TSDB_CODE_RPC_APP_ERROR;
    return NULL;
  }

  SRpcConn *pConn = rpcAllocateClientConn(pRpc);
  if (pConn == NULL) return NULL;

  tstrncpy(pConn->peerFqdn, peerFqdn, sizeof(pConn->peerFqdn));
  pConn->peerIp = peerIp;
  pConn->peerPort = peerPort;
  tstrncpy(pConn->user, pRpc->user, sizeof(pConn->user));
  pConn->connType = connType;

  // connection types without an open routine need no transport handle
  if (taosOpenConn[connType] == NULL) return pConn;

  void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle;
  pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort);
  if (pConn->chandle != NULL) return pConn;

  terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  rpcCloseConn(pConn);
  return NULL;
}

563
/*
 * Return a connection slot to the pool: close the transport connection, stop
 * its timers, drop the server-side hash entry and saved response, wipe the
 * slot and free its session ID. A slot that is already free (user[0] == 0) is
 * left alone. The caller visible in this file (rpcCloseConn) holds the
 * connection lock around the call.
 */
static void rpcReleaseConn(SRpcConn *pConn) {
  SRpcInfo *pRpc = pConn->pRpc;
  if (pConn->user[0] == 0) return;

  // mark the slot free first
  pConn->user[0] = 0;
  if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle);

  taosTmrStopA(&pConn->pTimer);
  taosTmrStopA(&pConn->pIdleTimer);

  if ( pRpc->connType == TAOS_CONN_SERVER) {
    // same key format as the one built in rpcAllocateServerConn()
    char hashstr[40] = {0};
    size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType);
    taosHashRemove(pRpc->hash, hashstr, size);
    rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
  }

  // log before the slot is wiped: afterwards pConn->info is an empty string
  tTrace("%s, rpc connection is released", pConn->info);

  // wipe everything except the lock word, which the caller still holds
  int sid = pConn->sid;
  int64_t lockedBy = pConn->lockedBy;
  memset(pConn, 0, sizeof(SRpcConn));
  pConn->lockedBy = lockedBy;
  taosFreeId(pRpc->idPool, sid);
}

static void rpcCloseConn(void *thandle) {
  SRpcConn *pConn = (SRpcConn *)thandle;

  rpcLockConn(pConn);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
594 595
  if (pConn->user[0])
    rpcReleaseConn(pConn);
596

597
  rpcUnlockConn(pConn);
H
hzcheng 已提交
598 599
}

600 601
/*
 * Take a free session ID and prepare its connection slot for client use.
 * Returns NULL with terrno = TSDB_CODE_RPC_MAX_SESSIONS when no ID is left.
 */
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
  int sid = taosAllocateId(pRpc->idPool);
  if (sid <= 0) {
    tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
    terrno = TSDB_CODE_RPC_MAX_SESSIONS;
    return NULL;
  }

  SRpcConn *pConn = &pRpc->connList[sid];

  pConn->pRpc = pRpc;
  pConn->sid = sid;
  pConn->tranId = (uint16_t)(rand() & 0xFFFF);
  pConn->ownId = htonl(pConn->sid);
  // link ID derived from the slot address and the process ID
  pConn->linkUid = (uint32_t)((int64_t)pConn + (int64_t)getpid());
  pConn->spi = pRpc->spi;
  pConn->encrypt = pRpc->encrypt;
  if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN);

  return pConn;
}
H
hzcheng 已提交
622

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
623
/*
 * Find or create the server-side connection for a received message.
 *
 * Connections are keyed by "peer ip : linkUid : sourceId : connType". An
 * existing entry is returned with its `secured` flag cleared. Otherwise a new
 * slot is taken, filled from the message head and, when an authentication
 * callback (afp) is installed, checked through it.
 *
 * Returns NULL with terrno set when no session is left or the callback
 * rejects the user.
 */
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
  SRpcConn *pConn = NULL;
  char      hashstr[40] = {0};
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;

  size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);

  // check if it is already allocated
  SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
  if (ppConn) pConn = *ppConn;
  if (pConn) {
    pConn->secured = 0;
    return pConn;
  }

  int sid = taosAllocateId(pRpc->idPool);
  if (sid <= 0) {
    tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
    terrno = TSDB_CODE_RPC_MAX_SESSIONS;
  } else {
    pConn = pRpc->connList + sid;
    memset(pConn, 0, sizeof(SRpcConn));
    memcpy(pConn->user, pHead->user, tListLen(pConn->user));
    // the name comes off the wire and is used as a C string afterwards
    pConn->user[tListLen(pConn->user) - 1] = 0;
    pConn->pRpc = pRpc;
    pConn->sid = sid;
    pConn->tranId = (uint16_t)(rand() & 0xFFFF);
    pConn->ownId = htonl(pConn->sid);
    pConn->linkUid = pHead->linkUid;
    if (pRpc->afp) {
      if (pConn->user[0] == 0) {
        terrno = TSDB_CODE_RPC_AUTH_REQUIRED;
      } else {
        terrno = (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
      }

      if (terrno != 0) {
        // A slot counts as in use while user[0] != 0. Clear it together with
        // the ID, otherwise the slot still looks live and a later release
        // would free the same session ID a second time.
        pConn->user[0] = 0;
        taosFreeId(pRpc->idPool, sid);   // sid shall be released
        pConn = NULL;
      }
    }
  }

  if (pConn) {
    if (pRecv->connType == RPC_CONN_UDPS && pRpc->numOfThreads > 1) {
      // UDP server, assign to new connection
      pRpc->index = (pRpc->index+1) % pRpc->numOfThreads;
      pConn->localPort = (pRpc->localPort + pRpc->index);
    }

    taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
  }

  return pConn;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
678
/*
 * Resolve the connection a received message belongs to.
 *
 * A non-zero sid selects its slot in connList if that slot is in use. Failing
 * that, a server looks up or allocates a connection from the message itself,
 * while a client sets terrno = TSDB_CODE_RPC_UNEXPECTED_RESPONSE. The
 * connection is accepted only if its linkUid equals the one in the message
 * head. Returns NULL with terrno set on any failure.
 *
 * NOTE(review): sid is used as an index without a range check here -
 * presumably the caller validates it; confirm.
 */
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
  SRpcConn *pConn = NULL;

  if (sid != 0 && pRpc->connList[sid].user[0] != 0) {
    pConn = &pRpc->connList[sid];
  }

  if (pConn == NULL) {
    if (pRpc->connType != TAOS_CONN_SERVER) {
      terrno = TSDB_CODE_RPC_UNEXPECTED_RESPONSE;
      return NULL;
    }

    pConn = rpcAllocateServerConn(pRpc, pRecv);
    if (pConn == NULL) return NULL;
  }

  if (pConn->linkUid != pHead->linkUid) {
    terrno = TSDB_CODE_RPC_MISMATCHED_LINK_ID;
    tError("%s %p %p, linkUid:0x%x is not matched with received:0x%x", pRpc->label, pConn, pHead->ahandle, pConn->linkUid, pHead->linkUid);
    return NULL;
  }

  return pConn;
}

706
/*
 * Obtain a connection to the end point currently selected in pContext->ipSet.
 *
 * The connection cache is consulted first; when it has no entry, or the entry has an empty
 * user name, a new connection is opened with rpcOpenConn(). On success the connection's
 * retry counter, application handle and log prefix are refreshed.
 *
 * Returns the connection, or NULL (after logging terrno) when none could be set up.
 */
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
  SRpcInfo  *pRpc = pContext->pRpc;
  SRpcIpSet *pIpSet = &pContext->ipSet;

  SRpcConn *pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType);
  if (pConn == NULL || pConn->user[0] == 0) {
    pConn = rpcOpenConn(pRpc, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType);
  }

  if (pConn == NULL) {
    tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno));
    return NULL;
  }

  pConn->tretry = 0;
  pConn->ahandle = pContext->ahandle;
  snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle);

  return pConn;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
728
/*
 * Check the header of an incoming request against the state kept on its connection.
 *
 * Returns 0 when the request starts a new transaction (inTranId/inType are recorded),
 * TSDB_CODE_RPC_INVALID_VALUE when the source id differs from the recorded peer id,
 * TSDB_CODE_RPC_ALREADY_PROCESSED when the tranId equals the one already recorded, and
 * TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED when a different tranId arrives while inType is
 * still set.
 */
static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
  // the first request fixes the peer id, every later request has to carry the same one
  if (pConn->peerId == 0) {
    pConn->peerId = pHead->sourceId;
  } else if (pConn->peerId != pHead->sourceId) {
    tTrace("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, 
           pConn->peerId, pHead->sourceId);
    return TSDB_CODE_RPC_INVALID_VALUE;
  }

  if (pConn->inTranId != pHead->tranId) {
    // a different tranId is accepted only when no request is pending on the connection
    if (pConn->inType != 0) {
      tTrace("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, 
              pConn->inTranId, pHead->tranId);
      return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED;
    }

    pConn->inTranId = pHead->tranId;
    pConn->inType = pHead->msgType;
    return 0;
  }

  // from here on the tranId equals the one recorded on the connection
  if (pConn->inType == pHead->msgType) {
    if (pHead->code == 0) {
      tTrace("%s, %s is retransmitted", pConn->info, taosMsg[pHead->msgType]);
      rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
    }
    // a non-zero code is a heart beat from the client, nothing to do
  } else if (pConn->inType == 0) {
    tTrace("%s, %s is already processed, tranId:%d", pConn->info, taosMsg[pHead->msgType], pConn->inTranId);
    rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response
  } else {
    tTrace("%s, mismatched message %s and tranId", pConn->info, taosMsg[pHead->msgType]);
  }

  // the caller shall not reply to this message
  return TSDB_CODE_RPC_ALREADY_PROCESSED;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
771
/*
 * Check the header of an incoming response against the request pending on the connection.
 *
 * Returns TSDB_CODE_SUCCESS when the response completes the pending request (the pending
 * request state on the connection is cleared), TSDB_CODE_RPC_ALREADY_PROCESSED when the
 * response was consumed here (authentication restart or "still in progress" notice), or an
 * error code when the response does not match the pending request.
 */
static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
  SRpcInfo *pRpc = pConn->pRpc;

  pConn->peerId = pHead->sourceId;

  // a response is acceptable only while a request with a context is outstanding
  if (pConn->outType == 0 || pConn->pContext == NULL) return TSDB_CODE_RPC_UNEXPECTED_RESPONSE;
  if (pHead->tranId != pConn->outTranId)              return TSDB_CODE_RPC_INVALID_TRAN_ID;
  if (pHead->msgType != pConn->outType + 1)           return TSDB_CODE_RPC_INVALID_RESPONSE_TYPE;

  taosTmrStopA(&pConn->pTimer);
  pConn->retry = 0;

  if (pHead->code == TSDB_CODE_RPC_AUTH_REQUIRED && pRpc->spi) {
    // send the request once more with the link marked as not secured
    tTrace("%s, authentication shall be restarted", pConn->info);
    pConn->secured = 0;
    rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);      
    pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
  }

  if (pHead->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) {
    if (pConn->tretry > tsRpcMaxRetry) {
      // peer still in processing, give up
      tTrace("%s, server processing takes too long time, give up", pConn->info);
      pHead->code = TSDB_CODE_RPC_TOO_SLOW;
    } else {
      tTrace("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry);
      pConn->tretry++;
      rpcSendReqHead(pConn);
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
      return TSDB_CODE_RPC_ALREADY_PROCESSED;
    }
  }

  // the transaction is over, forget the pending request
  pConn->outType = 0;
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;

  return TSDB_CODE_SUCCESS;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
819
/*
 * Validate the header of a received message and bind the message to a connection.
 *
 * The message type and the destination session id are range-checked first, then the
 * connection is looked up with rpcGetConnObj(). With the connection locked, the peer
 * address is recorded, the message is authenticated and, when authentication passes,
 * the header is handed to rpcProcessReqHead() or rpcProcessRspHead().
 *
 * Returns the connection, or NULL when the header is invalid or no connection is found.
 * The outcome of the header processing is left in terrno.
 */
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
  int32_t   sid = htonl(pHead->destId);

  if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) {
    tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
    terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE;
    return NULL;
  }

  if (sid < 0 || sid >= pRpc->sessions) {
    tTrace("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid,
           pRpc->sessions, taosMsg[pHead->msgType]);
    terrno = TSDB_CODE_RPC_INVALID_SESSION_ID;
    return NULL;
  }

  SRpcConn *pConn = rpcGetConnObj(pRpc, sid, pRecv);
  if (pConn == NULL) {
    tTrace("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno)); 
    return NULL;
  }

  rpcLockConn(pConn);

  // a request carries the application handle of the sender, keep it for logging and replies
  if (rpcIsReq(pHead->msgType)) {
    pConn->ahandle = (void *)pHead->ahandle;
    snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle);
  }

  // remember where the message came from; a port in the header overrides the socket port
  pConn->chandle = pRecv->chandle;
  pConn->peerIp = pRecv->ip; 
  pConn->peerPort = pRecv->port;
  if (pHead->port) {
    pConn->peerPort = htons(pHead->port); 
  }

  terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);

  // code can be transformed only after authentication
  pHead->code = htonl(pHead->code);

  if (terrno != 0) {
    rpcUnlockConn(pConn);
    return pConn;
  }

  if (pHead->encrypt) {
    // decrypt here
  }

  if (rpcIsReq(pHead->msgType)) {
    terrno = rpcProcessReqHead(pConn, pHead);
    pConn->connType = pRecv->connType;

    // client shall send the request within tsRpcTime again, double it 
    taosTmrReset(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
  } else {
    terrno = rpcProcessRspHead(pConn, pHead);
  }

  rpcUnlockConn(pConn);

  return pConn;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
883 884 885 886 887 888 889 890 891
/*
 * Tell the server application that the connection a request arrived on is gone.
 * The callback pRpc->cfp, when set, receives a message with code
 * TSDB_CODE_RPC_NETWORK_UNAVAIL and the type of the pending request.
 */
static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
  SRpcInfo *pRpc = pConn->pRpc;
  SRpcMsg   rpcMsg;

  // if there are pending request, notify the app
  tTrace("%s, notify the server app, connection is gone", pConn->info);

  rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; 
  rpcMsg.msgType = pConn->inType;
  rpcMsg.handle = pConn;
  rpcMsg.ahandle = pConn->ahandle;
  rpcMsg.pCont = pConn->pReqMsg;     // pReqMsg is re-used to store the APP context from server
  rpcMsg.contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length

  if (pRpc->cfp != NULL) {
    (*(pRpc->cfp))(&rpcMsg, NULL);
  }
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
899
/*
 * Handle a link reported as broken by the transport layer.
 * A pending outgoing request is failed with TSDB_CODE_RPC_NETWORK_UNAVAIL through a
 * zero-delay timer running rpcProcessConnError(); a pending incoming request is reported to
 * the server application. The connection is then released. A NULL pConn is ignored.
 */
static void rpcProcessBrokenLink(SRpcConn *pConn) {
  if (pConn == NULL) return;

  SRpcInfo *pRpc = pConn->pRpc;
  tTrace("%s, link is broken", pConn->info);

  rpcLockConn(pConn);

  if (pConn->outType != 0) {
    // the error is processed from the timer, not from within this call
    pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    taosTmrStart(rpcProcessConnError, 0, pConn->pContext, pRpc->tmrCtrl);
  }

  if (pConn->inType != 0) {
    rpcReportBrokenLinkToServer(pConn); 
  }

  rpcReleaseConn(pConn);
  rpcUnlockConn(pConn);
}

918 919 920 921
/*
 * Entry point for everything delivered by the transport layer.
 *
 * A NULL pRecv->msg signals a broken link on the connection in pRecv->thandle. Otherwise the
 * header is parsed by rpcProcessMsgHead(); a message that parses cleanly is passed on to
 * rpcProcessIncomingMsg(), a request that fails parsing is answered with an error reply, and
 * the message buffer is freed here whenever the parse result is non-zero.
 *
 * Returns the connection the message was bound to (may be NULL).
 */
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
  SRpcHead  *pHead = (SRpcHead *)pRecv->msg;
  SRpcInfo  *pRpc = (SRpcInfo *)pRecv->shandle;
  SRpcConn  *pConn = (SRpcConn *)pRecv->thandle;

  tDump(pRecv->msg, pRecv->msgLen);

  // underlying UDP layer does not know it is server or client
  pRecv->connType = pRecv->connType | pRpc->connType;  

  if (pRecv->msg == NULL) {
    rpcProcessBrokenLink(pConn);
    return NULL;
  }

  terrno = 0;
  pConn = rpcProcessMsgHead(pRpc, pRecv);

  // msgType comes straight off the wire and may just have been rejected as out of range by
  // rpcProcessMsgHead(); never use it as an index into taosMsg without checking it first
  int         msgType = pHead->msgType;
  int         rspType = msgType + 1;
  const char *msgName = (msgType > 0 && msgType < TSDB_MSG_TYPE_MAX) ? taosMsg[msgType] : "invalid-msg";
  const char *rspName = (rspType > 0 && rspType < TSDB_MSG_TYPE_MAX) ? taosMsg[rspType] : "invalid-msg";

  if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) {
    tTrace("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x",
        pRpc->label, pConn, (void *)pHead->ahandle, msgName, pRecv->ip, pRecv->port, terrno, 
        pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
  }

  int32_t code = terrno;
  if (code != TSDB_CODE_RPC_ALREADY_PROCESSED) {
    if (code != 0) { // parsing error
      if (rpcIsReq(pHead->msgType)) {
        rpcSendErrorMsgToPeer(pRecv, code);
        tTrace("%s %p %p, %s is sent with error code:%x", pRpc->label, pConn, (void *)pHead->ahandle, rspName, code);
      } 
    } else { // msg is passed to app only parsing is ok 
      rpcProcessIncomingMsg(pConn, pHead);
    }
  }

  if (code) rpcFreeMsg(pRecv->msg); // parsing failed, msg shall be freed
  return pConn;
}
H
hzcheng 已提交
957

958 959 960
/*
 * Deliver the final result of a client request to the application and free the request.
 * When pContext->pRsp is set (synchronous API) the ip set and the message are copied out and
 * the semaphore is posted; otherwise (asynchronous API) the callback pRpc->cfp is invoked.
 */
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
  SRpcInfo *pRpc = pContext->pRpc;

  pContext->pConn = NULL;

  if (pContext->pRsp == NULL) {
    // for asynchronous API: the ip set is passed along only when it differs from what the
    // request started with or a redirect took place
    int        ipSetChanged = (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect);
    SRpcIpSet *pIpSet = ipSetChanged ? &pContext->ipSet : NULL;

    (*pRpc->cfp)(pMsg, pIpSet);  
  } else {
    // for synchronous API
    memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet));
    memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
    tsem_post(pContext->pSem);
  }

  // free the request message
  rpcFreeCont(pContext->pCont); 
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
980
/*
 * Hand a successfully parsed message to its consumer.
 *
 * The message is decompressed first. A request is passed to the application callback after
 * a progress timer is started. A response is matched with the pending request context and
 * then either triggers a re-send (redirect), is treated as a connection error (not ready),
 * or is delivered to the client through rpcNotifyClient().
 *
 * Ownership of the message buffer: it is passed on with the SRpcMsg given to the callback
 * or to rpcNotifyClient(). On the redirect and not-ready paths the response is consumed
 * here and never reaches the application, so it is freed here.
 */
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {

  SRpcInfo *pRpc = pConn->pRpc;
  SRpcMsg   rpcMsg;

  pHead = rpcDecompressRpcMsg(pHead);
  rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
  rpcMsg.pCont = pHead->content;
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.code = pHead->code; 
  rpcMsg.ahandle = pConn->ahandle;
   
  if ( rpcIsReq(pHead->msgType) ) {
    rpcMsg.handle = pConn;
    rpcAddRef(pRpc);  // add the refCount for requests
    pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
    (*(pRpc->cfp))(&rpcMsg, NULL);
  } else {
    // it's a response
    SRpcReqContext *pContext = pConn->pContext;
    rpcMsg.handle = pContext->ahandle;
    pConn->pContext = NULL;

    // for UDP, port may be changed by server, the port in ipSet shall be used for cache
    if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
      rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->ipSet.port[pContext->ipSet.inUse], pConn->connType);    
    } else {
      rpcCloseConn(pConn);
    }

    if (pHead->code == TSDB_CODE_RPC_REDIRECT) { 
      pContext->redirect++;
      if (pContext->redirect > TSDB_MAX_REPLICA) {
        pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; 
        // rpcMsg.code was copied before this rewrite; keep it in step so the client is told
        // the network is unavailable instead of seeing the redirect code
        rpcMsg.code = pHead->code;
        tWarn("%s, too many redirects, quit", pConn->info);
      }
    }

    if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
      pContext->numOfTry = 0;
      // NOTE(review): the ip set is copied from the wire without checking contLen or
      // numOfIps against the local array sizes - confirm the sender is trusted
      memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet));
      tTrace("%s, redirect is received, numOfIps:%d", pConn->info, pContext->ipSet.numOfIps);
      for (int i=0; i<pContext->ipSet.numOfIps; ++i) 
        pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]);
      rpcSendReqToServer(pRpc, pContext);
      rpcFreeMsg(pHead);  // response was consumed here, it is not passed to the app
    } else if (pHead->code == TSDB_CODE_RPC_NOT_READY) {
      pContext->code = pHead->code;
      rpcProcessConnError(pContext, NULL);
      rpcFreeMsg(pHead);  // response was consumed here, it is not passed to the app
    } else {
      rpcNotifyClient(pContext, &rpcMsg);
    }
  }
}

1034
/*
 * Send a header-only response carrying `code` for the request currently recorded on the
 * connection (inType/inTranId), then mark the connection as secured.
 */
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
  char      msg[RPC_MSG_OVERHEAD];  // leaves room behind the head for the auth part
  SRpcHead *pHead = (SRpcHead *)msg;

  // set msg header
  memset(pHead, 0, sizeof(SRpcHead));
  pHead->version = 1;
  pHead->msgType = pConn->inType+1;
  pHead->tranId = pConn->inTranId;
  pHead->spi = pConn->spi;
  pHead->encrypt = 0;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
  pHead->linkUid = pConn->linkUid;
  pHead->ahandle = (uint64_t)pConn->ahandle;
  pHead->code = htonl(code);
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));

  rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
  pConn->secured = 1; // connection shall be secured
}

/*
 * Send a header-only copy of the outstanding request (outType/outTranId) to the peer.
 * The code field is set to 1; a request head with a non-zero code is what the receiving
 * side's request-head processing treats as a heart beat.
 */
static void rpcSendReqHead(SRpcConn *pConn) {
  char      msg[RPC_MSG_OVERHEAD];  // leaves room behind the head for the auth part
  SRpcHead *pHead = (SRpcHead *)msg;

  // set msg header
  memset(pHead, 0, sizeof(SRpcHead));
  pHead->version = 1;
  pHead->msgType = pConn->outType;
  pHead->tranId = pConn->outTranId;
  pHead->spi = pConn->spi;
  pHead->encrypt = 0;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
  pHead->linkUid = pConn->linkUid;
  pHead->ahandle = (uint64_t)pConn->ahandle;
  pHead->code = 1;
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));

  rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
}
H
hjxilinx 已提交
1078

1079
/*
 * Answer a received message that could not be processed with an error reply.
 * The reply mirrors the identifiers of the received header (source and destination ids
 * swapped) and carries `code`. For TSDB_CODE_RPC_INVALID_TIME_STAMP the local time stamp is
 * appended as content. The reply is sent directly through the transport the message
 * arrived on, without going through a connection object.
 */
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
  char       msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t) ];
  SRpcHead  *pRecvHead = (SRpcHead *)pRecv->msg;
  SRpcHead  *pReplyHead = (SRpcHead *)msg;
  int        msgLen = sizeof(SRpcHead);

  memset(msg, 0, sizeof(SRpcHead));
  pReplyHead->version = pRecvHead->version;
  pReplyHead->msgType = (char)(pRecvHead->msgType + 1);
  pReplyHead->spi = 0;
  pReplyHead->encrypt = pRecvHead->encrypt;
  pReplyHead->tranId = pRecvHead->tranId;
  pReplyHead->linkUid = pRecvHead->linkUid;
  pReplyHead->ahandle = pRecvHead->ahandle;
  pReplyHead->code = htonl(code);

  // the reply travels in the opposite direction
  pReplyHead->sourceId = pRecvHead->destId;
  pReplyHead->destId = pRecvHead->sourceId;

  if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP) {
    // include a time stamp if client's time is not synchronized well
    uint32_t timeStamp = htonl(taosGetTimestampSec());
    memcpy(pReplyHead->content, &timeStamp, sizeof(timeStamp));
    msgLen += sizeof(timeStamp);
  }

  pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
  (*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, msg, msgLen, pRecv->chandle);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1116
/*
 * Send the request held in pContext to the end point currently selected in its ip set.
 * When no connection can be set up, the error in terrno is stored in the context and
 * rpcProcessConnError() is scheduled through a zero-delay timer. Otherwise the header in
 * front of the request content is filled in, the request is recorded on the connection as
 * the pending one, sent, and the retry timer is (re)started.
 */
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
  pContext->numOfTry++;

  SRpcConn *pConn = rpcSetupConnToServer(pContext);
  if (pConn == NULL) {
    pContext->code = terrno;
    taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
    return;
  }

  // the header lives directly in front of the content inside the same buffer
  SRpcHead  *pHead = rpcHeadFromCont(pContext->pCont);
  int        msgLen = rpcMsgLenFromCont(pContext->contLen);
  char       msgType = pContext->msgType;

  pContext->pConn = pConn;
  pConn->ahandle = pContext->ahandle;
  rpcLockConn(pConn);

  // pick the next transaction id, zero is never used
  pConn->tranId++;
  if (pConn->tranId == 0) {
    pConn->tranId++;
  }

  // set the message header  
  pHead->version = 1;
  pHead->msgType = msgType;
  pHead->encrypt = 0;
  pHead->tranId = pConn->tranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
  pHead->port = 0;
  pHead->linkUid = pConn->linkUid;
  pHead->ahandle = (uint64_t)pConn->ahandle;
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));

  // set the connection parameters
  pConn->outType = msgType;
  pConn->outTranId = pHead->tranId;
  pConn->pReqMsg = (char *)pHead;
  pConn->reqMsgLen = msgLen;
  pConn->pContext = pContext;

  rpcSendMsgToPeer(pConn, (char *)pHead, msgLen);
  taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);

  rpcUnlockConn(pConn);
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1160 1161

/*
 * Send a complete message (head + content) to the peer of the connection.
 * rpcAddAuthPart() finalises the header and may extend the message by the auth part, so the
 * buffer behind `msg` has to provide room for it. A response with code 0 marks the
 * connection as secured. A short write is logged, it is not reported to the caller.
 */
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
  SRpcHead  *pHead = (SRpcHead *)msg;

  msgLen = rpcAddAuthPart(pConn, msg, msgLen);

  // heart beat traffic is traced only when the extra debug bit is set
  int traceOn = (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16));

  if (rpcIsReq(pHead->msgType)) {
    if (traceOn) {
      tTrace("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d",
             pConn->info, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort, 
             msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
    }
  } else {
    if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured
    if (traceOn) {
      tTrace("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d",
          pConn->info, taosMsg[pHead->msgType], pConn->peerIp, pConn->peerPort, 
          htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
    }
  }

  //tTrace("connection type is: %d", pConn->connType);
  int sentLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);

  if (sentLen != msgLen) {
    tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, sentLen, strerror(errno));
  }
 
  tDump(msg, msgLen);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1190
static void rpcProcessConnError(void *param, void *id) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1191
  SRpcReqContext *pContext = (SRpcReqContext *)param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1192 1193
  SRpcInfo       *pRpc = pContext->pRpc;
  SRpcMsg         rpcMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1194
 
H
hjxilinx 已提交
1195 1196 1197 1198
  if (pRpc == NULL) {
    return;
  }
  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1199
  tTrace("%s %p, connection error happens", pRpc->label, pContext->ahandle);
H
hzcheng 已提交
1200

J
Jeff Tao 已提交
1201
  if (pContext->numOfTry >= pContext->ipSet.numOfIps) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1202 1203 1204 1205 1206
    rpcMsg.msgType = pContext->msgType+1;
    rpcMsg.handle = pContext->ahandle;
    rpcMsg.code = pContext->code;
    rpcMsg.pCont = NULL;
    rpcMsg.contLen = 0;
1207 1208

    rpcNotifyClient(pContext, &rpcMsg);
H
hzcheng 已提交
1209
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1210
    // move to next IP 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1211 1212
    pContext->ipSet.inUse++;
    pContext->ipSet.inUse = pContext->ipSet.inUse % pContext->ipSet.numOfIps;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1213
    rpcSendReqToServer(pRpc, pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1214
  }
H
hzcheng 已提交
1215 1216
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1217 1218 1219
static void rpcProcessRetryTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;
  SRpcInfo *pRpc = pConn->pRpc;
H
hzcheng 已提交
1220

1221
  rpcLockConn(pConn);
H
hzcheng 已提交
1222

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1223
  if (pConn->outType && pConn->user[0]) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1224
    tTrace("%s, expected %s is not received", pConn->info, taosMsg[(int)pConn->outType + 1]);
H
hzcheng 已提交
1225 1226 1227
    pConn->pTimer = NULL;
    pConn->retry++;

S
slguan 已提交
1228
    if (pConn->retry < 4) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1229
      tTrace("%s, re-send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1230
      rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);      
1231
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
H
hzcheng 已提交
1232 1233
    } else {
      // close the connection
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1234
      tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
1235 1236
      if (pConn->pContext) {
        pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1237
        taosTmrStart(rpcProcessConnError, 0, pConn->pContext, pRpc->tmrCtrl);
1238 1239
        rpcReleaseConn(pConn);
      }
H
hzcheng 已提交
1240
    }
1241
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1242
    tTrace("%s, retry timer not processed", pConn->info);
H
hzcheng 已提交
1243 1244
  }

1245
  rpcUnlockConn(pConn);
H
hzcheng 已提交
1246 1247
}

1248 1249 1250
static void rpcProcessIdleTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;

1251 1252
  rpcLockConn(pConn);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1253
  if (pConn->user[0]) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1254
    tTrace("%s, close the connection since no activity", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1255
    if (pConn->inType) rpcReportBrokenLinkToServer(pConn); 
1256
    rpcReleaseConn(pConn);
1257
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1258
    tTrace("%s, idle timer:%p not processed", pConn->info, tmrId);
1259
  }
1260 1261

  rpcUnlockConn(pConn);
1262 1263 1264 1265 1266 1267
}

static void rpcProcessProgressTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;
  SRpcInfo *pRpc = pConn->pRpc;

1268
  rpcLockConn(pConn);
1269

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1270
  if (pConn->inType && pConn->user[0]) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1271
    tTrace("%s, progress timer expired, send progress", pConn->info);
1272
    rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
1273
    pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
1274
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1275
    tTrace("%s, progress timer:%p not processed", pConn->info, tmrId);
1276 1277
  }

1278
  rpcUnlockConn(pConn);
1279 1280 1281
}

static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1282 1283 1284
  SRpcHead  *pHead = rpcHeadFromCont(pCont);
  int32_t    finalLen = 0;
  int        overhead = sizeof(SRpcComp);
1285 1286 1287 1288 1289
  
  if (!NEEDTO_COMPRESSS_MSG(contLen)) {
    return contLen;
  }
  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1290
  char *buf = malloc (contLen + overhead + 8);  // 8 extra bytes
1291
  if (buf == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1292
    tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
1293 1294 1295 1296 1297 1298 1299 1300 1301 1302
    return contLen;
  }
  
  int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
  
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
  if (compLen < contLen - overhead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1303 1304 1305
    SRpcComp *pComp = (SRpcComp *)pCont;
    pComp->reserved = 0; 
    pComp->contLen = htonl(contLen); 
1306 1307
    memcpy(pCont + overhead, buf, compLen);
    
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1308
    pHead->comp = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1309
    //tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen);
1310 1311 1312 1313 1314 1315 1316 1317 1318
    finalLen = compLen + overhead;
  } else {
    finalLen = contLen;
  }

  free(buf);
  return finalLen;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1319
/*
 * Expand a message whose head has the comp flag set.
 *
 * A new buffer is allocated with room for an SRpcReqContext in front of the head, the
 * content is decompressed into it, the head is copied over with msgLen adjusted, and the
 * compressed buffer is released with rpcFreeMsg().
 *
 * Returns the head of the message to use from now on: the new head after a successful
 * decompression, otherwise pHead itself (message not compressed, or no memory available).
 */
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
  int overhead = sizeof(SRpcComp);
  uint8_t    *pCont = pHead->content;
  SRpcComp   *pComp = (SRpcComp *)pHead->content;

  if (!pHead->comp) {
    return pHead;
  }

  // decompress the content
  assert(pComp->reserved == 0);
  int contLen = htonl(pComp->contLen);

  // prepare the temporary buffer to decompress message
  char *temp = (char *)malloc(contLen + RPC_MSG_OVERHEAD);
  if (temp == NULL) {
    // the result of malloc itself has to be tested: a pointer derived from it by adding an
    // offset is not NULL even when the allocation failed
    tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
    return pHead;
  }

  SRpcHead *pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext

  int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
  int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char *)pNewHead->content, compLen, contLen);
  assert(origLen == contLen);

  memcpy(pNewHead, pHead, sizeof(SRpcHead));
  pNewHead->msgLen = rpcMsgLenFromCont(origLen);
  rpcFreeMsg(pHead); // free the compressed message buffer
  //tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen);

  return pNewHead;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1352
static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) {
H
hzcheng 已提交
1353 1354 1355 1356
  MD5_CTX context;
  int     ret = -1;

  MD5Init(&context);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1357 1358 1359
  MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
  MD5Update(&context, (uint8_t *)pMsg, msgLen);
  MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
H
hzcheng 已提交
1360 1361 1362 1363 1364 1365 1366
  MD5Final(&context);

  if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;

  return ret;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1367
static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) {
H
hzcheng 已提交
1368 1369 1370
  MD5_CTX context;

  MD5Init(&context);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1371
  MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
H
hzcheng 已提交
1372
  MD5Update(&context, (uint8_t *)pMsg, msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1373
  MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
H
hzcheng 已提交
1374 1375 1376 1377
  MD5Final(&context);

  memcpy(pAuth, context.digest, sizeof(context.digest));
}
1378 1379

// Finalize the header of an outgoing message and, if required, append the
// authentication digest.
//
// pConn:  connection the message is sent on; its spi, secured flag and secret
//         decide whether and how the message is signed.
// msg:    buffer starting with the SRpcHead. When a digest is appended it is
//         written at msg + msgLen, so the buffer must have room for one
//         SRpcDigest past the current message.
// msgLen: current length of the message in bytes.
//
// Returns the final message length (msgLen, plus sizeof(SRpcDigest) when the
// digest was appended). pHead->msgLen is stored byte-swapped with htonl().
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
  SRpcHead *pHead = (SRpcHead *)msg;

  // no spi configured, or the link is already marked secured: send unsigned
  if (!(pConn->spi && pConn->secured == 0)) {
    pHead->spi = 0;
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
    return msgLen;
  }

  // add auth part
  SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen);
  int         signedLen = msgLen + sizeof(SRpcDigest);

  // every header/digest field must be in place before hashing, since the
  // hash covers everything except the trailing TSDB_AUTH_LEN bytes
  pHead->spi = pConn->spi;
  pDigest->timeStamp = htonl(taosGetTimestampSec());
  pHead->msgLen = (int32_t)htonl((uint32_t)signedLen);
  rpcBuildAuthHead(pHead, signedLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);

  return signedLen;
}

// Check the authentication of a received message.
//
// pConn:  connection the message arrived on.
// msg:    buffer starting with the SRpcHead; when the message is signed, an
//         SRpcDigest occupies its last sizeof(SRpcDigest) bytes.
// msgLen: total number of bytes in msg.
//
// Returns 0 when the message is accepted; pHead->msgLen has then been
// byte-swapped with htonl() and, for a verified message, reduced by the size
// of the digest. Otherwise returns TSDB_CODE_RPC_INVALID_TIME_STAMP,
// TSDB_CODE_RPC_AUTH_FAILURE or TSDB_CODE_RPC_AUTH_REQUIRED, and
// pHead->msgLen is left as received.
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
  SRpcHead *pHead = (SRpcHead *)msg;

  // unsigned message on a secured link, or no authentication configured at all
  if (pHead->spi == 0 && (pConn->secured || pConn->spi == 0)) {
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
    return 0;
  }

  if (!rpcIsReq(pHead->msgType)) {
    // for response, if code is auth failure, it shall bypass the auth process
    int rspCode = htonl(pHead->code);
    int bypass = (rspCode == TSDB_CODE_RPC_INVALID_TIME_STAMP || rspCode == TSDB_CODE_RPC_AUTH_FAILURE ||
                  rspCode == TSDB_CODE_RPC_AUTH_REQUIRED || rspCode == TSDB_CODE_MND_INVALID_USER ||
                  rspCode == TSDB_CODE_RPC_NOT_READY);
    if (bypass) {
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
      return 0;
    }
  }

  if (pHead->spi != pConn->spi) {
    tTrace("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
    return pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
  }

  // authentication: the digest sits at the very end of the message
  SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest));

  // reject messages whose timestamp is more than 900 seconds away from local time
  int32_t delta = (int32_t)htonl(pDigest->timeStamp) - (int32_t)taosGetTimestampSec();
  if (abs(delta) > 900) {
    tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
    return TSDB_CODE_RPC_INVALID_TIME_STAMP;
  }

  if (rpcAuthenticateMsg(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
    tTrace("%s, authentication failed, msg discarded", pConn->info);
    return TSDB_CODE_RPC_AUTH_FAILURE;
  }

  // message is authenticated: strip the digest from the reported length
  pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
  if (!rpcIsReq(pHead->msgType)) pConn->secured = 1;  // link is secured for client

  return 0;
}

1449 1450
// Acquire the connection's spin lock for the calling thread.
//
// Spins on a compare-and-swap that replaces 0 in pConn->lockedBy with the
// caller's thread id, yielding the CPU on every 1000th failed attempt.
static void rpcLockConn(SRpcConn *pConn) {
  int64_t self = taosGetPthreadId();
  int     attempts = 0;

  for (;;) {
    if (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, self) == 0) break;

    ++attempts;
    if (attempts % 1000 == 0) {
      sched_yield();
    }
  }
}

// Release the connection's spin lock.
//
// Swaps pConn->lockedBy from the caller's thread id back to 0; asserts if the
// lock was not held by the calling thread.
static void rpcUnlockConn(SRpcConn *pConn) {
  int64_t self = taosGetPthreadId();
  int64_t owner = atomic_val_compare_exchange_64(&(pConn->lockedBy), self, 0);

  if (owner != self) {
    assert(false);
  }
}
1465

1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485
// Take one more reference on the RPC instance (atomic increment of refCount).
static void rpcAddRef(SRpcInfo *pRpc) {
  atomic_add_fetch_8(&pRpc->refCount, 1);
}

// Drop one reference on the RPC instance.
//
// When the atomic decrement brings refCount to zero, all resources owned by
// the instance are released (hash, timer controller, id pool, connection
// cache, connection list, mutex) and finally pRpc itself.
static void rpcDecRef(SRpcInfo *pRpc) {
  // other holders remain: nothing to release yet
  if (atomic_sub_fetch_8(&pRpc->refCount, 1) != 0) return;

  taosHashCleanup(pRpc->hash);
  taosTmrCleanUp(pRpc->tmrCtrl);
  taosIdPoolCleanUp(pRpc->idPool);
  rpcCloseConnCache(pRpc->pCache);

  tfree(pRpc->connList);
  pthread_mutex_destroy(&pRpc->mutex);

  // log before freeing pRpc, since the label lives inside it
  tTrace("%s rpc resources are released", pRpc->label);
  tfree(pRpc);
}