rpcMain.c 50.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18 19 20 21 22
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
H
hjxilinx 已提交
23
#include "lz4.h"
S
slguan 已提交
24
#include "taoserror.h"
25
#include "tsocket.h"
S
slguan 已提交
26
#include "tglobal.h"
27
#include "taosmsg.h"
S
slguan 已提交
28 29 30
#include "trpc.h"
#include "hash.h"
#include "rpcLog.h"
31 32
#include "rpcUdp.h"
#include "rpcCache.h"
J
Jeff Tao 已提交
33
#include "rpcTcp.h"
34
#include "rpcHead.h"
H
hzcheng 已提交
35

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
36 37 38 39 40
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) 
#define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
41
#define rpcIsReq(type) (type & 1U)
H
hzcheng 已提交
42 43

typedef struct {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
44 45 46
  int      sessions;     // number of sessions allowed
  int      numOfThreads; // number of threads to process incoming messages
  int      idleTime;     // milliseconds;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
47
  uint16_t localPort;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
48 49
  int8_t   connType;
  int      index;        // for UDP server only, round robin for multiple threads
50
  char     label[TSDB_LABEL_LEN];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
51

52 53 54
  char     user[TSDB_UNI_LEN];   // meter ID
  char     spi;                  // security parameter index
  char     encrypt;              // encrypt algorithm
55 56
  char     secret[TSDB_KEY_LEN]; // secret for the link
  char     ckey[TSDB_KEY_LEN];   // ciphering key 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
57

58
  void   (*cfp)(SRpcMsg *, SRpcEpSet *);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
59
  int    (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
60

61
  int32_t   refCount;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
62 63
  void     *idPool;   // handle to ID pool
  void     *tmrCtrl;  // handle to timer
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
64
  SHashObj *hash;     // handle returned by hash utility
65 66
  void     *tcphandle;// returned handle from TCP initialization
  void     *udphandle;// returned handle from UDP initialization
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
67
  void     *pCache;   // connection cache
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
68
  pthread_mutex_t  mutex;
J
Jeff Tao 已提交
69
  struct SRpcConn *connList;  // connection list
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
70 71 72
} SRpcInfo;

typedef struct {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
73
  SRpcInfo *pRpc;       // associated SRpcInfo
74
  SRpcEpSet epSet;      // ip list provided by app
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
75
  void     *ahandle;    // handle provided by app
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
76
  void     *signature;  // for validation
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
77
  struct SRpcConn *pConn; // pConn allocated
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
78
  char      msgType;    // message type
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
79 80
  uint8_t  *pCont;      // content provided by app
  int32_t   contLen;    // content length
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
81
  int32_t   code;       // error code
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
82
  int16_t   numOfTry;   // number of try for different servers
83
  int8_t    oldInUse;   // server EP inUse passed by app
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
84
  int8_t    redirect;   // flag to indicate redirect
85
  int8_t    connType;   // connection type
86 87
  SRpcMsg  *pRsp;       // for synchronous API
  tsem_t   *pSem;       // for synchronous API
88
  SRpcEpSet *pSet;      // for synchronous API 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
89
  char      msg[0];     // RpcHead starts from here
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
90 91
} SRpcReqContext;

J
Jeff Tao 已提交
92
typedef struct SRpcConn {
93
  char      info[48];// debug info: label + pConn + ahandle
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
94 95 96
  int       sid;     // session ID
  uint32_t  ownId;   // own link ID
  uint32_t  peerId;  // peer link ID
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
97
  char      user[TSDB_UNI_LEN]; // user ID for the link
98 99
  char      spi;     // security parameter index
  char      encrypt; // encryption, 0:1 
100 101
  char      secret[TSDB_KEY_LEN]; // secret for the link
  char      ckey[TSDB_KEY_LEN];   // ciphering key 
102
  char      secured;              // if set to 1, no authentication
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
103
  uint16_t  localPort;      // for UDP only
104
  uint32_t  linkUid;        // connection unique ID assigned by client
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
105 106
  uint32_t  peerIp;         // peer IP
  uint16_t  peerPort;       // peer port
J
jtao1735 已提交
107
  char      peerFqdn[TSDB_FQDN_LEN]; // peer FQDN or ip string
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
108 109
  uint16_t  tranId;         // outgoing transcation ID, for build message
  uint16_t  outTranId;      // outgoing transcation ID
110 111
  uint16_t  inTranId;       // transcation ID for incoming msg
  uint8_t   outType;        // message type for outgoing request
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
112
  uint8_t   inType;         // message type for incoming request  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
113 114
  void     *chandle;  // handle passed by TCP/UDP connection layer
  void     *ahandle;  // handle provided by upper app layter
115
  int       retry;    // number of retry for sending request
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
116
  int       tretry;   // total retry
117 118 119 120 121 122 123
  void     *pTimer;   // retry timer to monitor the response
  void     *pIdleTimer; // idle timer
  char     *pRspMsg;    // response message including header
  int       rspMsgLen;  // response messag length
  char     *pReqMsg;    // request message including header
  int       reqMsgLen;  // request message length
  SRpcInfo *pRpc;       // the associated SRpcInfo
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
124
  int8_t    connType;   // connection type
125
  int64_t   lockedBy;   // lock for connection
126
  SRpcReqContext *pContext; // request context
H
hzcheng 已提交
127 128
} SRpcConn;

S
slguan 已提交
129
int tsRpcMaxUdpSize = 15000;  // bytes
130
int tsProgressTimer = 100;
H
hzcheng 已提交
131 132 133
// not configurable
int tsRpcMaxRetry;
int tsRpcHeadSize;
134
int tsRpcOverhead;
H
hzcheng 已提交
135

136 137 138 139 140 141 142
// server:0 client:1  tcp:2 udp:0
#define RPC_CONN_UDPS   0
#define RPC_CONN_UDPC   1
#define RPC_CONN_TCPS   2
#define RPC_CONN_TCPC   3
#define RPC_CONN_TCP    2

J
jtao1735 已提交
143
void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
144 145
    taosInitUdpConnection,
    taosInitUdpConnection,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
146 147 148
    taosInitTcpServer, 
    taosInitTcpClient
};
H
hzcheng 已提交
149

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
150
void (*taosCleanUpConn[])(void *thandle) = {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
151 152
    taosCleanUpUdpConnection, 
    taosCleanUpUdpConnection, 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
153 154 155
    taosCleanUpTcpServer,
    taosCleanUpTcpClient
};
H
hzcheng 已提交
156

157
void (*taosStopConn[])(void *thandle) = {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
158 159
    taosStopUdpConnection, 
    taosStopUdpConnection, 
160
    taosStopTcpServer,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
161
    taosStopTcpClient,
162 163
};

164
int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
165 166
    taosSendUdpData, 
    taosSendUdpData, 
J
Jeff Tao 已提交
167 168
    taosSendTcpData, 
    taosSendTcpData
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
169
};
H
hzcheng 已提交
170

J
jtao1735 已提交
171
void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = {
S
slguan 已提交
172 173 174 175
    taosOpenUdpConnection,
    taosOpenUdpConnection,
    NULL,
    taosOpenTcpClientConnection,
H
hzcheng 已提交
176 177
};

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
178 179 180
void (*taosCloseConn[])(void *chandle) = {
    NULL, 
    NULL, 
J
Jeff Tao 已提交
181 182
    taosCloseTcpConnection, 
    taosCloseTcpConnection
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
183 184
};

J
jtao1735 已提交
185
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
186
static void      rpcCloseConn(void *thandle);
187
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext);
188
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
189 190
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
H
hzcheng 已提交
191

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
192
static void  rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
193
static void  rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
194
static void  rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
195
static void  rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
196
static void  rpcSendReqHead(SRpcConn *pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
197

198
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
199
static void  rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead);
200 201 202
static void  rpcProcessConnError(void *param, void *id);
static void  rpcProcessRetryTimer(void *, void *);
static void  rpcProcessIdleTimer(void *param, void *tmrId);
203
static void  rpcProcessProgressTimer(void *param, void *tmrId);
H
hzcheng 已提交
204

205
static void  rpcFreeMsg(void *msg);
206
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
207
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead);
208 209
static int   rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
static int   rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
210 211
static void  rpcLockConn(SRpcConn *pConn);
static void  rpcUnlockConn(SRpcConn *pConn);
212 213
static void  rpcAddRef(SRpcInfo *pRpc);
static void  rpcDecRef(SRpcInfo *pRpc);
H
hzcheng 已提交
214

J
Jeff Tao 已提交
215
void *rpcOpen(const SRpcInit *pInit) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
216
  SRpcInfo *pRpc;
H
hzcheng 已提交
217

218 219
  tsProgressTimer = tsRpcTimer/2; 
  tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
220
  tsRpcHeadSize = RPC_MSG_OVERHEAD; 
221
  tsRpcOverhead = sizeof(SRpcReqContext);
H
hzcheng 已提交
222

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
223 224
  pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
  if (pRpc == NULL) return NULL;
H
hzcheng 已提交
225

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
226
  if(pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
227
  pRpc->connType = pInit->connType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
228
  pRpc->idleTime = pInit->idleTime;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229
  pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
230 231
  pRpc->localPort = pInit->localPort;
  pRpc->afp = pInit->afp;
J
jtao1735 已提交
232
  pRpc->sessions = pInit->sessions+1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
233 234 235
  if (pInit->user) tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
  if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret));
  if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
236
  pRpc->spi = pInit->spi;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
237
  pRpc->cfp = pInit->cfp;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
238
  pRpc->afp = pInit->afp;
239
  pRpc->refCount = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
240

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
241
  size_t size = sizeof(SRpcConn) * pRpc->sessions;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
242 243
  pRpc->connList = (SRpcConn *)calloc(1, size);
  if (pRpc->connList == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244
    tError("%s failed to allocate memory for taos connections, size:%ld", pRpc->label, size);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
245
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
246 247
    return NULL;
  }
H
hzcheng 已提交
248

J
jtao1735 已提交
249
  pRpc->idPool = taosInitIdPool(pRpc->sessions-1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
250 251
  if (pRpc->idPool == NULL) {
    tError("%s failed to init ID pool", pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
252
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
253
    return NULL;
H
hzcheng 已提交
254
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
255

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
256
  pRpc->tmrCtrl = taosTmrInit(pRpc->sessions*2 + 1, 50, 10000, pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
257 258
  if (pRpc->tmrCtrl == NULL) {
    tError("%s failed to init timers", pRpc->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
259
    rpcClose(pRpc);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
260 261
    return NULL;
  }
H
hzcheng 已提交
262

263
  if (pRpc->connType == TAOS_CONN_SERVER) {
J
jtao1735 已提交
264
    pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
265 266 267 268 269 270
    if (pRpc->hash == NULL) {
      tError("%s failed to init string hash", pRpc->label);
      rpcClose(pRpc);
      return NULL;
    }
  } else {
271
    pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime); 
272 273 274 275 276
    if ( pRpc->pCache == NULL ) {
      tError("%s failed to init connection cache", pRpc->label);
      rpcClose(pRpc);
      return NULL;
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
277
  }
H
hzcheng 已提交
278

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
279 280
  pthread_mutex_init(&pRpc->mutex, NULL);

281 282 283 284 285 286 287 288 289 290 291
  pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, 
                                                  pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);
  pRpc->udphandle = (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, 
                                                  pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);

  if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) {
    tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort);
    rpcClose(pRpc);
    return NULL;
  }

292
  tDebug("%s rpc is openned, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions);
H
hzcheng 已提交
293

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
294
  return pRpc;
H
hzcheng 已提交
295 296
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
297 298
void rpcClose(void *param) {
  SRpcInfo *pRpc = (SRpcInfo *)param;
H
hzcheng 已提交
299

300
  // stop connection to outside first
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
301 302
  (*taosStopConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
  (*taosStopConn[pRpc->connType])(pRpc->udphandle);
303 304

  // close all connections 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
305
  for (int i = 0; i < pRpc->sessions; ++i) {
306
    if (pRpc->connList && pRpc->connList[i].user[0]) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
307
      rpcCloseConn((void *)(pRpc->connList + i));
H
hzcheng 已提交
308 309 310
    }
  }

311
  // clean up
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
312 313
  (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
  (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);
J
Jeff Tao 已提交
314

315
  tDebug("%s rpc is closed", pRpc->label);
316
  rpcDecRef(pRpc);
H
hzcheng 已提交
317 318
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
319 320
void *rpcMallocCont(int contLen) {
  int size = contLen + RPC_MSG_OVERHEAD;
321

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
322 323
  char *start = (char *)calloc(1, (size_t)size);
  if (start == NULL) {
324 325 326 327
    tError("failed to malloc msg, size:%d", size);
    return NULL;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
328
  return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
329 330 331
}

void rpcFreeCont(void *cont) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
332
  if ( cont ) {
333 334
    char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext);
    free(temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
335
    // tTrace("free mem: %p", temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
336
  }
337 338
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
339 340 341 342 343 344
void *rpcReallocCont(void *ptr, int contLen) {
  if (ptr == NULL) return rpcMallocCont(contLen);

  char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead);
  if (contLen == 0 ) {
    free(start); 
J
Jeff Tao 已提交
345
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
346 347 348 349 350 351 352 353 354 355 356 357
  }

  int size = contLen + RPC_MSG_OVERHEAD;
  start = realloc(start, size);
  if (start == NULL) {
    tError("failed to realloc cont, size:%d", size);
    return NULL;
  } 

  return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}

358
void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
359 360 361
  SRpcInfo       *pRpc = (SRpcInfo *)shandle;
  SRpcReqContext *pContext;

J
Jeff Tao 已提交
362
  int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
363
  pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
364
  pContext->ahandle = pMsg->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
365
  pContext->signature = pContext;
366
  pContext->pRpc = (SRpcInfo *)shandle;
367
  pContext->epSet = *pEpSet;
J
Jeff Tao 已提交
368
  pContext->contLen = contLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
369 370
  pContext->pCont = pMsg->pCont;
  pContext->msgType = pMsg->msgType;
371
  pContext->oldInUse = pEpSet->inUse;
372

373
  pContext->connType = RPC_CONN_UDPC; 
J
Jeff Tao 已提交
374
  if (contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC;
375 376 377

  // connection type is application specific. 
  // for TDengine, all the query, show commands shall have TCP connection
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
378
  char type = pMsg->msgType;
B
Bomin Zhang 已提交
379 380 381 382
  if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE
    || type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP
    || type == TSDB_MSG_TYPE_CM_TABLES_META || type == TSDB_MSG_TYPE_CM_TABLE_META
    || type == TSDB_MSG_TYPE_CM_SHOW )
383 384
    pContext->connType = RPC_CONN_TCPC;
  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
385
  // set the handle to pContext, so app can cancel the request
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
386
  if (pMsg->handle) *((void **)pMsg->handle) = pContext;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
387

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
388
  rpcSendReqToServer(pRpc, pContext);
389

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
390
  return;
391 392
}

J
Jeff Tao 已提交
393
void rpcSendResponse(const SRpcMsg *pRsp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
394
  int        msgLen = 0;
J
Jeff Tao 已提交
395 396 397
  SRpcConn  *pConn = (SRpcConn *)pRsp->handle;
  SRpcMsg    rpcMsg = *pRsp;
  SRpcMsg   *pMsg = &rpcMsg;
398
  SRpcInfo  *pRpc = pConn->pRpc;
J
Jeff Tao 已提交
399

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
400 401 402
  if ( pMsg->pCont == NULL ) {
    pMsg->pCont = rpcMallocCont(0);
    pMsg->contLen = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
403 404
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
405
  SRpcHead  *pHead = rpcHeadFromCont(pMsg->pCont);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
406 407
  char      *msg = (char *)pHead;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
408 409
  pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
  msgLen = rpcMsgLenFromCont(pMsg->contLen);
410

411
  rpcLockConn(pConn);
412

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
413
  if ( pConn->inType == 0 || pConn->user[0] == 0 ) {
414
    tDebug("%s, connection is already released, rsp wont be sent", pConn->info);
415
    rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
416 417
    rpcFreeCont(pMsg->pCont);
    rpcDecRef(pRpc);
418 419 420
    return;
  }

421
  // set msg header
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
422 423
  pHead->version = 1;
  pHead->msgType = pConn->inType+1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
424 425
  pHead->spi = pConn->spi;
  pHead->encrypt = pConn->encrypt;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
426 427 428
  pHead->tranId = pConn->inTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
429
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
430
  pHead->port = htons(pConn->localPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
431
  pHead->code = htonl(pMsg->code);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
432 433
  pHead->ahandle = (uint64_t) pConn->ahandle;
 
434 435
  // set pConn parameters
  pConn->inType = 0;
436 437

  // response message is released until new response is sent
438
  rpcFreeMsg(pConn->pRspMsg); 
439 440
  pConn->pRspMsg = msg;
  pConn->rspMsgLen = msgLen;
441
  if (pMsg->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) pConn->inTranId--;
442

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
443
  // stop the progress timer
444
  taosTmrStopA(&pConn->pTimer);
445 446 447

  // set the idle timer to monitor the activity
  taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
448
  rpcSendMsgToPeer(pConn, msg, msgLen);
449 450 451 452

  // if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured
  if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY)
    pConn->secured = 1; // connection shall be secured
453 454

  if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
455 456
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
457

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
458
  rpcUnlockConn(pConn);
459
  rpcDecRef(pRpc);    // decrease the referene count
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
460

461 462 463
  return;
}

464
void rpcSendRedirectRsp(void *thandle, const SRpcEpSet *pEpSet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
465 466
  SRpcMsg  rpcMsg; 
  memset(&rpcMsg, 0, sizeof(rpcMsg));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
467
  
468
  rpcMsg.contLen = sizeof(SRpcEpSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
469 470
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  if (rpcMsg.pCont == NULL) return;
471

472
  memcpy(rpcMsg.pCont, pEpSet, sizeof(SRpcEpSet));
473

474
  rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
475
  rpcMsg.handle = thandle;
476

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
477
  rpcSendResponse(&rpcMsg);
478 479 480 481

  return;
}

482
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
483
  SRpcConn  *pConn = (SRpcConn *)thandle;
484
  if (pConn->user[0] == 0) return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
485

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
486 487
  pInfo->clientIp = pConn->peerIp;
  pInfo->clientPort = pConn->peerPort;
J
jtao1735 已提交
488
  // pInfo->serverIp = pConn->destIp;
S
slguan 已提交
489

B
Bomin Zhang 已提交
490
  tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
491
  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
492 493
}

494
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
495 496 497 498 499 500 501 502 503
  SRpcReqContext *pContext;
  pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));

  memset(pRsp, 0, sizeof(SRpcMsg));
  
  tsem_t   sem;        
  tsem_init(&sem, 0, 0);
  pContext->pSem = &sem;
  pContext->pRsp = pRsp;
504
  pContext->pSet = pEpSet;
505

506
  rpcSendRequest(shandle, pEpSet, pMsg);
507 508 509 510 511 512 513

  tsem_wait(&sem);
  tsem_destroy(&sem);

  return;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
514
// this API is used by server app to keep an APP context in case connection is broken
515
int rpcReportProgress(void *handle, char *pCont, int contLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
516
  SRpcConn *pConn = (SRpcConn *)handle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
517
  int code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
518

519
  rpcLockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
520

521 522 523 524
  if (pConn->user[0]) {
    // pReqMsg and reqMsgLen is re-used to store the context from app server
    pConn->pReqMsg = pCont;     
    pConn->reqMsgLen = contLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
525
  } else {
526
    tDebug("%s, rpc connection is already released", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
527 528 529
    rpcFreeCont(pCont);
    code = -1;
  }
530

531
  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
532
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
533 534
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
535 536 537
void rpcCancelRequest(void *handle) {
  SRpcReqContext *pContext = handle;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
538 539 540 541
  // signature is used to check if pContext is freed. 
  // pContext may have been released just before app calls the rpcCancelRequest 
  if (pContext->signature != pContext) return;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
542
  if (pContext->pConn) {
543
    tDebug("%s, app trys to cancel request", pContext->pConn->info);
544
    pContext->pConn->pReqMsg = NULL;  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
545 546 547 548 549 550
    rpcCloseConn(pContext->pConn);
    pContext->pConn = NULL;
    rpcFreeCont(pContext->pCont);
  }
}

551 552 553 554
static void rpcFreeMsg(void *msg) {
  if ( msg ) {
    char *temp = (char *)msg - sizeof(SRpcReqContext);
    free(temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
555
    // tTrace("free mem: %p", temp);
556 557 558
  }
}

J
jtao1735 已提交
559
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
560
  SRpcConn *pConn;
S
slguan 已提交
561

J
jtao1735 已提交
562
  uint32_t peerIp = taosGetIpFromFqdn(peerFqdn);
563
  if (peerIp == 0xFFFFFFFF) {
J
jtao1735 已提交
564
    tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); 
565
    terrno = TSDB_CODE_RPC_APP_ERROR; 
J
jtao1735 已提交
566 567 568
    return NULL;
  }

569
  pConn = rpcAllocateClientConn(pRpc); 
H
hzcheng 已提交
570

571
  if (pConn) { 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
572
    tstrncpy(pConn->peerFqdn, peerFqdn, sizeof(pConn->peerFqdn));
J
jtao1735 已提交
573
    pConn->peerIp = peerIp;
574
    pConn->peerPort = peerPort;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
575
    tstrncpy(pConn->user, pRpc->user, sizeof(pConn->user));
576
    pConn->connType = connType;
577

578 579
    if (taosOpenConn[connType]) {
      void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle;
J
jtao1735 已提交
580
      pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
581
      if (pConn->chandle == NULL) {
582
        terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
583 584 585
        rpcCloseConn(pConn);
        pConn = NULL;
      }
H
hzcheng 已提交
586 587 588
    }
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
589
  return pConn;
H
hzcheng 已提交
590 591
}

592
static void rpcReleaseConn(SRpcConn *pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
593
  SRpcInfo *pRpc = pConn->pRpc;
594
  if (pConn->user[0] == 0) return;
H
hzcheng 已提交
595

596 597
  pConn->user[0] = 0;
  if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle);
H
hzcheng 已提交
598

599 600 601 602 603
  taosTmrStopA(&pConn->pTimer);
  taosTmrStopA(&pConn->pIdleTimer);

  if ( pRpc->connType == TAOS_CONN_SERVER) {
    char hashstr[40] = {0};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
604
    size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType);
J
jtao1735 已提交
605
    taosHashRemove(pRpc->hash, hashstr, size);
606
    rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
607
    pConn->pRspMsg = NULL;
608 609 610
  
    // if server has ever reported progress, free content
    if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);  // do not use rpcFreeMsg
611
  } else {
612
    // if there is an outgoing message, free it
613
    if (pConn->outType && pConn->pReqMsg) 
614
      rpcFreeMsg(pConn->pReqMsg);
615
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
616

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
617 618 619 620 621 622
  // memset could not be used, since lockeBy can not be reset
  pConn->inType = 0;
  pConn->outType = 0;
  pConn->inTranId = 0;
  pConn->outTranId = 0;
  pConn->secured = 0;
623
  pConn->peerId = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
624 625
  pConn->peerIp = 0;
  pConn->peerPort = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
626 627 628
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
  pConn->pContext = NULL;
629
  pConn->chandle = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
630 631

  taosFreeId(pRpc->idPool, pConn->sid);
632
  tDebug("%s, rpc connection is released", pConn->info);
633 634 635 636 637 638 639
}

static void rpcCloseConn(void *thandle) {
  SRpcConn *pConn = (SRpcConn *)thandle;

  rpcLockConn(pConn);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
640 641
  if (pConn->user[0])
    rpcReleaseConn(pConn);
642

643
  rpcUnlockConn(pConn);
H
hzcheng 已提交
644 645
}

646 647
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
  SRpcConn *pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
648

649 650 651
  int sid = taosAllocateId(pRpc->idPool);
  if (sid <= 0) {
    tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
652
    terrno = TSDB_CODE_RPC_MAX_SESSIONS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
653
  } else {
654 655 656 657
    pConn = pRpc->connList + sid;

    pConn->pRpc = pRpc;
    pConn->sid = sid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
658
    pConn->tranId = (uint16_t)(random() & 0xFFFF);
659
    pConn->ownId = htonl(pConn->sid);
660
    pConn->linkUid = (uint32_t)((int64_t)pConn + (int64_t)getpid() + (int64_t)pConn->tranId);
661 662 663
    pConn->spi = pRpc->spi;
    pConn->encrypt = pRpc->encrypt;
    if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN);
664
    tDebug("%s %p client connection is allocated, uid:0x%x", pRpc->label, pConn, pConn->linkUid);
665
  }
H
hzcheng 已提交
666

667 668
  return pConn;
}
H
hzcheng 已提交
669

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
670
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
671
  SRpcConn *pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
672
  char      hashstr[40] = {0};
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
673 674
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
675
  size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
676
 
677
  // check if it is already allocated
J
jtao1735 已提交
678
  SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
679
  if (ppConn) pConn = *ppConn;
680 681 682 683
  if (pConn) {
    pConn->secured = 0;
    return pConn;
  }
684

685 686 687 688 689 690
  // if code is not 0, it means it is simple reqhead, just ignore
  if (pHead->code != 0) {
    terrno = TSDB_CODE_RPC_ALREADY_PROCESSED;
    return NULL;
  }

691 692 693
  int sid = taosAllocateId(pRpc->idPool);
  if (sid <= 0) {
    tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
694
    terrno = TSDB_CODE_RPC_MAX_SESSIONS;
695 696
  } else {
    pConn = pRpc->connList + sid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
697
    memcpy(pConn->user, pHead->user, tListLen(pConn->user));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
698
    pConn->pRpc = pRpc;
H
hzcheng 已提交
699 700
    pConn->sid = sid;
    pConn->tranId = (uint16_t)(rand() & 0xFFFF);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
701
    pConn->ownId = htonl(pConn->sid);
702
    pConn->linkUid = pHead->linkUid;
703
    if (pRpc->afp) {
704
      if (pConn->user[0] == 0) {
705
        terrno = TSDB_CODE_RPC_AUTH_REQUIRED;
706 707 708 709
      } else {
        terrno = (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
      }

710 711 712 713
      if (terrno != 0) {
        taosFreeId(pRpc->idPool, sid);   // sid shall be released
        pConn = NULL;
      }
H
hzcheng 已提交
714
    }
715
  }      
H
hzcheng 已提交
716

717
  if (pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
718 719 720 721 722
    if (pRecv->connType == RPC_CONN_UDPS && pRpc->numOfThreads > 1) {
      // UDP server, assign to new connection
      pRpc->index = (pRpc->index+1) % pRpc->numOfThreads;
      pConn->localPort = (pRpc->localPort + pRpc->index);
    }
H
hjxilinx 已提交
723
  
J
jtao1735 已提交
724
    taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
725
    tDebug("%s %p server connection is allocated, uid:0x%x", pRpc->label, pConn, pConn->linkUid);
726 727 728 729 730
  }

  return pConn;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
731
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
732
  SRpcConn *pConn = NULL;  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
733
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
734 735 736

  if (sid) {
    pConn = pRpc->connList + sid;
737
    if (pConn->user[0] == 0) pConn = NULL;
738 739
  } 

740 741 742 743
  if (pConn == NULL) { 
    if (pRpc->connType == TAOS_CONN_SERVER) {
      pConn = rpcAllocateServerConn(pRpc, pRecv);
    } else {
744
      terrno = TSDB_CODE_RPC_UNEXPECTED_RESPONSE;
745 746
    }
  }
747

748
  if (pConn) {
749
    if (pConn->linkUid != pHead->linkUid) {
750
      terrno = TSDB_CODE_RPC_MISMATCHED_LINK_ID;
B
Bomin Zhang 已提交
751
      tError("%s %p %p, linkUid:0x%x is not matched with received:0x%x", pRpc->label, pConn, (void*)pHead->ahandle, pConn->linkUid, pHead->linkUid);
752
      pConn = NULL;
H
hzcheng 已提交
753 754 755
    }
  }

756
  return pConn;
H
hzcheng 已提交
757 758
}

759
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
760 761
  SRpcConn   *pConn;
  SRpcInfo   *pRpc = pContext->pRpc;
762
  SRpcEpSet  *pEpSet = &pContext->epSet;
H
hzcheng 已提交
763

764
  pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
765
  if ( pConn == NULL || pConn->user[0] == 0) {
766
    pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
767 768 769
  } 

  if (pConn) {
S
Shengliang Guan 已提交
770
    pConn->tretry = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
771
    pConn->ahandle = pContext->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
772
    snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
773
    pConn->tretry = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
774
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
775
    tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
776
  }
H
hzcheng 已提交
777

778
  return pConn;
H
hzcheng 已提交
779 780
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
781
static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
H
hzcheng 已提交
782

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
783
    if (pConn->peerId == 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
784
      pConn->peerId = pHead->sourceId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
785
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
786
      if (pConn->peerId != pHead->sourceId) {
787
        tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
788
               pConn->peerId, pHead->sourceId);
789
        return TSDB_CODE_RPC_INVALID_VALUE;
H
hzcheng 已提交
790 791 792
      }
    }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
793 794
    if (pConn->inTranId == pHead->tranId) {
      if (pConn->inType == pHead->msgType) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
795
        if (pHead->code == 0) {
796
          tDebug("%s, %s is retransmitted", pConn->info, taosMsg[pHead->msgType]);
797
          rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
798 799 800
        } else {
          // do nothing, it is heart beat from client
        }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
801
      } else if (pConn->inType == 0) {
802
        tDebug("%s, %s is already processed, tranId:%d", pConn->info, taosMsg[pHead->msgType], pConn->inTranId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
803
        rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
804
      } else {
805
        tDebug("%s, mismatched message %s and tranId", pConn->info, taosMsg[pHead->msgType]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
806
      }
H
hzcheng 已提交
807

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
808
      // do not reply any message
809
      return TSDB_CODE_RPC_ALREADY_PROCESSED;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
810
    }
H
hzcheng 已提交
811

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
812
    if (pConn->inType != 0) {
813
      tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
814
              pConn->inTranId, pHead->tranId);
815
      return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
816
    }
H
hzcheng 已提交
817

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
818 819
    pConn->inTranId = pHead->tranId;
    pConn->inType = pHead->msgType;
H
hzcheng 已提交
820

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
821
    return 0;
H
hzcheng 已提交
822 823
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
824
static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
825
  SRpcInfo *pRpc = pConn->pRpc;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
826
  pConn->peerId = pHead->sourceId;
H
hzcheng 已提交
827

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
828
  if (pConn->outType == 0 || pConn->pContext == NULL) {
829
    return TSDB_CODE_RPC_UNEXPECTED_RESPONSE;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
830
  }
H
hzcheng 已提交
831

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
832
  if (pHead->tranId != pConn->outTranId) {
833
    return TSDB_CODE_RPC_INVALID_TRAN_ID;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
834
  }
H
hzcheng 已提交
835

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
836
  if (pHead->msgType != pConn->outType + 1) {
837
    return TSDB_CODE_RPC_INVALID_RESPONSE_TYPE;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
838
  }
H
hzcheng 已提交
839

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
840 841 842
  taosTmrStopA(&pConn->pTimer);
  pConn->retry = 0;

843
  if (pHead->code == TSDB_CODE_RPC_AUTH_REQUIRED && pRpc->spi) {
844
    tDebug("%s, authentication shall be restarted", pConn->info);
845 846
    pConn->secured = 0;
    rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);      
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
847 848
    if (pConn->connType != RPC_CONN_TCPC)
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
849
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
850 851
  }

852 853 854 855 856 857 858 859 860 861
  if (pHead->code == TSDB_CODE_RPC_MISMATCHED_LINK_ID) {
    tDebug("%s, mismatched linkUid, link shall be restarted", pConn->info);
    pConn->secured = 0;
    ((SRpcHead *)pConn->pReqMsg)->destId = 0;
    rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);      
    if (pConn->connType != RPC_CONN_TCPC)
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
    return TSDB_CODE_RPC_ALREADY_PROCESSED;
  }

862
  if (pHead->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
863
    if (pConn->tretry <= tsRpcMaxRetry) {
864
      tDebug("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
865 866
      pConn->tretry++;
      rpcSendReqHead(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
867 868
      if (pConn->connType != RPC_CONN_TCPC)
        pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
869
      return TSDB_CODE_RPC_ALREADY_PROCESSED;
H
hzcheng 已提交
870
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
871
      // peer still in processing, give up
872
      tDebug("%s, server processing takes too long time, give up", pConn->info);
873
      pHead->code = TSDB_CODE_RPC_TOO_SLOW;
H
hzcheng 已提交
874 875
    }
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
876

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
877 878
  pConn->outType = 0;
  pConn->pReqMsg = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
879
  pConn->reqMsgLen = 0;
880 881

  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
882 883
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
884
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
885 886
  int32_t    sid;
  SRpcConn  *pConn = NULL;
H
hzcheng 已提交
887

888
  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
889

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
890
  sid = htonl(pHead->destId);
H
hzcheng 已提交
891

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
892
  if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) {
893
    tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
894
    terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE; return NULL;
H
hzcheng 已提交
895 896
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
897
  if (sid < 0 || sid >= pRpc->sessions) {
898
    tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid,
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
899
           pRpc->sessions, taosMsg[pHead->msgType]);
900
    terrno = TSDB_CODE_RPC_INVALID_SESSION_ID; return NULL;
H
hzcheng 已提交
901 902
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
903
  pConn = rpcGetConnObj(pRpc, sid, pRecv);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
904
  if (pConn == NULL) {
905
    tDebug("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno)); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
906
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
907
  } 
H
hzcheng 已提交
908

909
  rpcLockConn(pConn);
H
hzcheng 已提交
910

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
911 912
  if (rpcIsReq(pHead->msgType)) {
    pConn->ahandle = (void *)pHead->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
913
    snprintf(pConn->info, sizeof(pConn->info), "%s %p %p", pRpc->label, pConn, pConn->ahandle);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
914 915 916
  }

  sid = pConn->sid;
917
  if (pConn->chandle == NULL) pConn->chandle = pRecv->chandle;
J
jtao1735 已提交
918
  pConn->peerIp = pRecv->ip; 
919
  pConn->peerPort = pRecv->port;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
920
  if (pHead->port) pConn->peerPort = htons(pHead->port); 
H
hzcheng 已提交
921

922
  terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
923 924 925 926

  // code can be transformed only after authentication
  pHead->code = htonl(pHead->code);

927
  if (terrno == 0) {
J
jtao1735 已提交
928
    if (pHead->encrypt) {
929 930
      // decrypt here
    }
H
hzcheng 已提交
931

932 933 934
    if ( rpcIsReq(pHead->msgType) ) {
      terrno = rpcProcessReqHead(pConn, pHead);
      pConn->connType = pRecv->connType;
935

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
936 937 938 939 940 941
      // stop idle timer
      taosTmrStopA(&pConn->pIdleTimer);  

      // client shall send the request within tsRpcTime again for UDP, double it 
      if (pConn->connType != RPC_CONN_TCPS)
        pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl);
942 943 944
    } else {
      terrno = rpcProcessRspHead(pConn, pHead);
    }
H
hzcheng 已提交
945 946
  }

947
  rpcUnlockConn(pConn);
H
hzcheng 已提交
948

949
  return pConn;
H
hzcheng 已提交
950 951
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
952 953
static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
  SRpcInfo *pRpc = pConn->pRpc;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
954
  if (pConn->pReqMsg == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
955 956

  // if there are pending request, notify the app
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
957
  rpcAddRef(pRpc);
958
  tDebug("%s, notify the server app, connection is gone", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
959 960 961 962

  SRpcMsg rpcMsg;
  rpcMsg.pCont = pConn->pReqMsg;     // pReqMsg is re-used to store the APP context from server
  rpcMsg.contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length
963
  rpcMsg.ahandle = pConn->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
964 965 966
  rpcMsg.handle = pConn;
  rpcMsg.msgType = pConn->inType;
  rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
967 968
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
969 970 971
  if (pRpc->cfp) (*(pRpc->cfp))(&rpcMsg, NULL);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
972
static void rpcProcessBrokenLink(SRpcConn *pConn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
973
  if (pConn == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
974
  SRpcInfo *pRpc = pConn->pRpc;
975
  tDebug("%s, link is broken", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
976 977

  rpcLockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
978

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
979 980
  if (pConn->outType) {
    SRpcReqContext *pContext = pConn->pContext;
981
    pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
982
    pConn->pReqMsg = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
983 984
    taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
  }
985

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
986
  if (pConn->inType) rpcReportBrokenLinkToServer(pConn); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
987

988
  rpcReleaseConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
989
  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
990 991
}

992 993 994 995
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
  SRpcHead  *pHead = (SRpcHead *)pRecv->msg;
  SRpcInfo  *pRpc = (SRpcInfo *)pRecv->shandle;
  SRpcConn  *pConn = (SRpcConn *)pRecv->thandle;
H
hzcheng 已提交
996

S
Shengliang Guan 已提交
997
  tDump(pRecv->msg, pRecv->msgLen);
998 999

  // underlying UDP layer does not know it is server or client
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1000
  pRecv->connType = pRecv->connType | pRpc->connType;  
H
hzcheng 已提交
1001

B
Bomin Zhang 已提交
1002
  if (pRecv->msg == NULL) {
1003
    rpcProcessBrokenLink(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1004 1005 1006
    return NULL;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1007 1008
  terrno = 0;
  pConn = rpcProcessMsgHead(pRpc, pRecv);
H
hzcheng 已提交
1009

1010
  tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x",
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1011
        pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, 
1012
        pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
H
hzcheng 已提交
1013

H
TD-34  
hzcheng 已提交
1014
  int32_t code = terrno;
1015
  if (code != TSDB_CODE_RPC_ALREADY_PROCESSED) {
H
TD-34  
hzcheng 已提交
1016
    if (code != 0) { // parsing error
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1017
      if (rpcIsReq(pHead->msgType)) {
H
TD-34  
hzcheng 已提交
1018
        rpcSendErrorMsgToPeer(pRecv, code);
1019
        tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1020
      } 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1021
    } else { // msg is passed to app only parsing is ok 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1022
      rpcProcessIncomingMsg(pConn, pHead);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1023
    }
H
hzcheng 已提交
1024 1025
  }

1026
  if (code) rpcFreeMsg(pRecv->msg); // parsing failed, msg shall be freed
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1027 1028
  return pConn;
}
H
hzcheng 已提交
1029

1030 1031 1032
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
  SRpcInfo       *pRpc = pContext->pRpc;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1033
  pContext->signature = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1034
  pContext->pConn = NULL;
1035 1036
  if (pContext->pRsp) { 
    // for synchronous API
1037
    memcpy(pContext->pSet, &pContext->epSet, sizeof(SRpcEpSet));
1038
    memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1039
    tsem_post(pContext->pSem);
1040 1041
  } else {
    // for asynchronous API 
1042 1043 1044
    SRpcEpSet *pEpSet = NULL;
    if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) 
      pEpSet = &pContext->epSet;  
1045

1046
    (*pRpc->cfp)(pMsg, pEpSet);  
1047 1048 1049 1050 1051 1052
  }

  // free the request message
  rpcFreeCont(pContext->pCont); 
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1053
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
S
slguan 已提交
1054

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1055
  SRpcInfo *pRpc = pConn->pRpc;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1056
  SRpcMsg   rpcMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1057

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1058
  pHead = rpcDecompressRpcMsg(pHead);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1059 1060 1061 1062
  rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
  rpcMsg.pCont = pHead->content;
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.code = pHead->code; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1063
  rpcMsg.ahandle = pConn->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1064
   
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1065
  if ( rpcIsReq(pHead->msgType) ) {
1066 1067 1068
    if (rpcMsg.contLen > 0) {
      rpcMsg.handle = pConn;
      rpcAddRef(pRpc);  // add the refCount for requests
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1069

1070 1071 1072
      // start the progress timer to monitor the response from server app
      if (pConn->connType != RPC_CONN_TCPS) 
        pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1073
 
1074 1075 1076 1077 1078 1079
      // notify the server app
      (*(pRpc->cfp))(&rpcMsg, NULL);
    } else {
      tDebug("%s, message body is empty, ignore", pConn->info);
      rpcFreeCont(rpcMsg.pCont);
    }
H
hzcheng 已提交
1080
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1081 1082
    // it's a response
    SRpcReqContext *pContext = pConn->pContext;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1083
    rpcMsg.handle = pContext;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1084
    pConn->pContext = NULL;
1085
    pConn->pReqMsg = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1086

1087
    // for UDP, port may be changed by server, the port in epSet shall be used for cache
1088
    if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
1089
      rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType);    
1090 1091 1092
    } else {
      rpcCloseConn(pConn);
    }
H
hzcheng 已提交
1093

1094
    if (pHead->code == TSDB_CODE_RPC_REDIRECT) { 
guanshengliang's avatar
guanshengliang 已提交
1095 1096
      pContext->redirect++;
      if (pContext->redirect > TSDB_MAX_REPLICA) {
1097
        pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1098
        tWarn("%s, too many redirects, quit", pConn->info);
guanshengliang's avatar
guanshengliang 已提交
1099 1100 1101
      }
    }

1102
    if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1103
      pContext->numOfTry = 0;
1104 1105 1106 1107
      memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet));
      tDebug("%s, redirect is received, numOfEps:%d", pConn->info, pContext->epSet.numOfEps);
      for (int i=0; i<pContext->epSet.numOfEps; ++i) 
        pContext->epSet.port[i] = htons(pContext->epSet.port[i]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1108
      rpcSendReqToServer(pRpc, pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1109
      rpcFreeCont(rpcMsg.pCont);
1110
    } else if (pHead->code == TSDB_CODE_RPC_NOT_READY) {
1111 1112
      pContext->code = pHead->code;
      rpcProcessConnError(pContext, NULL);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1113
      rpcFreeCont(rpcMsg.pCont);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1114
    } else {
1115
      rpcNotifyClient(pContext, &rpcMsg);
H
hzcheng 已提交
1116 1117 1118 1119
    }
  }
}

1120
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1121 1122
  char       msg[RPC_MSG_OVERHEAD];
  SRpcHead  *pHead;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1123

1124
  // set msg header
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1125 1126 1127 1128
  memset(msg, 0, sizeof(SRpcHead));
  pHead = (SRpcHead *)msg;
  pHead->version = 1;
  pHead->msgType = pConn->inType+1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1129
  pHead->spi = pConn->spi;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1130 1131 1132 1133
  pHead->encrypt = 0;
  pHead->tranId = pConn->inTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
1134
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1135
  pHead->ahandle = (uint64_t)pConn->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1136
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1137
  pHead->code = htonl(code);
H
hzcheng 已提交
1138

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157
  rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
  pConn->secured = 1; // connection shall be secured
}

static void rpcSendReqHead(SRpcConn *pConn) {
  char       msg[RPC_MSG_OVERHEAD];
  SRpcHead  *pHead;

  // set msg header
  memset(msg, 0, sizeof(SRpcHead));
  pHead = (SRpcHead *)msg;
  pHead->version = 1;
  pHead->msgType = pConn->outType;
  pHead->spi = pConn->spi;
  pHead->encrypt = 0;
  pHead->tranId = pConn->outTranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1158
  pHead->ahandle = (uint64_t)pConn->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1159 1160 1161 1162
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));
  pHead->code = 1;

  rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1163
}
H
hjxilinx 已提交
1164

1165
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1166 1167
  SRpcHead  *pRecvHead, *pReplyHead;
  char       msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t) ];
1168 1169
  uint32_t   timeStamp;
  int        msgLen;
H
hzcheng 已提交
1170

1171
  pRecvHead = (SRpcHead *)pRecv->msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1172
  pReplyHead = (SRpcHead *)msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1173

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1174 1175 1176 1177
  memset(msg, 0, sizeof(SRpcHead));
  pReplyHead->version = pRecvHead->version;
  pReplyHead->msgType = (char)(pRecvHead->msgType + 1);
  pReplyHead->spi = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1178
  pReplyHead->encrypt = pRecvHead->encrypt;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1179
  pReplyHead->tranId = pRecvHead->tranId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1180
  pReplyHead->sourceId = pRecvHead->destId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1181
  pReplyHead->destId = pRecvHead->sourceId;
1182
  pReplyHead->linkUid = pRecvHead->linkUid;
1183
  pReplyHead->ahandle = pRecvHead->ahandle;
H
hzcheng 已提交
1184

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1185 1186
  pReplyHead->code = htonl(code);
  msgLen = sizeof(SRpcHead);
H
hzcheng 已提交
1187

1188
  if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP) {
1189
    // include a time stamp if client's time is not synchronized well
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1190
    uint8_t *pContent = pReplyHead->content;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1191
    timeStamp = htonl(taosGetTimestampSec());
1192 1193
    memcpy(pContent, &timeStamp, sizeof(timeStamp));
    msgLen += sizeof(timeStamp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1194
  }
H
hzcheng 已提交
1195

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1196
  pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
1197
  (*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, msg, msgLen, pRecv->chandle);
H
hzcheng 已提交
1198

1199
  return; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1200 1201
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1202
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1203 1204 1205 1206
  SRpcHead  *pHead = rpcHeadFromCont(pContext->pCont);
  char      *msg = (char *)pHead;
  int        msgLen = rpcMsgLenFromCont(pContext->contLen);
  char       msgType = pContext->msgType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1207

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1208
  pContext->numOfTry++;
1209
  SRpcConn *pConn = rpcSetupConnToServer(pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1210 1211 1212 1213 1214 1215
  if (pConn == NULL) {
    pContext->code = terrno;
    taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
    return;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1216
  pContext->pConn = pConn;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1217
  pConn->ahandle = pContext->ahandle;
1218
  rpcLockConn(pConn);
1219

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1220
  // set the message header  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1221 1222 1223
  pHead->version = 1;
  pHead->msgType = msgType;
  pHead->encrypt = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1224 1225
  pConn->tranId++;
  if ( pConn->tranId == 0 ) pConn->tranId++;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1226 1227 1228 1229
  pHead->tranId = pConn->tranId;
  pHead->sourceId = pConn->ownId;
  pHead->destId = pConn->peerId;
  pHead->port = 0;
1230
  pHead->linkUid = pConn->linkUid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1231
  pHead->ahandle = (uint64_t)pConn->ahandle;
1232
  memcpy(pHead->user, pConn->user, tListLen(pHead->user));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1233 1234 1235

  // set the connection parameters
  pConn->outType = msgType;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1236
  pConn->outTranId = pHead->tranId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1237 1238
  pConn->pReqMsg = msg;
  pConn->reqMsgLen = msgLen;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1239
  pConn->pContext = pContext;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1240

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1241
  rpcSendMsgToPeer(pConn, msg, msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1242 1243
  if (pConn->connType != RPC_CONN_TCPC)
    taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1244 1245

  rpcUnlockConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1246
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1247 1248

static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1249 1250
  int        writtenLen = 0;
  SRpcHead  *pHead = (SRpcHead *)msg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1251

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1252
  msgLen = rpcAddAuthPart(pConn, msg, msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1253

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1254
  if ( rpcIsReq(pHead->msgType)) {
1255
    tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d",
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1256 1257
           pConn->info, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort, 
           msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
1258
  } else {
J
jtao1735 已提交
1259
    if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured
1260
    tDebug("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d",
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1261 1262
           pConn->info, taosMsg[pHead->msgType], pConn->peerIp, pConn->peerPort, 
           htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
1263
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1264

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1265
  //tTrace("connection type is: %d", pConn->connType);
1266
  writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1267

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1268
  if (writtenLen != msgLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1269
    tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
1270 1271
  }
 
S
Shengliang Guan 已提交
1272
  tDump(msg, msgLen);
H
hzcheng 已提交
1273 1274
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1275
static void rpcProcessConnError(void *param, void *id) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1276
  SRpcReqContext *pContext = (SRpcReqContext *)param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1277 1278
  SRpcInfo       *pRpc = pContext->pRpc;
  SRpcMsg         rpcMsg;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1279
 
H
hjxilinx 已提交
1280 1281 1282 1283
  if (pRpc == NULL) {
    return;
  }
  
1284
  tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle);
H
hzcheng 已提交
1285

1286
  if (pContext->numOfTry >= pContext->epSet.numOfEps) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1287
    rpcMsg.msgType = pContext->msgType+1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1288
    rpcMsg.ahandle = pContext->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1289 1290 1291
    rpcMsg.code = pContext->code;
    rpcMsg.pCont = NULL;
    rpcMsg.contLen = 0;
1292 1293

    rpcNotifyClient(pContext, &rpcMsg);
H
hzcheng 已提交
1294
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1295
    // move to next IP 
1296 1297
    pContext->epSet.inUse++;
    pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1298
    rpcSendReqToServer(pRpc, pContext);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1299
  }
H
hzcheng 已提交
1300 1301
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1302 1303 1304
static void rpcProcessRetryTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;
  SRpcInfo *pRpc = pConn->pRpc;
H
hzcheng 已提交
1305

1306
  rpcLockConn(pConn);
H
hzcheng 已提交
1307

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1308
  if (pConn->outType && pConn->user[0]) {
1309
    tDebug("%s, expected %s is not received", pConn->info, taosMsg[(int)pConn->outType + 1]);
H
hzcheng 已提交
1310 1311 1312
    pConn->pTimer = NULL;
    pConn->retry++;

S
slguan 已提交
1313
    if (pConn->retry < 4) {
1314
      tDebug("%s, re-send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1315
      rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);      
1316
      pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
H
hzcheng 已提交
1317 1318
    } else {
      // close the connection
1319
      tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
1320 1321
      if (pConn->pContext) {
        pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
1322
        pConn->pReqMsg = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1323
        taosTmrStart(rpcProcessConnError, 0, pConn->pContext, pRpc->tmrCtrl);
1324 1325
        rpcReleaseConn(pConn);
      }
H
hzcheng 已提交
1326
    }
1327
  } else {
1328
    tDebug("%s, retry timer not processed", pConn->info);
H
hzcheng 已提交
1329 1330
  }

1331
  rpcUnlockConn(pConn);
H
hzcheng 已提交
1332 1333
}

1334 1335 1336
static void rpcProcessIdleTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;

1337 1338
  rpcLockConn(pConn);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1339
  if (pConn->user[0]) {
1340
    tDebug("%s, close the connection since no activity", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1341
    if (pConn->inType) rpcReportBrokenLinkToServer(pConn); 
1342
    rpcReleaseConn(pConn);
1343
  } else {
1344
    tDebug("%s, idle timer:%p not processed", pConn->info, tmrId);
1345
  }
1346 1347

  rpcUnlockConn(pConn);
1348 1349 1350 1351 1352 1353
}

static void rpcProcessProgressTimer(void *param, void *tmrId) {
  SRpcConn *pConn = (SRpcConn *)param;
  SRpcInfo *pRpc = pConn->pRpc;

1354
  rpcLockConn(pConn);
1355

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1356
  if (pConn->inType && pConn->user[0]) {
1357
    tDebug("%s, progress timer expired, send progress", pConn->info);
1358
    rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
1359
    pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
1360
  } else {
1361
    tDebug("%s, progress timer:%p not processed", pConn->info, tmrId);
1362 1363
  }

1364
  rpcUnlockConn(pConn);
1365 1366 1367
}

static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1368 1369 1370
  SRpcHead  *pHead = rpcHeadFromCont(pCont);
  int32_t    finalLen = 0;
  int        overhead = sizeof(SRpcComp);
1371 1372 1373 1374 1375
  
  if (!NEEDTO_COMPRESSS_MSG(contLen)) {
    return contLen;
  }
  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1376
  char *buf = malloc (contLen + overhead + 8);  // 8 extra bytes
1377
  if (buf == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1378
    tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
1379 1380 1381 1382
    return contLen;
  }
  
  int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
S
Shuduo Sang 已提交
1383
  tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead);
1384 1385 1386 1387 1388 1389
  
  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
  if (compLen < contLen - overhead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1390 1391 1392
    SRpcComp *pComp = (SRpcComp *)pCont;
    pComp->reserved = 0; 
    pComp->contLen = htonl(contLen); 
1393 1394
    memcpy(pCont + overhead, buf, compLen);
    
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1395
    pHead->comp = 1;
S
Shuduo Sang 已提交
1396
    tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen);
1397 1398 1399 1400 1401 1402 1403 1404 1405
    finalLen = compLen + overhead;
  } else {
    finalLen = contLen;
  }

  free(buf);
  return finalLen;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1406
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1407
  int overhead = sizeof(SRpcComp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1408 1409 1410
  SRpcHead   *pNewHead = NULL;  
  uint8_t    *pCont = pHead->content;
  SRpcComp   *pComp = (SRpcComp *)pHead->content;
1411

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1412
  if (pHead->comp) {
1413
    // decompress the content
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1414 1415
    assert(pComp->reserved == 0);
    int contLen = htonl(pComp->contLen);
1416 1417
  
    // prepare the temporary buffer to decompress message
1418 1419
    char *temp = (char *)malloc(contLen + RPC_MSG_OVERHEAD);
    pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext
1420
  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1421
    if (pNewHead) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1422
      int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1423 1424
      int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char *)pNewHead->content, compLen, contLen);
      assert(origLen == contLen);
1425
    
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1426
      memcpy(pNewHead, pHead, sizeof(SRpcHead));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1427
      pNewHead->msgLen = rpcMsgLenFromCont(origLen);
1428
      rpcFreeMsg(pHead); // free the compressed message buffer
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1429
      pHead = pNewHead; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1430
      //tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen);
1431
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1432
      tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
1433 1434 1435
    }
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1436
  return pHead;
1437 1438
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1439
static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) {
H
hzcheng 已提交
1440 1441 1442 1443
  MD5_CTX context;
  int     ret = -1;

  MD5Init(&context);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1444 1445 1446
  MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
  MD5Update(&context, (uint8_t *)pMsg, msgLen);
  MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
H
hzcheng 已提交
1447 1448 1449 1450 1451 1452 1453
  MD5Final(&context);

  if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;

  return ret;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1454
static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) {
H
hzcheng 已提交
1455 1456 1457
  MD5_CTX context;

  MD5Init(&context);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1458
  MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
H
hzcheng 已提交
1459
  MD5Update(&context, (uint8_t *)pMsg, msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1460
  MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
H
hzcheng 已提交
1461 1462 1463 1464
  MD5Final(&context);

  memcpy(pAuth, context.digest, sizeof(context.digest));
}
1465 1466

static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1467
  SRpcHead *pHead = (SRpcHead *)msg;
1468

1469
  if (pConn->spi && pConn->secured == 0) {
1470
    // add auth part
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1471
    pHead->spi = pConn->spi;
1472 1473 1474
    SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen);
    pDigest->timeStamp = htonl(taosGetTimestampSec());
    msgLen += sizeof(SRpcDigest);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1475
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1476
    rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
1477
  } else {
1478
    pHead->spi = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1479
    pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
1480 1481 1482 1483 1484 1485
  }

  return msgLen;
}

static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1486
  SRpcHead *pHead = (SRpcHead *)msg;
1487
  int       code = 0;
1488

1489 1490
  if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){
    // secured link, or no authentication 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1491
    pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
1492
    // tTrace("%s, secured link, no auth is required", pConn->info);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1493 1494 1495 1496 1497 1498
    return 0;
  }

  if ( !rpcIsReq(pHead->msgType) ) {
    // for response, if code is auth failure, it shall bypass the auth process
    code = htonl(pHead->code);
1499 1500
    if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
        code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_INVALID_USER || code == TSDB_CODE_RPC_NOT_READY) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1501
      pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
1502
      // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1503
      return 0;
1504
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1505 1506 1507
  }
 
  code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1508
  if (pHead->spi == pConn->spi) {
1509
    // authentication
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1510
    SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest));
1511 1512 1513 1514 1515

    int32_t delta;
    delta = (int32_t)htonl(pDigest->timeStamp);
    delta -= (int32_t)taosGetTimestampSec();
    if (abs(delta) > 900) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1516
      tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
1517
      code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
1518
    } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1519
      if (rpcAuthenticateMsg(pHead, msgLen-TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
1520
        tDebug("%s, authentication failed, msg discarded", pConn->info);
1521
        code = TSDB_CODE_RPC_AUTH_FAILURE;
1522
      } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1523
        pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
1524
        if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1;  // link is secured for client
1525
        // tTrace("%s, message is authenticated", pConn->info);
1526 1527 1528
      }
    }
  } else {
1529
    tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
1530
    code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
1531 1532 1533 1534 1535
  }

  return code;
}

1536 1537
static void rpcLockConn(SRpcConn *pConn) {
  int64_t tid = taosGetPthreadId();
1538
  int     i = 0;
1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551
  while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) {
    if (++i % 1000 == 0) {
      sched_yield();
    }
  }
}

static void rpcUnlockConn(SRpcConn *pConn) {
  int64_t tid = taosGetPthreadId();
  if (atomic_val_compare_exchange_64(&(pConn->lockedBy), tid, 0) != tid) {
    assert(false);
  }
}
1552

1553 1554
static void rpcAddRef(SRpcInfo *pRpc)
{  
1555
   atomic_add_fetch_32(&pRpc->refCount, 1);
1556 1557 1558 1559
}

static void rpcDecRef(SRpcInfo *pRpc)
{ 
1560
  if (atomic_sub_fetch_32(&pRpc->refCount, 1) == 0) {
1561
    rpcCloseConnCache(pRpc->pCache);
dengyihao's avatar
dengyihao 已提交
1562
    taosHashCleanup(pRpc->hash);
dengyihao's avatar
dengyihao 已提交
1563
    taosTmrCleanUp(pRpc->tmrCtrl);
dengyihao's avatar
dengyihao 已提交
1564
    taosIdPoolCleanUp(pRpc->idPool);
1565 1566 1567

    tfree(pRpc->connList);
    pthread_mutex_destroy(&pRpc->mutex);
1568
    tDebug("%s rpc resources are released", pRpc->label);
1569 1570 1571 1572
    tfree(pRpc);
  }
}