未验证 提交 e1306a9b 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19049 from taosdata/fix/addAuthFailure

fix: add auth failure
...@@ -4,7 +4,7 @@ PROJECT(TDengine) ...@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER) IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER}) SET(TD_VER_NUMBER ${VERNUMBER})
ELSE () ELSE ()
SET(TD_VER_NUMBER "2.6.0.0") SET(TD_VER_NUMBER "2.6.0.27")
ENDIF () ENDIF ()
IF (DEFINED VERCOMPATIBLE) IF (DEFINED VERCOMPATIBLE)
......
name: tdengine name: tdengine
base: core20 base: core20
version: '2.6.0.0' version: '2.6.0.27'
icon: snap/gui/t-dengine.svg icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT. summary: an open-source big data platform designed and optimized for IoT.
description: | description: |
......
...@@ -13,126 +13,126 @@ ...@@ -13,126 +13,126 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "hash.h"
#include "lz4.h"
#include "os.h" #include "os.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tidpool.h" #include "tidpool.h"
#include "tmd5.h" #include "tmd5.h"
#include "tmempool.h" #include "tmempool.h"
#include "ttimer.h"
#include "tutil.h"
#include "lz4.h"
#include "tref.h" #include "tref.h"
#include "taoserror.h"
#include "tsocket.h"
#include "tglobal.h"
#include "taosmsg.h"
#include "trpc.h" #include "trpc.h"
#include "hash.h" #include "tsocket.h"
#include "rpcLog.h" #include "ttimer.h"
#include "rpcUdp.h" #include "tutil.h"
#include "rpcCache.h"
#include "rpcTcp.h"
#include "rpcHead.h"
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead *) ((char*)cont - sizeof(SRpcHead))) #define rpcHeadFromCont(cont) ((SRpcHead *)((char *)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) #define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) #define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U) #define rpcIsReq(type) (type & 1U)
typedef struct { typedef struct {
int sessions; // number of sessions allowed int sessions; // number of sessions allowed
int numOfThreads; // number of threads to process incoming messages int numOfThreads; // number of threads to process incoming messages
int idleTime; // milliseconds; int idleTime; // milliseconds;
uint16_t localPort; uint16_t localPort;
int8_t connType; int8_t connType;
int index; // for UDP server only, round robin for multiple threads int index; // for UDP server only, round robin for multiple threads
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
char user[TSDB_UNI_LEN]; // meter ID char user[TSDB_UNI_LEN]; // meter ID
char spi; // security parameter index char spi; // security parameter index
char encrypt; // encrypt algorithm char encrypt; // encrypt algorithm
char secret[TSDB_KEY_LEN]; // secret for the link char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key char ckey[TSDB_KEY_LEN]; // ciphering key
void (*cfp)(SRpcMsg *, SRpcEpSet *); void (*cfp)(SRpcMsg *, SRpcEpSet *);
int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t refCount; int32_t refCount;
void *idPool; // handle to ID pool void *idPool; // handle to ID pool
void *tmrCtrl; // handle to timer void *tmrCtrl; // handle to timer
SHashObj *hash; // handle returned by hash utility SHashObj *hash; // handle returned by hash utility
void *tcphandle;// returned handle from TCP initialization void *tcphandle; // returned handle from TCP initialization
void *udphandle;// returned handle from UDP initialization void *udphandle; // returned handle from UDP initialization
void *pCache; // connection cache void *pCache; // connection cache
pthread_mutex_t mutex; pthread_mutex_t mutex;
struct SRpcConn *connList; // connection list struct SRpcConn *connList; // connection list
} SRpcInfo; } SRpcInfo;
typedef struct SSendInfo { typedef struct SSendInfo {
void *pContext; void *pContext;
void *pConn; void *pConn;
void *pFdObj; void *pFdObj;
SOCKET fd; SOCKET fd;
} SSendInfo; } SSendInfo;
typedef struct { typedef struct {
SRpcInfo *pRpc; // associated SRpcInfo SRpcInfo *pRpc; // associated SRpcInfo
SRpcEpSet epSet; // ip list provided by app SRpcEpSet epSet; // ip list provided by app
void *ahandle; // handle provided by app void *ahandle; // handle provided by app
struct SRpcConn *pConn; // pConn allocated struct SRpcConn *pConn; // pConn allocated
char msgType; // message type char msgType; // message type
uint8_t *pCont; // content provided by app uint8_t *pCont; // content provided by app
int32_t contLen; // content length int32_t contLen; // content length
int32_t code; // error code int32_t code; // error code
int16_t numOfTry; // number of try for different servers int16_t numOfTry; // number of try for different servers
int8_t oldInUse; // server EP inUse passed by app int8_t oldInUse; // server EP inUse passed by app
int8_t redirect; // flag to indicate redirect int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef int64_t rid; // refId returned by taosAddRef
SRpcMsg *pRsp; // for synchronous API SRpcMsg *pRsp; // for synchronous API
tsem_t *pSem; // for synchronous API tsem_t *pSem; // for synchronous API
SRpcEpSet *pSet; // for synchronous API SRpcEpSet *pSet; // for synchronous API
SSendInfo sendInfo; // save last send information SSendInfo sendInfo; // save last send information
char msg[0]; // RpcHead starts from here char msg[0]; // RpcHead starts from here
} SRpcReqContext; } SRpcReqContext;
typedef struct SRpcConn { typedef struct SRpcConn {
char info[48];// debug info: label + pConn + ahandle char info[48]; // debug info: label + pConn + ahandle
int sid; // session ID int sid; // session ID
uint32_t ownId; // own link ID uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID uint32_t peerId; // peer link ID
char user[TSDB_UNI_LEN]; // user ID for the link char user[TSDB_UNI_LEN]; // user ID for the link
char spi; // security parameter index char spi; // security parameter index
char encrypt; // encryption, 0:1 char encrypt; // encryption, 0:1
char secret[TSDB_KEY_LEN]; // secret for the link char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key char ckey[TSDB_KEY_LEN]; // ciphering key
char secured; // if set to 1, no authentication char secured; // if set to 1, no authentication
uint16_t localPort; // for UDP only uint16_t localPort; // for UDP only
uint32_t linkUid; // connection unique ID assigned by client uint32_t linkUid; // connection unique ID assigned by client
uint32_t peerIp; // peer IP uint32_t peerIp; // peer IP
uint16_t peerPort; // peer port uint16_t peerPort; // peer port
char peerFqdn[TSDB_FQDN_LEN]; // peer FQDN or ip string char peerFqdn[TSDB_FQDN_LEN]; // peer FQDN or ip string
uint16_t tranId; // outgoing transcation ID, for build message uint16_t tranId; // outgoing transcation ID, for build message
uint16_t outTranId; // outgoing transcation ID uint16_t outTranId; // outgoing transcation ID
uint16_t inTranId; // transcation ID for incoming msg uint16_t inTranId; // transcation ID for incoming msg
uint8_t outType; // message type for outgoing request uint8_t outType; // message type for outgoing request
uint8_t inType; // message type for incoming request uint8_t inType; // message type for incoming request
void *chandle; // handle passed by TCP/UDP connection layer void *chandle; // handle passed by TCP/UDP connection layer
void *ahandle; // handle provided by upper app layter void *ahandle; // handle provided by upper app layter
int retry; // number of retry for sending request int retry; // number of retry for sending request
int tretry; // total retry int tretry; // total retry
void *pTimer; // retry timer to monitor the response void *pTimer; // retry timer to monitor the response
void *pIdleTimer; // idle timer void *pIdleTimer; // idle timer
char *pRspMsg; // response message including header char *pRspMsg; // response message including header
int rspMsgLen; // response messag length int rspMsgLen; // response messag length
char *pReqMsg; // request message including header char *pReqMsg; // request message including header
int reqMsgLen; // request message length int reqMsgLen; // request message length
SRpcInfo *pRpc; // the associated SRpcInfo SRpcInfo *pRpc; // the associated SRpcInfo
int8_t connType; // connection type int8_t connType; // connection type
int64_t lockedBy; // lock for connection int64_t lockedBy; // lock for connection
SRpcReqContext *pContext; // request context SRpcReqContext *pContext; // request context
int64_t rid; // probe msg use rid get pContext int64_t rid; // probe msg use rid get pContext
} SRpcConn; } SRpcConn;
int tsRpcMaxUdpSize = 15000; // bytes int tsRpcMaxUdpSize = 15000; // bytes
...@@ -144,42 +144,30 @@ int tsRpcOverhead; ...@@ -144,42 +144,30 @@ int tsRpcOverhead;
static int tsRpcRefId = -1; static int tsRpcRefId = -1;
static int32_t tsRpcNum = 0; static int32_t tsRpcNum = 0;
//static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT; // static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
// server:0 client:1 tcp:2 udp:0 // server:0 client:1 tcp:2 udp:0
#define RPC_CONN_UDPS 0 #define RPC_CONN_UDPS 0
#define RPC_CONN_UDPC 1 #define RPC_CONN_UDPC 1
#define RPC_CONN_TCPS 2 #define RPC_CONN_TCPS 2
#define RPC_CONN_TCPC 3 #define RPC_CONN_TCPC 3
#define RPC_CONN_AUTO 4 // need tcp use tcp #define RPC_CONN_AUTO 4 // need tcp use tcp
void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = { void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
taosInitUdpConnection, taosInitUdpConnection, taosInitUdpConnection, taosInitTcpServer, taosInitTcpClient};
taosInitUdpConnection,
taosInitTcpServer,
taosInitTcpClient
};
void (*taosCleanUpConn[])(void *thandle) = { void (*taosCleanUpConn[])(void *thandle) = {taosCleanUpUdpConnection, taosCleanUpUdpConnection, taosCleanUpTcpServer,
taosCleanUpUdpConnection, taosCleanUpTcpClient};
taosCleanUpUdpConnection,
taosCleanUpTcpServer,
taosCleanUpTcpClient
};
void (*taosStopConn[])(void *thandle) = { void (*taosStopConn[])(void *thandle) = {
taosStopUdpConnection, taosStopUdpConnection,
taosStopUdpConnection, taosStopUdpConnection,
taosStopTcpServer, taosStopTcpServer,
taosStopTcpClient, taosStopTcpClient,
}; };
int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = { int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
taosSendUdpData, taosSendUdpData, taosSendUdpData, taosSendTcpData, taosSendTcpData};
taosSendUdpData,
taosSendTcpData,
taosSendTcpData
};
void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = { void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = {
taosOpenUdpConnection, taosOpenUdpConnection,
...@@ -188,12 +176,7 @@ void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port ...@@ -188,12 +176,7 @@ void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port
taosOpenTcpClientConnection, taosOpenTcpClientConnection,
}; };
void (*taosCloseConn[])(void *chandle) = { void (*taosCloseConn[])(void *chandle) = {NULL, NULL, taosCloseTcpConnection, taosCloseTcpConnection};
NULL,
NULL,
taosCloseTcpConnection,
taosCloseTcpConnection
};
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType); static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType);
static void rpcCloseConn(void *thandle); static void rpcCloseConn(void *thandle);
...@@ -215,15 +198,45 @@ static void rpcProcessRetryTimer(void *, void *); ...@@ -215,15 +198,45 @@ static void rpcProcessRetryTimer(void *, void *);
static void rpcProcessIdleTimer(void *param, void *tmrId); static void rpcProcessIdleTimer(void *param, void *tmrId);
static void rpcProcessProgressTimer(void *param, void *tmrId); static void rpcProcessProgressTimer(void *param, void *tmrId);
static void rpcFreeMsg(void *msg); static void rpcFreeMsg(void *msg);
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); static int32_t rpcCompressRpcMsg(char *pCont, int32_t contLen);
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead); static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead);
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen); static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
static void rpcLockConn(SRpcConn *pConn); static void rpcLockConn(SRpcConn *pConn);
static void rpcUnlockConn(SRpcConn *pConn); static void rpcUnlockConn(SRpcConn *pConn);
static void rpcAddRef(SRpcInfo *pRpc); static void rpcAddRef(SRpcInfo *pRpc);
static void rpcDecRef(SRpcInfo *pRpc); static void rpcDecRef(SRpcInfo *pRpc);
static bool rpcGenUID(uint32_t *first, uint32_t *second) {
static uint64_t hashId = 0;
static uint32_t tranId = 0;
if (hashId == 0) {
char uid[128] = {0};
taosGetSystemUid(uid);
hashId = MurmurHash3_32(uid, strlen(uid));
}
uint64_t id = 0;
while (true) {
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&tranId, 1);
if (val >= 0xFFFF) {
atomic_store_32(&tranId, 0);
}
id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
if (id) {
break;
}
}
*first = (id >> 32) & 0xFFFFFFFF;
*second = id & 0xFFFFFFFF;
return true;
}
static void rpcFree(void *p) { static void rpcFree(void *p) {
tTrace("free mem: %p", p); tTrace("free mem: %p", p);
...@@ -231,40 +244,40 @@ static void rpcFree(void *p) { ...@@ -231,40 +244,40 @@ static void rpcFree(void *p) {
} }
int32_t rpcInit(void) { int32_t rpcInit(void) {
tsProgressTimer = tsRpcTimer/2; tsProgressTimer = tsRpcTimer / 2;
tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer; tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsProgressTimer;
tsRpcHeadSize = RPC_MSG_OVERHEAD; tsRpcHeadSize = RPC_MSG_OVERHEAD;
tsRpcOverhead = sizeof(SRpcReqContext); tsRpcOverhead = sizeof(SRpcReqContext);
tsRpcRefId = taosOpenRef(200, rpcFree); tsRpcRefId = taosOpenRef(200, rpcFree);
return 0; return 0;
} }
void rpcCleanup(void) { void rpcCleanup(void) {
taosCloseRef(tsRpcRefId); taosCloseRef(tsRpcRefId);
tsRpcRefId = -1; tsRpcRefId = -1;
} }
void *rpcOpen(const SRpcInit *pInit) { void *rpcOpen(const SRpcInit *pInit) {
SRpcInfo *pRpc; SRpcInfo *pRpc;
//pthread_once(&tsRpcInit, rpcInit); // pthread_once(&tsRpcInit, rpcInit);
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) return NULL; if (pRpc == NULL) return NULL;
if(pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); if (pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
if (pRpc->connType == TAOS_CONN_CLIENT) { if (pRpc->connType == TAOS_CONN_CLIENT) {
pRpc->numOfThreads = pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads;
} else { } else {
pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
} }
pRpc->idleTime = pInit->idleTime; pRpc->idleTime = pInit->idleTime;
pRpc->localPort = pInit->localPort; pRpc->localPort = pInit->localPort;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->sessions = pInit->sessions+1; pRpc->sessions = pInit->sessions + 1;
if (pInit->user) tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user)); if (pInit->user) tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret)); if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret));
if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey)); if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey));
...@@ -283,14 +296,14 @@ void *rpcOpen(const SRpcInit *pInit) { ...@@ -283,14 +296,14 @@ void *rpcOpen(const SRpcInit *pInit) {
return NULL; return NULL;
} }
pRpc->idPool = taosInitIdPool(pRpc->sessions-1); pRpc->idPool = taosInitIdPool(pRpc->sessions - 1);
if (pRpc->idPool == NULL) { if (pRpc->idPool == NULL) {
tError("%s failed to init ID pool", pRpc->label); tError("%s failed to init ID pool", pRpc->label);
rpcClose(pRpc); rpcClose(pRpc);
return NULL; return NULL;
} }
pRpc->tmrCtrl = taosTmrInit(pRpc->sessions*2 + 1, 50, 10000, pRpc->label); pRpc->tmrCtrl = taosTmrInit(pRpc->sessions * 2 + 1, 50, 10000, pRpc->label);
if (pRpc->tmrCtrl == NULL) { if (pRpc->tmrCtrl == NULL) {
tError("%s failed to init timers", pRpc->label); tError("%s failed to init timers", pRpc->label);
rpcClose(pRpc); rpcClose(pRpc);
...@@ -305,8 +318,8 @@ void *rpcOpen(const SRpcInit *pInit) { ...@@ -305,8 +318,8 @@ void *rpcOpen(const SRpcInit *pInit) {
return NULL; return NULL;
} }
} else { } else {
pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime * 20); pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime * 20);
if ( pRpc->pCache == NULL ) { if (pRpc->pCache == NULL) {
tError("%s failed to init connection cache", pRpc->label); tError("%s failed to init connection cache", pRpc->label);
rpcClose(pRpc); rpcClose(pRpc);
return NULL; return NULL;
...@@ -315,10 +328,10 @@ void *rpcOpen(const SRpcInit *pInit) { ...@@ -315,10 +328,10 @@ void *rpcOpen(const SRpcInit *pInit) {
pthread_mutex_init(&pRpc->mutex, NULL); pthread_mutex_init(&pRpc->mutex, NULL);
pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, pRpc->tcphandle = (*taosInitConn[pRpc->connType | RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads,
pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); rpcProcessMsgFromPeer, pRpc);
pRpc->udphandle = (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, pRpc->udphandle =
pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);
if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) { if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) {
tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort); tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort);
...@@ -338,7 +351,7 @@ void rpcClose(void *param) { ...@@ -338,7 +351,7 @@ void rpcClose(void *param) {
(*taosStopConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); (*taosStopConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
(*taosStopConn[pRpc->connType])(pRpc->udphandle); (*taosStopConn[pRpc->connType])(pRpc->udphandle);
// close all connections // close all connections
for (int i = 0; i < pRpc->sessions; ++i) { for (int i = 0; i < pRpc->sessions; ++i) {
if (pRpc->connList && pRpc->connList[i].user[0]) { if (pRpc->connList && pRpc->connList[i].user[0]) {
rpcCloseConn((void *)(pRpc->connList + i)); rpcCloseConn((void *)(pRpc->connList + i));
...@@ -379,8 +392,8 @@ void *rpcReallocCont(void *ptr, int contLen) { ...@@ -379,8 +392,8 @@ void *rpcReallocCont(void *ptr, int contLen) {
if (ptr == NULL) return rpcMallocCont(contLen); if (ptr == NULL) return rpcMallocCont(contLen);
char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead); char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead);
if (contLen == 0 ) { if (contLen == 0) {
free(start); free(start);
return NULL; return NULL;
} }
...@@ -389,7 +402,7 @@ void *rpcReallocCont(void *ptr, int contLen) { ...@@ -389,7 +402,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
if (start == NULL) { if (start == NULL) {
tError("failed to realloc cont, size:%d", size); tError("failed to realloc cont, size:%d", size);
return NULL; return NULL;
} }
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
} }
...@@ -399,7 +412,7 @@ TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int6 ...@@ -399,7 +412,7 @@ TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int6
SRpcReqContext *pContext; SRpcReqContext *pContext;
int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
pContext->ahandle = pMsg->ahandle; pContext->ahandle = pMsg->ahandle;
pContext->pRpc = (SRpcInfo *)shandle; pContext->pRpc = (SRpcInfo *)shandle;
pContext->epSet = *pEpSet; pContext->epSet = *pEpSet;
...@@ -408,17 +421,17 @@ TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int6 ...@@ -408,17 +421,17 @@ TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int6
pContext->msgType = pMsg->msgType; pContext->msgType = pMsg->msgType;
pContext->oldInUse = pEpSet->inUse; pContext->oldInUse = pEpSet->inUse;
pContext->connType = RPC_CONN_UDPC; pContext->connType = RPC_CONN_UDPC;
if (contLen > tsRpcMaxUdpSize || tsRpcForceTcp) pContext->connType = RPC_CONN_TCPC; if (contLen > tsRpcMaxUdpSize || tsRpcForceTcp) pContext->connType = RPC_CONN_TCPC;
// connection type is application specific. // connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection // for TDengine, all the query, show commands shall have TCP connection
char type = pMsg->msgType; char type = pMsg->msgType;
if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE || type == TSDB_MSG_TYPE_SUBMIT if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE || type == TSDB_MSG_TYPE_SUBMIT ||
|| type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_CM_TABLES_META ||
|| type == TSDB_MSG_TYPE_CM_TABLES_META || type == TSDB_MSG_TYPE_CM_TABLE_META type == TSDB_MSG_TYPE_CM_TABLE_META || type == TSDB_MSG_TYPE_CM_SHOW || type == TSDB_MSG_TYPE_DM_STATUS ||
|| type == TSDB_MSG_TYPE_CM_SHOW || type == TSDB_MSG_TYPE_DM_STATUS || type == TSDB_MSG_TYPE_CM_ALTER_TABLE) type == TSDB_MSG_TYPE_CM_ALTER_TABLE)
pContext->connType = RPC_CONN_TCPC; pContext->connType = RPC_CONN_TCPC;
pContext->rid = taosAddRef(tsRpcRefId, pContext); pContext->rid = taosAddRef(tsRpcRefId, pContext);
...@@ -428,26 +441,26 @@ TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int6 ...@@ -428,26 +441,26 @@ TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int6
} }
void rpcSendResponse(const SRpcMsg *pRsp) { void rpcSendResponse(const SRpcMsg *pRsp) {
int msgLen = 0; int msgLen = 0;
SRpcConn *pConn = (SRpcConn *)pRsp->handle; SRpcConn *pConn = (SRpcConn *)pRsp->handle;
SRpcMsg rpcMsg = *pRsp; SRpcMsg rpcMsg = *pRsp;
SRpcMsg *pMsg = &rpcMsg; SRpcMsg *pMsg = &rpcMsg;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
if ( pMsg->pCont == NULL ) { if (pMsg->pCont == NULL) {
pMsg->pCont = rpcMallocCont(0); pMsg->pCont = rpcMallocCont(0);
pMsg->contLen = 0; pMsg->contLen = 0;
} }
SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont); SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont);
char *msg = (char *)pHead; char *msg = (char *)pHead;
pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
msgLen = rpcMsgLenFromCont(pMsg->contLen); msgLen = rpcMsgLenFromCont(pMsg->contLen);
rpcLockConn(pConn); rpcLockConn(pConn);
if ( pConn->inType == 0 || pConn->user[0] == 0 ) { if (pConn->inType == 0 || pConn->user[0] == 0) {
tError("%s, connection is already released, rsp wont be sent", pConn->info); tError("%s, connection is already released, rsp wont be sent", pConn->info);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
...@@ -457,7 +470,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) { ...@@ -457,7 +470,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
// set msg header // set msg header
pHead->version = 1; pHead->version = 1;
pHead->msgType = pConn->inType+1; pHead->msgType = pConn->inType + 1;
pHead->spi = pConn->spi; pHead->spi = pConn->spi;
pHead->encrypt = pConn->encrypt; pHead->encrypt = pConn->encrypt;
pHead->tranId = pConn->inTranId; pHead->tranId = pConn->inTranId;
...@@ -466,13 +479,13 @@ void rpcSendResponse(const SRpcMsg *pRsp) { ...@@ -466,13 +479,13 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
pHead->linkUid = pConn->linkUid; pHead->linkUid = pConn->linkUid;
pHead->port = htons(pConn->localPort); pHead->port = htons(pConn->localPort);
pHead->code = htonl(pMsg->code); pHead->code = htonl(pMsg->code);
pHead->ahandle = (uint64_t) pConn->ahandle; pHead->ahandle = (uint64_t)pConn->ahandle;
// set pConn parameters // set pConn parameters
pConn->inType = 0; pConn->inType = 0;
// response message is released until new response is sent // response message is released until new response is sent
rpcFreeMsg(pConn->pRspMsg); rpcFreeMsg(pConn->pRspMsg);
pConn->pRspMsg = msg; pConn->pRspMsg = msg;
pConn->rspMsgLen = msgLen; pConn->rspMsgLen = msgLen;
if (pMsg->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) pConn->inTranId--; if (pMsg->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) pConn->inTranId--;
...@@ -485,23 +498,22 @@ void rpcSendResponse(const SRpcMsg *pRsp) { ...@@ -485,23 +498,22 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
rpcSendMsgToPeer(pConn, msg, msgLen); rpcSendMsgToPeer(pConn, msg, msgLen);
// if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured // if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured
if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY) if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY) pConn->secured = 1; // connection shall be secured
pConn->secured = 1; // connection shall be secured
if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);
pConn->pReqMsg = NULL; pConn->pReqMsg = NULL;
pConn->reqMsgLen = 0; pConn->reqMsgLen = 0;
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
rpcDecRef(pRpc); // decrease the referene count rpcDecRef(pRpc); // decrease the referene count
return; return;
} }
void rpcSendRedirectRsp(void *thandle, const SRpcEpSet *pEpSet) { void rpcSendRedirectRsp(void *thandle, const SRpcEpSet *pEpSet) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
memset(&rpcMsg, 0, sizeof(rpcMsg)); memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.contLen = sizeof(SRpcEpSet); rpcMsg.contLen = sizeof(SRpcEpSet);
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
if (rpcMsg.pCont == NULL) return; if (rpcMsg.pCont == NULL) return;
...@@ -517,7 +529,7 @@ void rpcSendRedirectRsp(void *thandle, const SRpcEpSet *pEpSet) { ...@@ -517,7 +529,7 @@ void rpcSendRedirectRsp(void *thandle, const SRpcEpSet *pEpSet) {
} }
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
SRpcConn *pConn = (SRpcConn *)thandle; SRpcConn *pConn = (SRpcConn *)thandle;
if (pConn->user[0] == 0) return -1; if (pConn->user[0] == 0) return -1;
pInfo->clientIp = pConn->peerIp; pInfo->clientIp = pConn->peerIp;
...@@ -530,11 +542,11 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { ...@@ -530,11 +542,11 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SRpcReqContext *pContext; SRpcReqContext *pContext;
pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
memset(pRsp, 0, sizeof(SRpcMsg)); memset(pRsp, 0, sizeof(SRpcMsg));
tsem_t sem; tsem_t sem;
tsem_init(&sem, 0, 0); tsem_init(&sem, 0, 0);
pContext->pSem = &sem; pContext->pSem = &sem;
pContext->pRsp = pRsp; pContext->pRsp = pRsp;
...@@ -551,13 +563,13 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) ...@@ -551,13 +563,13 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp)
// this API is used by server app to keep an APP context in case connection is broken // this API is used by server app to keep an APP context in case connection is broken
int rpcReportProgress(void *handle, char *pCont, int contLen) { int rpcReportProgress(void *handle, char *pCont, int contLen) {
SRpcConn *pConn = (SRpcConn *)handle; SRpcConn *pConn = (SRpcConn *)handle;
int code = 0; int code = 0;
rpcLockConn(pConn); rpcLockConn(pConn);
if (pConn->user[0]) { if (pConn->user[0]) {
// pReqMsg and reqMsgLen is re-used to store the context from app server // pReqMsg and reqMsgLen is re-used to store the context from app server
pConn->pReqMsg = pCont; pConn->pReqMsg = pCont;
pConn->reqMsgLen = contLen; pConn->reqMsgLen = contLen;
} else { } else {
tDebug("%s, rpc connection is already released", pConn->info); tDebug("%s, rpc connection is already released", pConn->info);
...@@ -570,7 +582,6 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) { ...@@ -570,7 +582,6 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) {
} }
void rpcCancelRequest(int64_t rid) { void rpcCancelRequest(int64_t rid) {
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid); SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid);
if (pContext == NULL) return; if (pContext == NULL) return;
...@@ -580,7 +591,7 @@ void rpcCancelRequest(int64_t rid) { ...@@ -580,7 +591,7 @@ void rpcCancelRequest(int64_t rid) {
} }
static void rpcFreeMsg(void *msg) { static void rpcFreeMsg(void *msg) {
if ( msg ) { if (msg) {
char *temp = (char *)msg - sizeof(SRpcReqContext); char *temp = (char *)msg - sizeof(SRpcReqContext);
tTrace("free mem: %p", temp); tTrace("free mem: %p", temp);
free(temp); free(temp);
...@@ -592,14 +603,14 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, ...@@ -592,14 +603,14 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
uint32_t peerIp = taosGetIpv4FromFqdn(peerFqdn); uint32_t peerIp = taosGetIpv4FromFqdn(peerFqdn);
if (peerIp == 0xFFFFFFFF) { if (peerIp == 0xFFFFFFFF) {
tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn);
terrno = TSDB_CODE_RPC_FQDN_ERROR; terrno = TSDB_CODE_RPC_FQDN_ERROR;
return NULL; return NULL;
} }
pConn = rpcAllocateClientConn(pRpc); pConn = rpcAllocateClientConn(pRpc);
if (pConn) { if (pConn) {
tstrncpy(pConn->peerFqdn, peerFqdn, sizeof(pConn->peerFqdn)); tstrncpy(pConn->peerFqdn, peerFqdn, sizeof(pConn->peerFqdn));
pConn->peerIp = peerIp; pConn->peerIp = peerIp;
pConn->peerPort = peerPort; pConn->peerPort = peerPort;
...@@ -607,7 +618,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, ...@@ -607,7 +618,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
pConn->connType = connType; pConn->connType = connType;
if (taosOpenConn[connType]) { if (taosOpenConn[connType]) {
void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle; void *shandle = (connType & RPC_CONN_TCP) ? pRpc->tcphandle : pRpc->udphandle;
pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort); pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort);
if (pConn->chandle == NULL) { if (pConn->chandle == NULL) {
tError("failed to connect to:0x%x:%d", pConn->peerIp, pConn->peerPort); tError("failed to connect to:0x%x:%d", pConn->peerIp, pConn->peerPort);
...@@ -632,13 +643,14 @@ static void rpcReleaseConn(SRpcConn *pConn) { ...@@ -632,13 +643,14 @@ static void rpcReleaseConn(SRpcConn *pConn) {
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
taosTmrStopA(&pConn->pIdleTimer); taosTmrStopA(&pConn->pIdleTimer);
if ( pRpc->connType == TAOS_CONN_SERVER) { if (pRpc->connType == TAOS_CONN_SERVER) {
char hashstr[40] = {0}; char hashstr[40] = {0};
size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId,
pConn->connType);
taosHashRemove(pRpc->hash, hashstr, size); taosHashRemove(pRpc->hash, hashstr, size);
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
pConn->pRspMsg = NULL; pConn->pRspMsg = NULL;
// if server has ever reported progress, free content // if server has ever reported progress, free content
if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); // do not use rpcFreeMsg if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); // do not use rpcFreeMsg
} else { } else {
...@@ -646,17 +658,17 @@ static void rpcReleaseConn(SRpcConn *pConn) { ...@@ -646,17 +658,17 @@ static void rpcReleaseConn(SRpcConn *pConn) {
if (pConn->outType && pConn->pReqMsg) { if (pConn->outType && pConn->pReqMsg) {
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
if (pContext) { if (pContext) {
if (pContext->pRsp) { if (pContext->pRsp) {
// for synchronous API, post semaphore to unblock app // for synchronous API, post semaphore to unblock app
pContext->pRsp->code = TSDB_CODE_RPC_APP_ERROR; pContext->pRsp->code = TSDB_CODE_RPC_APP_ERROR;
pContext->pRsp->pCont = NULL; pContext->pRsp->pCont = NULL;
pContext->pRsp->contLen = 0; pContext->pRsp->contLen = 0;
tsem_post(pContext->pSem); tsem_post(pContext->pSem);
} }
pContext->pConn = NULL; pContext->pConn = NULL;
taosRemoveRef(tsRpcRefId, pContext->rid); taosRemoveRef(tsRpcRefId, pContext->rid);
} else { } else {
assert(0); assert(0);
} }
} }
} }
...@@ -685,8 +697,7 @@ static void rpcCloseConn(void *thandle) { ...@@ -685,8 +697,7 @@ static void rpcCloseConn(void *thandle) {
rpcLockConn(pConn); rpcLockConn(pConn);
if (pConn->user[0]) if (pConn->user[0]) rpcReleaseConn(pConn);
rpcReleaseConn(pConn);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
} }
...@@ -694,6 +705,9 @@ static void rpcCloseConn(void *thandle) { ...@@ -694,6 +705,9 @@ static void rpcCloseConn(void *thandle) {
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
SRpcConn *pConn = NULL; SRpcConn *pConn = NULL;
uint32_t transId, linkUid;
rpcGenUID(&transId, &linkUid);
int sid = taosAllocateId(pRpc->idPool); int sid = taosAllocateId(pRpc->idPool);
if (sid <= 0) { if (sid <= 0) {
tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions); tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
...@@ -703,9 +717,11 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { ...@@ -703,9 +717,11 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
pConn->pRpc = pRpc; pConn->pRpc = pRpc;
pConn->sid = sid; pConn->sid = sid;
pConn->tranId = (uint16_t)(taosRand() & 0xFFFF); // pConn->tranId = (uint16_t)(taosRand() & 0xFFFF);
pConn->tranId = transId;
pConn->ownId = htonl(pConn->sid); pConn->ownId = htonl(pConn->sid);
pConn->linkUid = (uint32_t)((int64_t)pConn + (int64_t)getpid() + (int64_t)pConn->tranId); // pConn->linkUid = (uint32_t)((int64_t)pConn + (int64_t)getpid() + (int64_t)pConn->tranId);
pConn->linkUid = linkUid;
pConn->spi = pRpc->spi; pConn->spi = pRpc->spi;
pConn->encrypt = pRpc->encrypt; pConn->encrypt = pRpc->encrypt;
if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN); if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN);
...@@ -720,8 +736,9 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -720,8 +736,9 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
char hashstr[40] = {0}; char hashstr[40] = {0};
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); size_t size =
snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
// check if it is already allocated // check if it is already allocated
SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size)); SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
if (ppConn) pConn = *ppConn; if (ppConn) pConn = *ppConn;
...@@ -770,22 +787,23 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -770,22 +787,23 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
} }
taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s", pRpc->label, pConn, pConn->linkUid, sid, hashstr); tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s", pRpc->label, pConn, pConn->linkUid, sid,
hashstr);
} }
return pConn; return pConn;
} }
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
SRpcConn *pConn = NULL; SRpcConn *pConn = NULL;
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
if (sid) { if (sid) {
pConn = pRpc->connList + sid; pConn = pRpc->connList + sid;
if (pConn->user[0] == 0) pConn = NULL; if (pConn->user[0] == 0) pConn = NULL;
} }
if (pConn == NULL) { if (pConn == NULL) {
if (pRpc->connType == TAOS_CONN_SERVER) { if (pRpc->connType == TAOS_CONN_SERVER) {
pConn = rpcAllocateServerConn(pRpc, pRecv); pConn = rpcAllocateServerConn(pRpc, pRecv);
} else { } else {
...@@ -806,14 +824,15 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { ...@@ -806,14 +824,15 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
} }
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
SRpcConn *pConn; SRpcConn *pConn;
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo *pRpc = pContext->pRpc;
SRpcEpSet *pEpSet = &pContext->epSet; SRpcEpSet *pEpSet = &pContext->epSet;
pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); pConn =
if ( pConn == NULL || pConn->user[0] == 0) { rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
if (pConn == NULL || pConn->user[0] == 0) {
pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
} }
if (pConn) { if (pConn) {
pConn->tretry = 0; pConn->tretry = 0;
...@@ -828,55 +847,52 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { ...@@ -828,55 +847,52 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
} }
static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
if (pConn->peerId == 0) {
if (pConn->peerId == 0) { pConn->peerId = pHead->sourceId;
pConn->peerId = pHead->sourceId; } else {
} else { if (pConn->peerId != pHead->sourceId) {
if (pConn->peerId != pHead->sourceId) { tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, pConn->peerId, pHead->sourceId);
tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, return TSDB_CODE_RPC_INVALID_VALUE;
pConn->peerId, pHead->sourceId);
return TSDB_CODE_RPC_INVALID_VALUE;
}
} }
}
if (pConn->inTranId == pHead->tranId) { if (pConn->inTranId == pHead->tranId) {
if (pConn->inType == pHead->msgType) { if (pConn->inType == pHead->msgType) {
if (pHead->code == 0) { if (pHead->code == 0) {
tDebug("%s, %s is retransmitted", pConn->info, taosMsg[pHead->msgType]); tDebug("%s, %s is retransmitted", pConn->info, taosMsg[pHead->msgType]);
rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS); rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS);
} else {
// do nothing, it is heart beat from client
}
} else if (pConn->inType == 0) {
tDebug("%s, %s is already processed, tranId:%d", pConn->info, taosMsg[pHead->msgType], pConn->inTranId);
rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response
} else { } else {
tDebug("%s, mismatched message %s and tranId", pConn->info, taosMsg[pHead->msgType]); // do nothing, it is heart beat from client
} }
} else if (pConn->inType == 0) {
// do not reply any message tDebug("%s, %s is already processed, tranId:%d", pConn->info, taosMsg[pHead->msgType], pConn->inTranId);
return TSDB_CODE_RPC_ALREADY_PROCESSED; rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response
} else {
tDebug("%s, mismatched message %s and tranId", pConn->info, taosMsg[pHead->msgType]);
} }
if (pConn->inType != 0) { // do not reply any message
tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, return TSDB_CODE_RPC_ALREADY_PROCESSED;
pConn->inTranId, pHead->tranId); }
return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED;
}
if (rpcContLenFromMsg(pHead->msgLen) <= 0) { if (pConn->inType != 0) {
tDebug("%s, message body is empty, ignore", pConn->info); tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, pConn->inTranId, pHead->tranId);
return TSDB_CODE_RPC_APP_ERROR; return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED;
} }
pConn->inTranId = pHead->tranId; if (rpcContLenFromMsg(pHead->msgLen) <= 0) {
pConn->inType = pHead->msgType; tDebug("%s, message body is empty, ignore", pConn->info);
return TSDB_CODE_RPC_APP_ERROR;
}
// start the progress timer to monitor the response from server app pConn->inTranId = pHead->tranId;
if (pConn->connType != RPC_CONN_TCPS) pConn->inType = pHead->msgType;
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl);
// start the progress timer to monitor the response from server app
return 0; if (pConn->connType != RPC_CONN_TCPS)
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl);
return 0;
} }
static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
...@@ -901,7 +917,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -901,7 +917,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
if (pHead->code == TSDB_CODE_RPC_AUTH_REQUIRED && pRpc->spi) { if (pHead->code == TSDB_CODE_RPC_AUTH_REQUIRED && pRpc->spi) {
tDebug("%s, authentication shall be restarted", pConn->info); tDebug("%s, authentication shall be restarted", pConn->info);
pConn->secured = 0; pConn->secured = 0;
rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
if (pConn->connType != RPC_CONN_TCPC) if (pConn->connType != RPC_CONN_TCPC)
pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
return TSDB_CODE_RPC_ALREADY_PROCESSED; return TSDB_CODE_RPC_ALREADY_PROCESSED;
...@@ -911,7 +927,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -911,7 +927,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
tDebug("%s, mismatched linkUid, link shall be restarted", pConn->info); tDebug("%s, mismatched linkUid, link shall be restarted", pConn->info);
pConn->secured = 0; pConn->secured = 0;
((SRpcHead *)pConn->pReqMsg)->destId = 0; ((SRpcHead *)pConn->pReqMsg)->destId = 0;
rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
if (pConn->connType != RPC_CONN_TCPC) if (pConn->connType != RPC_CONN_TCPC)
pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
return TSDB_CODE_RPC_ALREADY_PROCESSED; return TSDB_CODE_RPC_ALREADY_PROCESSED;
...@@ -937,25 +953,25 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -937,25 +953,25 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
pConn->reqMsgLen = 0; pConn->reqMsgLen = 0;
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
if (pHead->code == TSDB_CODE_RPC_REDIRECT) { if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SRpcEpSet)) { if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SRpcEpSet)) {
// if EpSet is not included in the msg, treat it as NOT_READY // if EpSet is not included in the msg, treat it as NOT_READY
pHead->code = TSDB_CODE_RPC_NOT_READY; pHead->code = TSDB_CODE_RPC_NOT_READY;
} else { } else {
pContext->redirect++; pContext->redirect++;
if (pContext->redirect > TSDB_MAX_REPLICA) { if (pContext->redirect > TSDB_MAX_REPLICA) {
pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
tWarn("%s, too many redirects, quit", pConn->info); tWarn("%s, too many redirects, quit", pConn->info);
} }
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) { static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) {
int32_t sid; int32_t sid;
SRpcConn *pConn = NULL; SRpcConn *pConn = NULL;
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
...@@ -964,20 +980,23 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont ...@@ -964,20 +980,23 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) {
tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE; return NULL; terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE;
return NULL;
} }
if (sid < 0 || sid >= pRpc->sessions) { if (sid < 0 || sid >= pRpc->sessions) {
tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, pRpc->sessions,
pRpc->sessions, taosMsg[pHead->msgType]); taosMsg[pHead->msgType]);
terrno = TSDB_CODE_RPC_INVALID_SESSION_ID; return NULL; terrno = TSDB_CODE_RPC_INVALID_SESSION_ID;
return NULL;
} }
// compatibility between old version client and new version server, since 2.4.0.0 // compatibility between old version client and new version server, since 2.4.0.0
if (rpcIsReq(pHead->msgType)){ if (rpcIsReq(pHead->msgType)) {
if((htonl(pHead->msgVer) >> 16 != tsVersion >> 24) || if ((htonl(pHead->msgVer) >> 16 != tsVersion >> 24) ||
((htonl(pHead->msgVer) >> 16 == tsVersion >> 24) && htonl(pHead->msgVer) < ((2 << 16) | (4 << 8)))){ ((htonl(pHead->msgVer) >> 16 == tsVersion >> 24) && htonl(pHead->msgVer) < ((2 << 16) | (4 << 8)))) {
tError("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion, taosMsg[pHead->msgType]); tError("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion,
taosMsg[pHead->msgType]);
terrno = TSDB_CODE_RPC_INVALID_VERSION; terrno = TSDB_CODE_RPC_INVALID_VERSION;
return NULL; return NULL;
} }
...@@ -985,9 +1004,9 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont ...@@ -985,9 +1004,9 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
pConn = rpcGetConnObj(pRpc, sid, pRecv); pConn = rpcGetConnObj(pRpc, sid, pRecv);
if (pConn == NULL) { if (pConn == NULL) {
tDebug("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno)); tDebug("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno));
return NULL; return NULL;
} }
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN || pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN || pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
return pConn; return pConn;
...@@ -1002,9 +1021,9 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont ...@@ -1002,9 +1021,9 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
sid = pConn->sid; sid = pConn->sid;
if (pConn->chandle == NULL) pConn->chandle = pRecv->chandle; if (pConn->chandle == NULL) pConn->chandle = pRecv->chandle;
pConn->peerIp = pRecv->ip; pConn->peerIp = pRecv->ip;
pConn->peerPort = pRecv->port; pConn->peerPort = pRecv->port;
if (pHead->port) pConn->peerPort = htons(pHead->port); if (pHead->port) pConn->peerPort = htons(pHead->port);
terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen); terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
...@@ -1016,16 +1035,16 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont ...@@ -1016,16 +1035,16 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
// decrypt here // decrypt here
} }
if ( rpcIsReq(pHead->msgType) ) { if (rpcIsReq(pHead->msgType)) {
pConn->connType = pRecv->connType; pConn->connType = pRecv->connType;
terrno = rpcProcessReqHead(pConn, pHead); terrno = rpcProcessReqHead(pConn, pHead);
// stop idle timer // stop idle timer
taosTmrStopA(&pConn->pIdleTimer); taosTmrStopA(&pConn->pIdleTimer);
// client shall send the request within tsRpcTime again for UDP, double it // client shall send the request within tsRpcTime again for UDP, double it
if (pConn->connType != RPC_CONN_TCPS) if (pConn->connType != RPC_CONN_TCPS)
pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*20, pConn, pRpc->tmrCtrl); pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer * 20, pConn, pRpc->tmrCtrl);
} else { } else {
terrno = rpcProcessRspHead(pConn, pHead); terrno = rpcProcessRspHead(pConn, pHead);
*ppContext = pConn->pContext; *ppContext = pConn->pContext;
...@@ -1038,11 +1057,11 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont ...@@ -1038,11 +1057,11 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
} }
static void doRpcReportBrokenLinkToServer(void *param, void *id) { static void doRpcReportBrokenLinkToServer(void *param, void *id) {
SRpcMsg *pRpcMsg = (SRpcMsg *)(param); SRpcMsg *pRpcMsg = (SRpcMsg *)(param);
SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle); SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle);
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
(*(pRpc->cfp))(pRpcMsg, NULL); (*(pRpc->cfp))(pRpcMsg, NULL);
free(pRpcMsg); free(pRpcMsg);
} }
static void rpcReportBrokenLinkToServer(SRpcConn *pConn) { static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
...@@ -1053,12 +1072,12 @@ static void rpcReportBrokenLinkToServer(SRpcConn *pConn) { ...@@ -1053,12 +1072,12 @@ static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
tDebug("%s, notify the server app, connection is gone", pConn->info); tDebug("%s, notify the server app, connection is gone", pConn->info);
SRpcMsg *rpcMsg = malloc(sizeof(SRpcMsg)); SRpcMsg *rpcMsg = malloc(sizeof(SRpcMsg));
rpcMsg->pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server rpcMsg->pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server
rpcMsg->contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length rpcMsg->contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length
rpcMsg->ahandle = pConn->ahandle; rpcMsg->ahandle = pConn->ahandle;
rpcMsg->handle = pConn; rpcMsg->handle = pConn;
rpcMsg->msgType = pConn->inType; rpcMsg->msgType = pConn->inType;
rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
pConn->pReqMsg = NULL; pConn->pReqMsg = NULL;
pConn->reqMsgLen = 0; pConn->reqMsgLen = 0;
if (pRpc->cfp) { if (pRpc->cfp) {
...@@ -1084,8 +1103,8 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -1084,8 +1103,8 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
*rid = pContext->rid; *rid = pContext->rid;
taosTmrStart(rpcProcessConnError, 0, rid, pRpc->tmrCtrl); taosTmrStart(rpcProcessConnError, 0, rid, pRpc->tmrCtrl);
} }
if (pConn->inType) rpcReportBrokenLinkToServer(pConn); if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
rpcReleaseConn(pConn); rpcReleaseConn(pConn);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
...@@ -1093,12 +1112,12 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -1093,12 +1112,12 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
// process probe msg , return true is probe msg, false is not probe msg // process probe msg , return true is probe msg, false is not probe msg
static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) {
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
uint64_t ahandle = pHead->ahandle; uint64_t ahandle = pHead->ahandle;
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) { if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
// response to // response to
char msg[RPC_MSG_OVERHEAD]; char msg[RPC_MSG_OVERHEAD];
SRpcHead *pRspHead; SRpcHead *pRspHead;
// set msg header // set msg header
memset(msg, 0, sizeof(SRpcHead)); memset(msg, 0, sizeof(SRpcHead));
...@@ -1107,13 +1126,13 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { ...@@ -1107,13 +1126,13 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) {
pRspHead->msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP; pRspHead->msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP;
pRspHead->version = 1; pRspHead->version = 1;
pRspHead->ahandle = pHead->ahandle; pRspHead->ahandle = pHead->ahandle;
pRspHead->tranId = pHead->tranId; pRspHead->tranId = pHead->tranId;
pRspHead->code = 0; pRspHead->code = 0;
pRspHead->linkUid = pHead->linkUid; pRspHead->linkUid = pHead->linkUid;
rpcLockConn(pConn); rpcLockConn(pConn);
pRspHead->sourceId = pConn->ownId; pRspHead->sourceId = pConn->ownId;
pRspHead->destId = pConn->peerId; pRspHead->destId = pConn->peerId;
memcpy(pRspHead->user, pHead->user, tListLen(pHead->user)); memcpy(pRspHead->user, pHead->user, tListLen(pHead->user));
bool ret = rpcSendMsgToPeer(pConn, pRspHead, sizeof(SRpcHead)); bool ret = rpcSendMsgToPeer(pConn, pRspHead, sizeof(SRpcHead));
...@@ -1122,19 +1141,20 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { ...@@ -1122,19 +1141,20 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) {
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
rpcFreeMsg(pRecv->msg); rpcFreeMsg(pRecv->msg);
} else if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { } else if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
if(pConn) { if (pConn) {
rpcLockConn(pConn); rpcLockConn(pConn);
// get req content // get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, pConn->rid); SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, pConn->rid);
if (pContext) { if (pContext) {
rpcProcessIncomingMsg(pConn, pHead, pContext); rpcProcessIncomingMsg(pConn, pHead, pContext);
taosReleaseRef(tsRpcRefId, pConn->rid); taosReleaseRef(tsRpcRefId, pConn->rid);
} else { } else {
tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pContext is NULL. pConn->rid=0x%" PRIX64, ahandle, pConn->rid); tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pContext is NULL. pConn->rid=0x%" PRIX64, ahandle,
pConn->rid);
rpcFreeMsg(pRecv->msg); rpcFreeMsg(pRecv->msg);
} }
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
} else { } else {
tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pConn is NULL.", ahandle); tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pConn is NULL.", ahandle);
...@@ -1144,14 +1164,14 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { ...@@ -1144,14 +1164,14 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) {
} }
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle; SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle;
SRpcConn *pConn = (SRpcConn *)pRecv->thandle; SRpcConn *pConn = (SRpcConn *)pRecv->thandle;
tDump(pRecv->msg, pRecv->msgLen); tDump(pRecv->msg, pRecv->msgLen);
// underlying UDP layer does not know it is server or client // underlying UDP layer does not know it is server or client
pRecv->connType = pRecv->connType | pRpc->connType; pRecv->connType = pRecv->connType | pRpc->connType;
if (pRecv->msg == NULL) { if (pRecv->msg == NULL) {
rpcProcessBrokenLink(pConn); rpcProcessBrokenLink(pConn);
...@@ -1180,54 +1200,53 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -1180,54 +1200,53 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
int32_t code = terrno; int32_t code = terrno;
if (code != TSDB_CODE_RPC_ALREADY_PROCESSED) { if (code != TSDB_CODE_RPC_ALREADY_PROCESSED) {
if (code != 0) { // parsing error if (code != 0) { // parsing error
if (rpcIsReq(pHead->msgType)) { if (rpcIsReq(pHead->msgType)) {
rpcSendErrorMsgToPeer(pRecv, code); rpcSendErrorMsgToPeer(pRecv, code);
if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE) { if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE) {
rpcCloseConn(pConn); rpcCloseConn(pConn);
} }
if (pHead->msgType + 1 > 1 && pHead->msgType+1 < TSDB_MSG_TYPE_MAX) { if (pHead->msgType + 1 > 1 && pHead->msgType + 1 < TSDB_MSG_TYPE_MAX) {
tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
taosMsg[pHead->msgType + 1], code);
} else { } else {
tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], code); tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
} taosMsg[pHead->msgType], code);
} }
} else { // msg is passed to app only parsing is ok }
} else { // msg is passed to app only parsing is ok
rpcProcessIncomingMsg(pConn, pHead, pContext); rpcProcessIncomingMsg(pConn, pHead, pContext);
} }
} }
if (code) rpcFreeMsg(pRecv->msg); // parsing failed, msg shall be freed if (code) rpcFreeMsg(pRecv->msg); // parsing failed, msg shall be freed
return pConn; return pConn;
} }
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo *pRpc = pContext->pRpc;
pContext->pConn = NULL; pContext->pConn = NULL;
if (pContext->pRsp) { if (pContext->pRsp) {
// for synchronous API // for synchronous API
memcpy(pContext->pSet, &pContext->epSet, sizeof(SRpcEpSet)); memcpy(pContext->pSet, &pContext->epSet, sizeof(SRpcEpSet));
memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
tsem_post(pContext->pSem); tsem_post(pContext->pSem);
} else { } else {
// for asynchronous API // for asynchronous API
SRpcEpSet *pEpSet = NULL; SRpcEpSet *pEpSet = NULL;
if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) pEpSet = &pContext->epSet;
pEpSet = &pContext->epSet;
(*pRpc->cfp)(pMsg, pEpSet); (*pRpc->cfp)(pMsg, pEpSet);
} }
// free the request message // free the request message
if(pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN && pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN_RSP) { if (pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN && pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN_RSP) {
taosRemoveRef(tsRpcRefId, pContext->rid); taosRemoveRef(tsRpcRefId, pContext->rid);
} }
} }
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
...@@ -1235,9 +1254,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1235,9 +1254,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen); rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = pHead->content; rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
rpcMsg.code = pHead->code; rpcMsg.code = pHead->code;
if ( rpcIsReq(pHead->msgType) ) { if (rpcIsReq(pHead->msgType)) {
rpcMsg.ahandle = pConn->ahandle; rpcMsg.ahandle = pConn->ahandle;
rpcMsg.handle = pConn; rpcMsg.handle = pConn;
rpcAddRef(pRpc); // add the refCount for requests rpcAddRef(pRpc); // add the refCount for requests
...@@ -1265,7 +1284,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1265,7 +1284,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
// probe msg // probe msg
rpcNotifyClient(pContext, &rpcMsg); rpcNotifyClient(pContext, &rpcMsg);
return ; return;
} }
// reset pConn NULL // reset pConn NULL
...@@ -1273,14 +1292,15 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1273,14 +1292,15 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
// for UDP, port may be changed by server, the port in epSet shall be used for cache // for UDP, port may be changed by server, the port in epSet shall be used for cache
if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse],
pConn->connType);
} else { } else {
rpcCloseConn(pConn); rpcCloseConn(pConn);
} }
if (pHead->code == TSDB_CODE_RPC_REDIRECT) { if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
pContext->numOfTry = 0; pContext->numOfTry = 0;
SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content; SRpcEpSet *pEpSet = (SRpcEpSet *)pHead->content;
if (pEpSet->numOfEps > 0) { if (pEpSet->numOfEps > 0) {
memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet)); memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet));
tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps, tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps,
...@@ -1293,10 +1313,11 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1293,10 +1313,11 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
} }
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
} else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || pHead->code == TSDB_CODE_DND_EXITING) { } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY ||
pHead->code == TSDB_CODE_DND_EXITING) {
pContext->code = pHead->code; pContext->code = pHead->code;
int64_t *rid = malloc(sizeof(int64_t)); int64_t *rid = malloc(sizeof(int64_t));
*rid = pContext->rid; *rid = pContext->rid;
rpcProcessConnError(rid, NULL); rpcProcessConnError(rid, NULL);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
} else { } else {
...@@ -1306,14 +1327,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1306,14 +1327,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
} }
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
char msg[RPC_MSG_OVERHEAD]; char msg[RPC_MSG_OVERHEAD];
SRpcHead *pHead; SRpcHead *pHead;
// set msg header // set msg header
memset(msg, 0, sizeof(SRpcHead)); memset(msg, 0, sizeof(SRpcHead));
pHead = (SRpcHead *)msg; pHead = (SRpcHead *)msg;
pHead->version = 1; pHead->version = 1;
pHead->msgType = pConn->inType+1; pHead->msgType = pConn->inType + 1;
pHead->spi = pConn->spi; pHead->spi = pConn->spi;
pHead->encrypt = 0; pHead->encrypt = 0;
pHead->tranId = pConn->inTranId; pHead->tranId = pConn->inTranId;
...@@ -1325,12 +1346,12 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { ...@@ -1325,12 +1346,12 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
pHead->code = htonl(code); pHead->code = htonl(code);
rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead)); rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
pConn->secured = 1; // connection shall be secured pConn->secured = 1; // connection shall be secured
} }
static void rpcSendReqHead(SRpcConn *pConn) { static void rpcSendReqHead(SRpcConn *pConn) {
char msg[RPC_MSG_OVERHEAD]; char msg[RPC_MSG_OVERHEAD];
SRpcHead *pHead; SRpcHead *pHead;
// set msg header // set msg header
memset(msg, 0, sizeof(SRpcHead)); memset(msg, 0, sizeof(SRpcHead));
...@@ -1352,10 +1373,10 @@ static void rpcSendReqHead(SRpcConn *pConn) { ...@@ -1352,10 +1373,10 @@ static void rpcSendReqHead(SRpcConn *pConn) {
} }
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
SRpcHead *pRecvHead, *pReplyHead; SRpcHead *pRecvHead, *pReplyHead;
char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t) ]; char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t)];
uint32_t timeStamp; uint32_t timeStamp;
int msgLen; int msgLen;
pRecvHead = (SRpcHead *)pRecv->msg; pRecvHead = (SRpcHead *)pRecv->msg;
pReplyHead = (SRpcHead *)msg; pReplyHead = (SRpcHead *)msg;
...@@ -1385,14 +1406,14 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { ...@@ -1385,14 +1406,14 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
(*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, msg, msgLen, pRecv->chandle); (*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, msg, msgLen, pRecv->chandle);
return; return;
} }
static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); SRpcHead *pHead = rpcHeadFromCont(pContext->pCont);
char *msg = (char *)pHead; char *msg = (char *)pHead;
int msgLen = rpcMsgLenFromCont(pContext->contLen); int msgLen = rpcMsgLenFromCont(pContext->contLen);
char msgType = pContext->msgType; char msgType = pContext->msgType;
pContext->numOfTry++; pContext->numOfTry++;
SRpcConn *pConn = rpcSetupConnToServer(pContext); SRpcConn *pConn = rpcSetupConnToServer(pContext);
...@@ -1409,13 +1430,13 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1409,13 +1430,13 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pConn->ahandle = pContext->ahandle; pConn->ahandle = pContext->ahandle;
rpcLockConn(pConn); rpcLockConn(pConn);
// set the message header // set the message header
pHead->version = 1; pHead->version = 1;
pHead->msgVer = htonl(tsVersion >> 8); pHead->msgVer = htonl(tsVersion >> 8);
pHead->msgType = msgType; pHead->msgType = msgType;
pHead->encrypt = 0; pHead->encrypt = 0;
pConn->tranId++; pConn->tranId++;
if ( pConn->tranId == 0 ) pConn->tranId++; if (pConn->tranId == 0) pConn->tranId++;
pHead->tranId = pConn->tranId; pHead->tranId = pConn->tranId;
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
...@@ -1430,13 +1451,12 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1430,13 +1451,12 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pConn->pReqMsg = msg; pConn->pReqMsg = msg;
pConn->reqMsgLen = msgLen; pConn->reqMsgLen = msgLen;
pConn->pContext = pContext; pConn->pContext = pContext;
if(pContext) if (pContext) pConn->rid = pContext->rid;
pConn->rid = pContext->rid;
// save // save
pContext->sendInfo.pConn = pConn; pContext->sendInfo.pConn = pConn;
pContext->sendInfo.pFdObj = pConn->chandle; pContext->sendInfo.pFdObj = pConn->chandle;
pContext->sendInfo.fd = taosGetFdID(pConn->chandle); pContext->sendInfo.fd = taosGetFdID(pConn->chandle);
bool ret = rpcSendMsgToPeer(pConn, msg, msgLen); bool ret = rpcSendMsgToPeer(pConn, msg, msgLen);
if (pConn->connType != RPC_CONN_TCPC) if (pConn->connType != RPC_CONN_TCPC)
...@@ -1444,7 +1464,7 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1444,7 +1464,7 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
if(ret == BOOL_FALSE) { if (ret == BOOL_FALSE) {
// try next ip again // try next ip again
pContext->code = terrno; pContext->code = terrno;
// in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query // in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query
...@@ -1458,31 +1478,29 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1458,31 +1478,29 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
} }
static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
int writtenLen = 0; int writtenLen = 0;
SRpcHead *pHead = (SRpcHead *)msg; SRpcHead *pHead = (SRpcHead *)msg;
bool ret = true; bool ret = true;
msgLen = rpcAddAuthPart(pConn, msg, msgLen); msgLen = rpcAddAuthPart(pConn, msg, msgLen);
if ( rpcIsReq(pHead->msgType)) { if (rpcIsReq(pHead->msgType)) {
tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", pConn->info, taosMsg[pHead->msgType],
pConn->info, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort, pConn->peerFqdn, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} else { } else {
if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured
tDebug("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", tDebug("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pConn->info, taosMsg[pHead->msgType],
pConn->info, taosMsg[pHead->msgType], pConn->peerIp, pConn->peerPort, pConn->peerIp, pConn->peerPort, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} }
//tTrace("connection type is: %d", pConn->connType); // tTrace("connection type is: %d", pConn->connType);
writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);
if (writtenLen != msgLen) { if (writtenLen != msgLen) {
tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
ret = false; ret = false;
} }
tDump(msg, msgLen); tDump(msg, msgLen);
return ret; return ret;
} }
...@@ -1491,48 +1509,51 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -1491,48 +1509,51 @@ static void rpcProcessConnError(void *param, void *id) {
if (NULL == param) { if (NULL == param) {
return; return;
} }
int64_t *rid = (int64_t*)param; int64_t *rid = (int64_t *)param;
SRpcReqContext *pContext = (SRpcReqContext *)taosAcquireRef(tsRpcRefId, *rid); SRpcReqContext *pContext = (SRpcReqContext *)taosAcquireRef(tsRpcRefId, *rid);
if (NULL == pContext) { if (NULL == pContext) {
free(param); free(param);
return; return;
} }
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo *pRpc = pContext->pRpc;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
if (pRpc == NULL) { if (pRpc == NULL) {
taosReleaseRef(tsRpcRefId, *rid); taosReleaseRef(tsRpcRefId, *rid);
free(param); free(param);
return; return;
} }
if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) { if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) {
rpcMsg.msgType = pContext->msgType+1; rpcMsg.msgType = pContext->msgType + 1;
rpcMsg.ahandle = pContext->ahandle; rpcMsg.ahandle = pContext->ahandle;
rpcMsg.code = pContext->code; rpcMsg.code = pContext->code;
rpcMsg.pCont = NULL; rpcMsg.pCont = NULL;
rpcMsg.contLen = 0; rpcMsg.contLen = 0;
if( pContext->numOfTry >= pContext->epSet.numOfEps && rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (pContext->numOfTry >= pContext->epSet.numOfEps && rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if(pContext->msgType == TSDB_MSG_TYPE_SUBMIT || pContext->msgType == TSDB_MSG_TYPE_QUERY) { if (pContext->msgType == TSDB_MSG_TYPE_SUBMIT || pContext->msgType == TSDB_MSG_TYPE_QUERY) {
rpcMsg.code = TSDB_CODE_RPC_VGROUP_NOT_CONNECTED; rpcMsg.code = TSDB_CODE_RPC_VGROUP_NOT_CONNECTED;
} }
} }
tWarn("%s %p, connection error. notify client query over. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType); tWarn("%s %p, connection error. notify client query over. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle,
pContext->numOfTry, pContext->msgType);
rpcNotifyClient(pContext, &rpcMsg); rpcNotifyClient(pContext, &rpcMsg);
} else { } else {
// move to next IP // move to next IP
tWarn("%s %p, connection error. retry to send request again. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType); tWarn("%s %p, connection error. retry to send request again. numOfTry=%d msgType=%d", pRpc->label,
pContext->ahandle, pContext->numOfTry, pContext->msgType);
pContext->epSet.inUse++; pContext->epSet.inUse++;
pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps; pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps;
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
} }
taosReleaseRef(tsRpcRefId, *rid); taosReleaseRef(tsRpcRefId, *rid);
free(param); free(param);
} }
static void rpcProcessRetryTimer(void *param, void *tmrId) { static void rpcProcessRetryTimer(void *param, void *tmrId) {
...@@ -1548,11 +1569,12 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -1548,11 +1569,12 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
if (pConn->retry < 4) { if (pConn->retry < 4) {
tDebug("%s, re-send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); tDebug("%s, re-send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
} else { } else {
// close the connection // close the connection
tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn,
pConn->peerPort);
if (pConn->pContext) { if (pConn->pContext) {
pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
pConn->pContext->pConn = NULL; pConn->pContext->pConn = NULL;
...@@ -1577,7 +1599,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { ...@@ -1577,7 +1599,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
if (pConn->user[0]) { if (pConn->user[0]) {
tDebug("%s, close the connection since no activity", pConn->info); tDebug("%s, close the connection since no activity", pConn->info);
if (pConn->inType) rpcReportBrokenLinkToServer(pConn); if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
rpcReleaseConn(pConn); rpcReleaseConn(pConn);
} else { } else {
tDebug("%s, idle timer:%p not processed", pConn->info, tmrId); tDebug("%s, idle timer:%p not processed", pConn->info, tmrId);
...@@ -1603,34 +1625,34 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { ...@@ -1603,34 +1625,34 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
} }
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { static int32_t rpcCompressRpcMsg(char *pCont, int32_t contLen) {
SRpcHead *pHead = rpcHeadFromCont(pCont); SRpcHead *pHead = rpcHeadFromCont(pCont);
int32_t finalLen = 0; int32_t finalLen = 0;
int overhead = sizeof(SRpcComp); int overhead = sizeof(SRpcComp);
if (!NEEDTO_COMPRESSS_MSG(contLen)) { if (!NEEDTO_COMPRESSS_MSG(contLen)) {
return contLen; return contLen;
} }
char *buf = malloc (contLen + overhead + 8); // 8 extra bytes char *buf = malloc(contLen + overhead + 8); // 8 extra bytes
if (buf == NULL) { if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen); tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
return contLen; return contLen;
} }
int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead); tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead);
/* /*
* only the compressed size is less than the value of contLen - overhead, the compression is applied * only the compressed size is less than the value of contLen - overhead, the compression is applied
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
*/ */
if (compLen > 0 && compLen < contLen - overhead) { if (compLen > 0 && compLen < contLen - overhead) {
SRpcComp *pComp = (SRpcComp *)pCont; SRpcComp *pComp = (SRpcComp *)pCont;
pComp->reserved = 0; pComp->reserved = 0;
pComp->contLen = htonl(contLen); pComp->contLen = htonl(contLen);
memcpy(pCont + overhead, buf, compLen); memcpy(pCont + overhead, buf, compLen);
pHead->comp = 1; pHead->comp = 1;
tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen); tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen);
finalLen = compLen + overhead; finalLen = compLen + overhead;
...@@ -1643,29 +1665,29 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { ...@@ -1643,29 +1665,29 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
} }
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
int overhead = sizeof(SRpcComp); int overhead = sizeof(SRpcComp);
SRpcHead *pNewHead = NULL; SRpcHead *pNewHead = NULL;
uint8_t *pCont = pHead->content; uint8_t *pCont = pHead->content;
SRpcComp *pComp = (SRpcComp *)pHead->content; SRpcComp *pComp = (SRpcComp *)pHead->content;
if (pHead->comp) { if (pHead->comp) {
// decompress the content // decompress the content
assert(pComp->reserved == 0); assert(pComp->reserved == 0);
int contLen = htonl(pComp->contLen); int contLen = htonl(pComp->contLen);
// prepare the temporary buffer to decompress message // prepare the temporary buffer to decompress message
char *temp = (char *)malloc(contLen + RPC_MSG_OVERHEAD); char *temp = (char *)malloc(contLen + RPC_MSG_OVERHEAD);
pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext
if (pNewHead) { if (pNewHead) {
int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char *)pNewHead->content, compLen, contLen); int origLen = LZ4_decompress_safe((char *)(pCont + overhead), (char *)pNewHead->content, compLen, contLen);
assert(origLen == contLen); assert(origLen == contLen);
memcpy(pNewHead, pHead, sizeof(SRpcHead)); memcpy(pNewHead, pHead, sizeof(SRpcHead));
pNewHead->msgLen = rpcMsgLenFromCont(origLen); pNewHead->msgLen = rpcMsgLenFromCont(origLen);
rpcFreeMsg(pHead); // free the compressed message buffer rpcFreeMsg(pHead); // free the compressed message buffer
pHead = pNewHead; pHead = pNewHead;
tTrace("decomp malloc mem:%p", temp); tTrace("decomp malloc mem:%p", temp);
} else { } else {
tError("failed to allocate memory to decompress msg, contLen:%d", contLen); tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
...@@ -1677,7 +1699,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { ...@@ -1677,7 +1699,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) { static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) {
T_MD5_CTX context; T_MD5_CTX context;
int ret = -1; int ret = -1;
tMD5Init(&context); tMD5Init(&context);
tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
...@@ -1725,25 +1747,25 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1725,25 +1747,25 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHead *pHead = (SRpcHead *)msg; SRpcHead *pHead = (SRpcHead *)msg;
int code = 0; int code = 0;
if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){ if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
// secured link, or no authentication // secured link, or no authentication
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
// tTrace("%s, secured link, no auth is required", pConn->info); // tTrace("%s, secured link, no auth is required", pConn->info);
return 0; return 0;
} }
if ( !rpcIsReq(pHead->msgType) ) { if (!rpcIsReq(pHead->msgType)) {
// for response, if code is auth failure, it shall bypass the auth process // for response, if code is auth failure, it shall bypass the auth process
code = htonl(pHead->code); code = htonl(pHead->code);
if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_INVALID_USER || code == TSDB_CODE_RPC_NOT_READY) { code == TSDB_CODE_MND_INVALID_USER || code == TSDB_CODE_RPC_NOT_READY) {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
return 0; return 0;
} }
} }
code = 0; code = 0;
if (pHead->spi == pConn->spi) { if (pHead->spi == pConn->spi) {
// authentication // authentication
...@@ -1756,12 +1778,12 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1756,12 +1778,12 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
code = TSDB_CODE_RPC_INVALID_TIME_STAMP; code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
} else { } else {
if (rpcAuthenticateMsg(pHead, msgLen-TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { if (rpcAuthenticateMsg(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
tDebug("%s, authentication failed, msg discarded", pConn->info); tDebug("%s, authentication failed, msg discarded", pConn->info);
code = TSDB_CODE_RPC_AUTH_FAILURE; code = TSDB_CODE_RPC_AUTH_FAILURE;
} else { } else {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client if (!rpcIsReq(pHead->msgType)) pConn->secured = 1; // link is secured for client
// tTrace("%s, message is authenticated", pConn->info); // tTrace("%s, message is authenticated", pConn->info);
} }
} }
...@@ -1790,13 +1812,9 @@ static void rpcUnlockConn(SRpcConn *pConn) { ...@@ -1790,13 +1812,9 @@ static void rpcUnlockConn(SRpcConn *pConn) {
} }
} }
static void rpcAddRef(SRpcInfo *pRpc) static void rpcAddRef(SRpcInfo *pRpc) { atomic_add_fetch_32(&pRpc->refCount, 1); }
{
atomic_add_fetch_32(&pRpc->refCount, 1);
}
static void rpcDecRef(SRpcInfo *pRpc) static void rpcDecRef(SRpcInfo *pRpc) {
{
if (atomic_sub_fetch_32(&pRpc->refCount, 1) == 0) { if (atomic_sub_fetch_32(&pRpc->refCount, 1) == 0) {
rpcCloseConnCache(pRpc->pCache); rpcCloseConnCache(pRpc->pCache);
taosHashCleanup(pRpc->hash); taosHashCleanup(pRpc->hash);
...@@ -1812,31 +1830,30 @@ static void rpcDecRef(SRpcInfo *pRpc) ...@@ -1812,31 +1830,30 @@ static void rpcDecRef(SRpcInfo *pRpc)
} }
} }
int32_t rpcUnusedSession(void * rpcInfo, bool bLock) { int32_t rpcUnusedSession(void *rpcInfo, bool bLock) {
SRpcInfo *info = (SRpcInfo *)rpcInfo; SRpcInfo *info = (SRpcInfo *)rpcInfo;
if(info == NULL) if (info == NULL) return 0;
return 0;
return taosIdPoolNumOfFree(info->idPool, bLock); return taosIdPoolNumOfFree(info->idPool, bLock);
} }
bool doRpcSendProbe(SRpcConn *pConn) { bool doRpcSendProbe(SRpcConn *pConn) {
char msg[RPC_MSG_OVERHEAD]; char msg[RPC_MSG_OVERHEAD];
SRpcHead *pHead; SRpcHead *pHead;
int code = 0; int code = 0;
// set msg header // set msg header
memset(msg, 0, sizeof(SRpcHead)); memset(msg, 0, sizeof(SRpcHead));
pHead = (SRpcHead *)msg; pHead = (SRpcHead *)msg;
pHead->version = 1; pHead->version = 1;
pHead->msgVer = htonl(tsVersion >> 8); pHead->msgVer = htonl(tsVersion >> 8);
pHead->msgType = TSDB_MSG_TYPE_PROBE_CONN; pHead->msgType = TSDB_MSG_TYPE_PROBE_CONN;
pHead->spi = pConn->spi; pHead->spi = pConn->spi;
pHead->encrypt = 0; pHead->encrypt = 0;
pHead->tranId = (uint16_t)(taosRand() & 0xFFFF); // rand pHead->tranId = (uint16_t)(taosRand() & 0xFFFF); // rand
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->linkUid = pConn->linkUid; pHead->linkUid = pConn->linkUid;
pHead->ahandle = (uint64_t)pConn->ahandle; pHead->ahandle = (uint64_t)pConn->ahandle;
memcpy(pHead->user, pConn->user, tListLen(pHead->user)); memcpy(pHead->user, pConn->user, tListLen(pHead->user));
pHead->code = htonl(code); pHead->code = htonl(code);
...@@ -1846,10 +1863,10 @@ bool doRpcSendProbe(SRpcConn *pConn) { ...@@ -1846,10 +1863,10 @@ bool doRpcSendProbe(SRpcConn *pConn) {
} }
// send server syn // send server syn
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, bool *pReqOver) { bool rpcSendProbe(int64_t rpcRid, void *pPrevContext, bool *pReqOver) {
// return false can kill query // return false can kill query
bool ret = false; bool ret = false;
if(rpcRid < 0) { if (rpcRid < 0) {
tError("PROBE rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid); tError("PROBE rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid);
return true; return true;
} }
...@@ -1862,27 +1879,28 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, bool *pReqOver) { ...@@ -1862,27 +1879,28 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, bool *pReqOver) {
} }
// context same // context same
if(pContext != pPrevContext) { if (pContext != pPrevContext) {
tError("PROBE rpcRid=0x%" PRIx64 " context diff. pContext=%p pPreContent=%p", rpcRid, pContext, pPrevContext); tError("PROBE rpcRid=0x%" PRIx64 " context diff. pContext=%p pPreContent=%p", rpcRid, pContext, pPrevContext);
goto _END; goto _END;
} }
// conn same // conn same
if(pContext->pConn == NULL) { if (pContext->pConn == NULL) {
tInfo("PROBE rpcRid=0x%" PRIx64 " reqContext->pConn is NULL. The req is finished.", rpcRid); tInfo("PROBE rpcRid=0x%" PRIx64 " reqContext->pConn is NULL. The req is finished.", rpcRid);
if (pReqOver) if (pReqOver) *pReqOver = true;
*pReqOver = true;
ret = true; ret = true;
goto _END; goto _END;
} else if (pContext->pConn != pContext->sendInfo.pConn) { } else if (pContext->pConn != pContext->sendInfo.pConn) {
tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pContext->sendInfo.pConn); tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn,
pContext->sendInfo.pConn);
goto _END; goto _END;
} }
// fdObj same // fdObj same
if (pContext->pConn->chandle != pContext->sendInfo.pFdObj) { if (pContext->pConn->chandle != pContext->sendInfo.pFdObj) {
tInfo("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pContext->sendInfo.pFdObj); tInfo("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid,
pContext->pConn->chandle, pContext->sendInfo.pFdObj);
goto _END; goto _END;
} }
...@@ -1906,11 +1924,11 @@ _END: ...@@ -1906,11 +1924,11 @@ _END:
} }
// after sql request send , save conn info // after sql request send , save conn info
bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) { bool rpcSaveSendInfo(int64_t rpcRid, void **ppContext) {
if(rpcRid < 0) { if (rpcRid < 0) {
tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid); tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid);
return false; return false;
} }
// get req content // get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid); SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid);
if (pContext == NULL) { if (pContext == NULL) {
...@@ -1918,9 +1936,7 @@ bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) { ...@@ -1918,9 +1936,7 @@ bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) {
return false; return false;
} }
if (ppContext) if (ppContext) *ppContext = pContext;
*ppContext = pContext;
taosReleaseRef(tsRpcRefId, rpcRid); taosReleaseRef(tsRpcRefId, rpcRid);
return true; return true;
} }
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// clang-format off
#include "os.h" #include "os.h"
#include "tsocket.h" #include "tsocket.h"
#include "tutil.h" #include "tutil.h"
...@@ -22,22 +24,24 @@ ...@@ -22,22 +24,24 @@
#include "rpcHead.h" #include "rpcHead.h"
#include "rpcTcp.h" #include "rpcTcp.h"
// clang-format on
typedef struct SFdObj { typedef struct SFdObj {
void *signature; void *signature;
SOCKET fd; // TCP socket FD SOCKET fd; // TCP socket FD
void *thandle; // handle from upper layer, like TAOS void *thandle; // handle from upper layer, like TAOS
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
int16_t closedByApp; // 1: already closed by App int16_t closedByApp; // 1: already closed by App
struct SThreadObj *pThreadObj; struct SThreadObj *pThreadObj;
struct SFdObj *prev; struct SFdObj *prev;
struct SFdObj *next; struct SFdObj *next;
uint64_t ctime; // create time uint64_t ctime; // create time
} SFdObj; } SFdObj;
typedef struct SThreadObj { typedef struct SThreadObj {
pthread_t thread; pthread_t thread;
SFdObj * pHead; SFdObj *pHead;
pthread_mutex_t mutex; pthread_mutex_t mutex;
uint32_t ip; uint32_t ip;
bool stop; bool stop;
...@@ -46,27 +50,27 @@ typedef struct SThreadObj { ...@@ -46,27 +50,27 @@ typedef struct SThreadObj {
int threadId; int threadId;
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
void *shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
void *(*processData)(SRecvInfo *pPacket); void *(*processData)(SRecvInfo *pPacket);
} SThreadObj; } SThreadObj;
typedef struct { typedef struct {
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
int32_t index; int32_t index;
int numOfThreads; int numOfThreads;
SThreadObj **pThreadObj; SThreadObj **pThreadObj;
} SClientObj; } SClientObj;
typedef struct { typedef struct {
SOCKET fd; SOCKET fd;
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
int8_t stop; int8_t stop;
int8_t reserve; int8_t reserve;
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
int numOfThreads; int numOfThreads;
void * shandle; void *shandle;
SThreadObj **pThreadObj; SThreadObj **pThreadObj;
pthread_t thread; pthread_t thread;
} SServerObj; } SServerObj;
static void *taosProcessTcpData(void *param); static void *taosProcessTcpData(void *param);
...@@ -101,7 +105,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -101,7 +105,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
return NULL; return NULL;
} }
int code = 0; int code = 0;
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
...@@ -112,7 +116,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -112,7 +116,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
if (pThreadObj == NULL) { if (pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
for (int j=0; j<i; ++j) free(pServerObj->pThreadObj[j]); for (int j = 0; j < i; ++j) free(pServerObj->pThreadObj[j]);
free(pServerObj->pThreadObj); free(pServerObj->pThreadObj);
free(pServerObj); free(pServerObj);
return NULL; return NULL;
...@@ -174,8 +178,10 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -174,8 +178,10 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
return (void *)pServerObj; return (void *)pServerObj;
} }
static void taosStopTcpThread(SThreadObj* pThreadObj) { static void taosStopTcpThread(SThreadObj *pThreadObj) {
if (pThreadObj == NULL) { return;} if (pThreadObj == NULL) {
return;
}
// save thread into local variable and signal thread to stop // save thread into local variable and signal thread to stop
pthread_t thread = pThreadObj->thread; pthread_t thread = pThreadObj->thread;
if (!taosCheckPthreadValid(thread)) { if (!taosCheckPthreadValid(thread)) {
...@@ -199,7 +205,7 @@ void taosStopTcpServer(void *handle) { ...@@ -199,7 +205,7 @@ void taosStopTcpServer(void *handle) {
#ifdef WINDOWS #ifdef WINDOWS
closesocket(pServerObj->fd); closesocket(pServerObj->fd);
#elif defined(__APPLE__) #elif defined(__APPLE__)
if (pServerObj->fd!=-1) { if (pServerObj->fd != -1) {
close(pServerObj->fd); close(pServerObj->fd);
pServerObj->fd = -1; pServerObj->fd = -1;
} }
...@@ -264,8 +270,8 @@ static void *taosAcceptTcpConnection(void *arg) { ...@@ -264,8 +270,8 @@ static void *taosAcceptTcpConnection(void *arg) {
} }
taosKeepTcpAlive(connFd); taosKeepTcpAlive(connFd);
struct timeval to={5, 0}; struct timeval to = {5, 0};
int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
if (ret != 0) { if (ret != 0) {
taosCloseSocket(connFd); taosCloseSocket(connFd);
tError("%s failed to set recv timeout fd(%s)for connection from:%s:%hu", pServerObj->label, strerror(errno), tError("%s failed to set recv timeout fd(%s)for connection from:%s:%hu", pServerObj->label, strerror(errno),
...@@ -273,7 +279,6 @@ static void *taosAcceptTcpConnection(void *arg) { ...@@ -273,7 +279,6 @@ static void *taosAcceptTcpConnection(void *arg) {
continue; continue;
} }
// pick up the thread to handle this connection // pick up the thread to handle this connection
pThreadObj = pServerObj->pThreadObj[threadId]; pThreadObj = pServerObj->pThreadObj[threadId];
...@@ -283,7 +288,7 @@ static void *taosAcceptTcpConnection(void *arg) { ...@@ -283,7 +288,7 @@ static void *taosAcceptTcpConnection(void *arg) {
pFdObj->port = htons(caddr.sin_port); pFdObj->port = htons(caddr.sin_port);
pFdObj->ctime = taosGetTimestampMs(); pFdObj->ctime = taosGetTimestampMs();
tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label,
taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds);
} else { } else {
taosCloseSocket(connFd); taosCloseSocket(connFd);
tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
...@@ -309,14 +314,14 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -309,14 +314,14 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread
tstrncpy(pClientObj->label, label, sizeof(pClientObj->label)); tstrncpy(pClientObj->label, label, sizeof(pClientObj->label));
pClientObj->numOfThreads = numOfThreads; pClientObj->numOfThreads = numOfThreads;
pClientObj->pThreadObj = (SThreadObj **)calloc(numOfThreads, sizeof(SThreadObj*)); pClientObj->pThreadObj = (SThreadObj **)calloc(numOfThreads, sizeof(SThreadObj *));
if (pClientObj->pThreadObj == NULL) { if (pClientObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
tfree(pClientObj); tfree(pClientObj);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
} }
int code = 0; int code = 0;
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
...@@ -326,15 +331,15 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -326,15 +331,15 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread
if (pThreadObj == NULL) { if (pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
for (int j=0; j<i; ++j) free(pClientObj->pThreadObj[j]); for (int j = 0; j < i; ++j) free(pClientObj->pThreadObj[j]);
free(pClientObj); free(pClientObj);
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
return NULL; return NULL;
} }
pClientObj->pThreadObj[i] = pThreadObj; pClientObj->pThreadObj[i] = pThreadObj;
taosResetPthread(&pThreadObj->thread); taosResetPthread(&pThreadObj->thread);
pThreadObj->ip = ip; pThreadObj->ip = ip;
pThreadObj->stop = false; pThreadObj->stop = false;
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
pThreadObj->shandle = shandle; pThreadObj->shandle = shandle;
pThreadObj->processData = fp; pThreadObj->processData = fp;
...@@ -376,14 +381,14 @@ void taosStopTcpClient(void *chandle) { ...@@ -376,14 +381,14 @@ void taosStopTcpClient(void *chandle) {
if (pClientObj == NULL) return; if (pClientObj == NULL) return;
tDebug ("%s TCP client is stopped", pClientObj->label); tDebug("%s TCP client is stopped", pClientObj->label);
} }
void taosCleanUpTcpClient(void *chandle) { void taosCleanUpTcpClient(void *chandle) {
SClientObj *pClientObj = chandle; SClientObj *pClientObj = chandle;
if (pClientObj == NULL) return; if (pClientObj == NULL) return;
for (int i = 0; i < pClientObj->numOfThreads; ++i) { for (int i = 0; i < pClientObj->numOfThreads; ++i) {
SThreadObj *pThreadObj= pClientObj->pThreadObj[i]; SThreadObj *pThreadObj = pClientObj->pThreadObj[i];
taosStopTcpThread(pThreadObj); taosStopTcpThread(pThreadObj);
} }
...@@ -393,9 +398,9 @@ void taosCleanUpTcpClient(void *chandle) { ...@@ -393,9 +398,9 @@ void taosCleanUpTcpClient(void *chandle) {
} }
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
SClientObj * pClientObj = shandle; SClientObj *pClientObj = shandle;
int32_t idx = atomic_load_32(&pClientObj->index) % pClientObj->numOfThreads; int32_t idx = atomic_load_32(&pClientObj->index) % pClientObj->numOfThreads;
atomic_store_32(&pClientObj->index, idx + 1); atomic_store_32(&pClientObj->index, idx + 1);
SThreadObj *pThreadObj = pClientObj->pThreadObj[idx]; SThreadObj *pThreadObj = pClientObj->pThreadObj[idx];
SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
...@@ -406,10 +411,10 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin ...@@ -406,10 +411,10 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
#endif #endif
struct sockaddr_in sockin; struct sockaddr_in sockin;
uint16_t localPort = 0; uint16_t localPort = 0;
unsigned int addrlen = sizeof(sockin); unsigned int addrlen = sizeof(sockin);
if (getsockname(fd, (struct sockaddr *)&sockin, &addrlen) == 0 && if (getsockname(fd, (struct sockaddr *)&sockin, &addrlen) == 0 && sockin.sin_family == AF_INET &&
sockin.sin_family == AF_INET && addrlen == sizeof(sockin)) { addrlen == sizeof(sockin)) {
localPort = (uint16_t)ntohs(sockin.sin_port); localPort = (uint16_t)ntohs(sockin.sin_port);
} }
...@@ -420,8 +425,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin ...@@ -420,8 +425,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
pFdObj->port = port; pFdObj->port = port;
pFdObj->ip = ip; pFdObj->ip = ip;
pFdObj->ctime = taosGetTimestampMs(); pFdObj->ctime = taosGetTimestampMs();
tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", pThreadObj->label, thandle,
pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds); ip, port, localPort, pFdObj, pThreadObj->numOfFds);
} else { } else {
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
taosCloseSocket(fd); taosCloseSocket(fd);
...@@ -435,7 +440,8 @@ void taosCloseTcpConnection(void *chandle) { ...@@ -435,7 +440,8 @@ void taosCloseTcpConnection(void *chandle) {
if (pFdObj == NULL || pFdObj->signature != pFdObj) return; if (pFdObj == NULL || pFdObj->signature != pFdObj) return;
SThreadObj *pThreadObj = pFdObj->pThreadObj; SThreadObj *pThreadObj = pFdObj->pThreadObj;
tDebug("DEEP %s shutdown3 fd=%d ip=0x%x port=%d ctime=%" PRId64, pThreadObj->label, pFdObj->fd, pFdObj->ip, pFdObj->port, pFdObj->ctime); tDebug("DEEP %s shutdown3 fd=%d ip=0x%x port=%d ctime=%" PRId64, pThreadObj->label, pFdObj->fd, pFdObj->ip,
pFdObj->port, pFdObj->ctime);
// pFdObj->thandle = NULL; // pFdObj->thandle = NULL;
pFdObj->closedByApp = 1; pFdObj->closedByApp = 1;
...@@ -448,8 +454,9 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand ...@@ -448,8 +454,9 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand
tError("DEEP TCP send data failed(chandle null). data=0x%p len=%d ip=0x%0x port=%d", data, len, ip, port); tError("DEEP TCP send data failed(chandle null). data=0x%p len=%d ip=0x%0x port=%d", data, len, ip, port);
return -1; return -1;
} }
if(pFdObj->signature != pFdObj) { if (pFdObj->signature != pFdObj) {
tError("DEEP TCP send data failed(sig diff). pFdObj=0x%p sig=0x%p data=%p len=%d ip=0x%x port=%d", pFdObj, pFdObj->signature, data, len, ip, port); tError("DEEP TCP send data failed(sig diff). pFdObj=0x%p sig=0x%p data=%p len=%d ip=0x%x port=%d", pFdObj,
pFdObj->signature, data, len, ip, port);
return -2; return -2;
} }
...@@ -457,27 +464,31 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand ...@@ -457,27 +464,31 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand
int ret = taosWriteMsg(pFdObj->fd, data, len); int ret = taosWriteMsg(pFdObj->fd, data, len);
tTrace("%s %p TCP data is sent, FD:%p fd:%d bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, ret); tTrace("%s %p TCP data is sent, FD:%p fd:%d bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, ret);
if(ret < 0) { if (ret < 0) {
tError("DEEP %s %p TCP data sent failed and try again, FD:%p fd:%d ctime=%" PRId64 " ret=%d le=%d ip=0x%x port=%d threadid=%d numofFds=%d", tError("DEEP %s %p TCP data sent failed and try again, FD:%p fd:%d ctime=%" PRId64
pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pFdObj->ctime, ret, len, ip, port, pThreadObj->threadId, pThreadObj->numOfFds); " ret=%d le=%d ip=0x%x port=%d threadid=%d numofFds=%d",
pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pFdObj->ctime, ret, len, ip, port,
pThreadObj->threadId, pThreadObj->numOfFds);
ret = taosWriteMsg(pFdObj->fd, data, len); ret = taosWriteMsg(pFdObj->fd, data, len);
if(ret < 0) { if (ret < 0) {
tError("DEEP %s %p Second TCP data sent failed, FD:%p fd:%d ctime=%" PRId64 " ret=%d le=%d ip=0x%x port=%d threadid=%d numofFds=%d", tError("DEEP %s %p Second TCP data sent failed, FD:%p fd:%d ctime=%" PRId64
pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pFdObj->ctime, ret, len, ip, port, pThreadObj->threadId, pThreadObj->numOfFds); " ret=%d le=%d ip=0x%x port=%d threadid=%d numofFds=%d",
} pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pFdObj->ctime, ret, len, ip, port,
pThreadObj->threadId, pThreadObj->numOfFds);
}
} }
return ret; return ret;
} }
static void taosReportBrokenLink(SFdObj *pFdObj) { static void taosReportBrokenLink(SFdObj *pFdObj) {
SThreadObj *pThreadObj = pFdObj->pThreadObj; SThreadObj *pThreadObj = pFdObj->pThreadObj;
// notify the upper layer, so it will clean the associated context // notify the upper layer, so it will clean the associated context
if (pFdObj->closedByApp == 0) { if (pFdObj->closedByApp == 0) {
shutdown(pFdObj->fd, SHUT_WR); shutdown(pFdObj->fd, SHUT_WR);
tDebug("DEEP %s shutdown2 fd=%d ip=0x%x port=%d ctime=%" PRId64, pThreadObj->label, pFdObj->fd, pFdObj->ip, pFdObj->port, pFdObj->ctime); tDebug("DEEP %s shutdown2 fd=%d ip=0x%x port=%d ctime=%" PRId64, pThreadObj->label, pFdObj->fd, pFdObj->ip,
pFdObj->port, pFdObj->ctime);
SRecvInfo recvInfo; SRecvInfo recvInfo;
recvInfo.msg = NULL; recvInfo.msg = NULL;
...@@ -495,9 +506,9 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { ...@@ -495,9 +506,9 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
} }
static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
SRpcHead rpcHead; SRpcHead rpcHead;
int32_t msgLen, leftLen, retLen, headLen; int32_t msgLen, leftLen, retLen, headLen;
char *buffer, *msg; char *buffer, *msg;
SThreadObj *pThreadObj = pFdObj->pThreadObj; SThreadObj *pThreadObj = pFdObj->pThreadObj;
...@@ -520,7 +531,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { ...@@ -520,7 +531,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
return -1; return -1;
} else { } else {
tTrace("%s %p read data, FD:%p fd:%d TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, buffer); tTrace("%s %p read data, FD:%p fd:%d TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd,
buffer);
} }
msg = buffer + tsRpcOverhead; msg = buffer + tsRpcOverhead;
...@@ -528,8 +540,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { ...@@ -528,8 +540,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) { if (leftLen != retLen) {
tError("%s %p read error, leftLen:%d retLen:%d FD:%p", tError("%s %p read error, leftLen:%d retLen:%d FD:%p", pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj);
pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj);
free(buffer); free(buffer);
return -1; return -1;
} }
...@@ -595,7 +606,8 @@ static void *taosProcessTcpData(void *param) { ...@@ -595,7 +606,8 @@ static void *taosProcessTcpData(void *param) {
} }
if (taosReadTcpData(pFdObj, &recvInfo) < 0) { if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
tDebug("DEEP %s shutdown1 fd=%d ip=0x%x port=%d ctime=%" PRId64, pThreadObj->label, pFdObj->fd, pFdObj->ip, pFdObj->port, pFdObj->ctime); tDebug("DEEP %s shutdown1 fd=%d ip=0x%x port=%d ctime=%" PRId64, pThreadObj->label, pFdObj->fd, pFdObj->ip,
pFdObj->port, pFdObj->ctime);
shutdown(pFdObj->fd, SHUT_WR); shutdown(pFdObj->fd, SHUT_WR);
continue; continue;
} }
...@@ -607,7 +619,7 @@ static void *taosProcessTcpData(void *param) { ...@@ -607,7 +619,7 @@ static void *taosProcessTcpData(void *param) {
if (pThreadObj->stop) break; if (pThreadObj->stop) break;
} }
if (pThreadObj->pollFd >=0) { if (pThreadObj->pollFd >= 0) {
EpollClose(pThreadObj->pollFd); EpollClose(pThreadObj->pollFd);
pThreadObj->pollFd = -1; pThreadObj->pollFd = -1;
} }
...@@ -658,7 +670,6 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) { ...@@ -658,7 +670,6 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) {
} }
static void taosFreeFdObj(SFdObj *pFdObj) { static void taosFreeFdObj(SFdObj *pFdObj) {
if (pFdObj == NULL) return; if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return; if (pFdObj->signature != pFdObj) return;
...@@ -672,15 +683,14 @@ static void taosFreeFdObj(SFdObj *pFdObj) { ...@@ -672,15 +683,14 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pFdObj->signature = NULL; pFdObj->signature = NULL;
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
tDebug("DEEP %s close1 fd=%d pFdObj=%p ip=0x%x port=%d ctime=%" PRId64 " thandle=%p", tDebug("DEEP %s close1 fd=%d pFdObj=%p ip=0x%x port=%d ctime=%" PRId64 " thandle=%p", pThreadObj->label, pFdObj->fd,
pThreadObj->label, pFdObj->fd, pFdObj, pFdObj->ip, pFdObj->port, pFdObj->ctime, pFdObj->thandle); pFdObj, pFdObj->ip, pFdObj->port, pFdObj->ctime, pFdObj->thandle);
taosCloseSocket(pFdObj->fd); taosCloseSocket(pFdObj->fd);
pThreadObj->numOfFds--; pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0) if (pThreadObj->numOfFds < 0)
tError("%s %p TCP thread:%d, number of FDs is negative!!!", tError("%s %p TCP thread:%d, number of FDs is negative!!!", pThreadObj->label, pFdObj->thandle,
pThreadObj->label, pFdObj->thandle, pThreadObj->threadId); pThreadObj->threadId);
if (pFdObj->prev) { if (pFdObj->prev) {
(pFdObj->prev)->next = pFdObj->next; (pFdObj->prev)->next = pFdObj->next;
...@@ -694,17 +704,15 @@ static void taosFreeFdObj(SFdObj *pFdObj) { ...@@ -694,17 +704,15 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pthread_mutex_unlock(&pThreadObj->mutex); pthread_mutex_unlock(&pThreadObj->mutex);
tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d", tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d", pThreadObj->label, pFdObj->thandle, pFdObj,
pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pThreadObj->numOfFds); pFdObj->fd, pThreadObj->numOfFds);
tfree(pFdObj); tfree(pFdObj);
} }
SOCKET taosGetFdID(void *chandle) { SOCKET taosGetFdID(void *chandle) {
SFdObj * pFdObj = chandle; SFdObj *pFdObj = chandle;
if(pFdObj == NULL) if (pFdObj == NULL) return -1;
return -1; if (pFdObj->signature != pFdObj) return -1;
if (pFdObj->signature != pFdObj)
return -1;
return pFdObj->fd; return pFdObj->fd;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册