提交 fc22eb44 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

rearrange the code to make it more readable

上级 37b55ef1
......@@ -59,7 +59,8 @@ typedef struct {
int16_t index;
int16_t numOfIps;
uint16_t port;
char ipStr[TSDB_MAX_MPEERS][40];
uint32_t ip[TSDB_MAX_MPEERS];
char ipStr[TSDB_MAX_MPEERS][TSDB_IPv4ADDR_LEN];
} SRpcIpSet;
void *rpcOpen(SRpcInit *pRpc);
......
......@@ -13,23 +13,20 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCCACHE_H
#define TDENGINE_TSCCACHE_H
#ifndef TDENGINE_CONN_CACHE_H
#define TDENGINE_CONN_CACHE_H
#ifdef __cplusplus
extern "C" {
#endif
void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer);
void taosCloseConnCache(void *handle);
void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user);
void taosCloseConnCache(void *handle);
void taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user);
void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCACHE_H
#endif // TDENGINE_CONN_CACHE_H
......@@ -21,7 +21,7 @@
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "tcache.h"
#include "tconncache.h"
typedef struct _c_hash_t {
uint32_t ip;
......@@ -64,7 +64,6 @@ int taosHashConn(void *handle, uint32_t ip, uint16_t port, char *user) {
}
void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64_t time) {
if (pNode == NULL) return;
if (time < pObj->keepTimer + pNode->time) return;
SConnHash *pPrev = pNode->prev, *pNext;
......@@ -86,7 +85,7 @@ void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64
pObj->connHashList[hash] = NULL;
}
void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) {
void taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) {
int hash;
SConnHash * pNode;
SConnCache *pObj;
......@@ -94,12 +93,8 @@ void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port,
uint64_t time = taosGetTimestampMs();
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return NULL;
if (data == NULL) {
tscTrace("data:%p ip:%p:%d not valid, not added in cache", data, ip, port);
return NULL;
}
assert(pObj);
assert(data);
hash = taosHashConn(pObj, ip, port, user);
pNode = (SConnHash *)taosMemPoolMalloc(pObj->connHashMemPool);
......@@ -123,7 +118,7 @@ void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port,
tscTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pObj->count[hash]);
return pObj;
return;
}
void taosCleanConnCache(void *handle, void *tmrId) {
......@@ -155,7 +150,7 @@ void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user)
void * pData = NULL;
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return NULL;
assert(pObj);
uint64_t time = taosGetTimestampMs();
......
......@@ -14,14 +14,12 @@
*/
#include "os.h"
#include "tcache.h"
#include "shash.h"
#include "taosmsg.h"
#include "tidpool.h"
#include "tlog.h"
#include "tmd5.h"
#include "tmempool.h"
#include "trpc.h"
#include "tsocket.h"
#include "ttcpclient.h"
#include "ttcpserver.h"
......@@ -30,6 +28,8 @@
#include "tudp.h"
#include "tutil.h"
#include "lz4.h"
#include "tconncache.h"
#include "trpc.h"
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest))
#define rpcHeaderFromCont(cont) ((SRpcHeader *) (cont - sizeof(SRpcHeader)))
......@@ -41,9 +41,8 @@
typedef struct {
int sessions;
int numOfThreads;
int type;
int idleTime; // milliseconds;
uint32_t localIp;
char localIp[TSDB_IPv4ADDR_LEN];
uint16_t localPort;
int connType;
char label[12];
......@@ -78,7 +77,6 @@ typedef struct {
} SRpcReqContext;
typedef struct _RpcConn {
void *signature;
int sid; // session ID
uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID
......@@ -177,163 +175,27 @@ void (*taosCloseConn[])(void *chandle) = {
taosCloseTcpClientConnection
};
static void rpcProcessRetryTimer(void *, void *);
static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle);
static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen);
static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
static int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort);
static void rpcCloseConn(void *thandle);
static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr);
static void rpcProcessConnError(void *param, void *id);
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader);
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
SRpcHeader *pHeader = rpcHeaderFromCont(pCont);
int32_t overhead = sizeof(int32_t) * 2;
int32_t finalLen = 0;
if (!NEEDTO_COMPRESSS_MSG(contLen)) {
return contLen;
}
char *buf = malloc (contLen + overhead+8); // 16 extra bytes
if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno));
return contLen;
}
int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
/*
* only the compressed size is less than the value of contLen - overhead, the compression is applied
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
*/
if (compLen < contLen - overhead) {
//tDump(pCont, contLen);
int32_t *pLen = (int32_t *)pCont;
*pLen = 0; // first 4 bytes must be zero
pLen = (int32_t *)(pCont + sizeof(int32_t));
*pLen = htonl(contLen); // contLen is encoded in second 4 bytes
memcpy(pCont + overhead, buf, compLen);
pHeader->comp = 1;
tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen);
finalLen = compLen + overhead;
//tDump(pCont, contLen);
} else {
finalLen = contLen;
}
free(buf);
return finalLen;
}
static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) {
int overhead = sizeof(int32_t) * 2;
SRpcHeader *pNewHeader = NULL;
uint8_t *pCont = pHeader->content;
if (pHeader->comp) {
// decompress the content
assert(GET_INT32_VAL(pHeader->content) == 0);
// contLen is original message length before compression applied
int contLen = htonl(GET_INT32_VAL(pCont + sizeof(int32_t)));
// prepare the temporary buffer to decompress message
char *buf = rpcMallocCont(contLen);
if (buf) {
pNewHeader = rpcHeaderFromCont(buf);
int compLen = rpcContLenFromMsg(pHeader->msgLen) - overhead;
int32_t originalLen = LZ4_decompress_safe((const char*)(pCont + overhead), buf, compLen, contLen);
assert(originalLen == contLen);
memcpy(pNewHeader, pHeader, sizeof(SRpcHeader));
pNewHeader->msgLen = rpcMsgLenFromCont(originalLen);
free(pHeader); // free the compressed message buffer
pHeader = pNewHeader;
} else {
tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno));
}
}
return pHeader;
}
void *rpcMallocCont(int size) {
char *pMsg = NULL;
size += RPC_MSG_OVERHEAD;
pMsg = (char *)calloc(1, (size_t)size);
if (pMsg == NULL) {
tError("failed to malloc msg, size:%d", size);
return NULL;
}
return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHeader);
}
void rpcFreeCont(void *cont) {
char *msg = ((char *)cont) - sizeof(SRpcHeader);
free(msg);
}
static void rpcFreeMsg(void *msg) {
char *req = ((char *)msg) - sizeof(SRpcReqContext);
free(req);
}
void rpcSendSimpleRsp(void *thandle, int32_t code) {
char *pMsg;
STaosRsp *pRsp;
int msgLen = sizeof(STaosRsp);
if (thandle == NULL) {
tError("connection is gone, response could not be sent");
return;
}
pMsg = rpcMallocCont(msgLen);
if (pMsg == NULL) return;
pRsp = (STaosRsp *)pMsg;
pRsp->code = htonl(code);
rpcSendResponse(thandle, pMsg, msgLen);
return;
}
static SRpcConn *rpcSetConnToServer(void *shandle, SRpcIpSet ipSet);
static int rpcGetConnObj(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr);
static void rpcSendQuickRsp(SRpcConn *pConn, char code) {
char msg[RPC_MSG_OVERHEAD + sizeof(STaosRsp)];
SRpcHeader *pHeader;
int msgLen;
STaosRsp *pRsp;
pRsp = (STaosRsp *)rpcContFromHeader(msg);
pRsp->code = htonl(code);
msgLen = sizeof(STaosRsp);
static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, char code);
static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle);
static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen);
// set msg header
memset(msg, 0, sizeof(SRpcHeader));
pHeader = (SRpcHeader *)msg;
pHeader->version = 1;
pHeader->msgType = pConn->inType+1;
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pHeader->tranId = pConn->inTranId;
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pHeader->uid = 0;
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle);
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader);
static void rpcProcessConnError(void *param, void *id);
static void rpcProcessRetryTimer(void *, void *);
static void rpcProcessIdleTimer(void *param, void *tmrId);
rpcSendDataToPeer(pConn, msg, msgLen);
}
static void rpcFreeMsg(void *msg);
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader);
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
void *rpcOpen(SRpcInit *pInit) {
SRpcInfo *pRpc;
......@@ -346,13 +208,14 @@ void *rpcOpen(SRpcInit *pInit) {
strcpy(pRpc->label, pInit->label);
pRpc->fp = pInit->fp;
pRpc->type = pInit->connType;
pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime;
pRpc->numOfThreads = pInit->numOfThreads;
if (pRpc->numOfThreads > TSDB_MAX_RPC_THREADS) {
pRpc->numOfThreads = TSDB_MAX_RPC_THREADS;
}
strcpy(pRpc->localIp, pInit->localIp);
pRpc->localPort = pInit->localPort;
pRpc->afp = pInit->afp;
pRpc->sessions = pInit->sessions;
......@@ -416,7 +279,7 @@ void *rpcOpen(SRpcInit *pInit) {
void rpcClose(void *param) {
SRpcInfo *pRpc = (SRpcInfo *)param;
(*taosCleanUpConn[pRpc->type])(pRpc->shandle);
(*taosCleanUpConn[pRpc->connType])(pRpc->shandle);
for (int i = 0; i < pRpc->sessions; ++i) {
if (pRpc->connList[i].meterId[0]) {
......@@ -434,10 +297,111 @@ void rpcClose(void *param) {
tfree(pRpc);
}
void *rpcMallocCont(int size) {
char *pMsg = NULL;
size += RPC_MSG_OVERHEAD;
pMsg = (char *)calloc(1, (size_t)size);
if (pMsg == NULL) {
tError("failed to malloc msg, size:%d", size);
return NULL;
}
return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHeader);
}
void rpcFreeCont(void *cont) {
char *msg = ((char *)cont) - sizeof(SRpcHeader);
free(msg);
}
void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int contLen, void *ahandle) {
SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcConn *pConn;
SRpcReqContext *pContext;
contLen = rpcCompressRpcMsg(pCont, contLen);
pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHeader)-sizeof(SRpcReqContext));
pContext->ahandle = ahandle;
pContext->pRpc = (SRpcInfo *)shandle;
pContext->ipSet = ipSet;
pContext->contLen = contLen;
pContext->pCont = pCont;
pContext->msgType = type;
pConn = rpcSetConnToServer(shandle, ipSet);
pContext->code = terrno;
if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
rpcSendReqToServer(pConn, pContext);
return;
}
void rpcSendResponse(void *handle, void *pCont, int contLen) {
int msgLen = 0;
SRpcConn *pConn = (SRpcConn *)handle;
SRpcInfo *pRpc = pConn->pRpc;
SRpcHeader *pHeader = rpcHeaderFromCont(pCont);
char *msg = (char *)pHeader;
contLen = rpcCompressRpcMsg(pCont, contLen);
msgLen = rpcMsgLenFromCont(contLen);
pthread_mutex_lock(&pRpc->mutex);
// set msg header
pHeader->version = 1;
pHeader->msgType = pConn->inType+1;
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pHeader->tranId = pConn->inTranId;
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pHeader->uid = 0;
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
// set pConn parameters
pConn->inType = 0;
rpcFreeMsg(pConn->pRspMsg);
pConn->pRspMsg = msg;
pConn->rspMsgLen = msgLen;
if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
pthread_mutex_unlock(&pRpc->mutex);
rpcSendDataToPeer(pConn, msg, msgLen);
return;
}
void rpcSendSimpleRsp(void *thandle, int32_t code) {
char *pMsg;
STaosRsp *pRsp;
int msgLen = sizeof(STaosRsp);
if (thandle == NULL) {
tError("connection is gone, response could not be sent");
return;
}
pMsg = rpcMallocCont(msgLen);
if (pMsg == NULL) return;
pRsp = (STaosRsp *)pMsg;
pRsp->code = htonl(code);
rpcSendResponse(thandle, pMsg, msgLen);
return;
}
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) {
SRpcConn *pConn;
if ( (uint8_t)(rpcGetConn(0, pRpc->meterId, pRpc, &pConn, 1, NULL)) != 0 )
if ( (uint8_t)(rpcGetConnObj(0, pRpc->meterId, pRpc, &pConn, 1, NULL)) != 0 )
return NULL;
strcpy(pConn->peerIpstr, peerIpStr);
......@@ -449,8 +413,8 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort)
strcpy(pConn->meterId, pRpc->meterId);
// if it is client, it shall set up connection first
if (taosOpenConn[pRpc->type]) {
pConn->chandle = (*taosOpenConn[pRpc->type])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort);
if (taosOpenConn[pRpc->connType]) {
pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort);
if (pConn->chandle) {
tTrace("%s pConn:%p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label,
pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort);
......@@ -475,7 +439,7 @@ static void rpcCloseConn(void *thandle) {
pthread_mutex_lock(&pRpc->mutex);
if (taosCloseConn[pRpc->type]) (*taosCloseConn[pRpc->type])(pConn->chandle);
if (taosCloseConn[pRpc->connType]) (*taosCloseConn[pRpc->connType])(pConn->chandle);
taosTmrStopA(&pConn->pTimer);
taosTmrStopA(&pConn->pIdleTimer);
......@@ -494,7 +458,7 @@ static void rpcCloseConn(void *thandle) {
pthread_mutex_unlock(&pRpc->mutex);
}
static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr) {
static int rpcGetConnObj(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr) {
SRpcConn * pConn = NULL;
if (sid == 0) {
......@@ -558,41 +522,16 @@ static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn,
return TSDB_CODE_SUCCESS;
}
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHeader *pHeader = (SRpcHeader *)msg;
SRpcInfo *pRpc = pConn->pRpc;
int code = 0;
SRpcConn *rpcSetConnToServer(void *shandle, SRpcIpSet ipSet) {
SRpcInfo *pRpc = (SRpcInfo *)shandle;
if (pConn->spi == 0 ) return 0;
SRpcConn *pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId);
if (pHeader->spi == pConn->spi) {
// authentication
SRpcDigest *pDigest = (SRpcDigest *)((char *)pHeader + msgLen - sizeof(SRpcDigest));
int32_t delta;
delta = (int32_t)htonl(pDigest->timeStamp);
delta -= (int32_t)taosGetTimestampSec();
if (abs(delta) > 900) {
tWarn("%s pConn:%p, time diff:%d is too big, msg discarded, timestamp:%d", pRpc->label, pConn,
delta, htonl(pDigest->timeStamp));
code = TSDB_CODE_INVALID_TIME_STAMP;
} else {
if (rpcAuthenticateMsg((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn);
code = TSDB_CODE_AUTH_FAILURE;
} else {
pHeader->msgLen -= sizeof(SRpcDigest);
}
}
} else {
// if it is request or response with code 0, msg shall be discarded
if (rpcIsReq(pHeader->msgType) || (pHeader->content[0] == 0)) {
tTrace("%s pConn:%p, auth spi not matched, msg discarded", pRpc->label, pConn);
code = TSDB_CODE_AUTH_FAILURE;
}
}
if ( pConn == NULL ) {
pConn = rpcOpenConn(pRpc, ipSet.ipStr[ipSet.index], ipSet.port);
}
return code;
return pConn;
}
static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
......@@ -611,7 +550,7 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
if (pConn->inTranId == pHeader->tranId) {
if (pConn->inType == pHeader->msgType) {
tTrace("%s pConn:%p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHeader->msgType]);
taosSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
} else if (pConn->inType == 0) {
tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn,
taosMsg[pHeader->msgType], pConn->inTranId);
......@@ -675,6 +614,8 @@ static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
pConn->outType = 0;
pConn->pReqMsg = NULL;
pConn->reqMsgLen = 0;
return TSDB_CODE_SUCCESS;
}
static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int dataLen, uint32_t ip) {
......@@ -707,7 +648,7 @@ static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int d
if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHeader->uid, pHeader->sourceId);
code = rpcGetConn(sid, pHeader->meterId, pRpc, &pConn, rpcIsReq(pHeader->msgType), hashstr);
code = rpcGetConnObj(sid, pHeader->meterId, pRpc, &pConn, rpcIsReq(pHeader->msgType), hashstr);
if (code != TSDB_CODE_SUCCESS) return code;
*ppConn = pConn;
......@@ -737,62 +678,6 @@ static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int d
return code;
}
void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle) {
SRpcHeader *pRecvHeader, *pReplyHeader;
char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(STaosRsp)];
STaosRsp *pRsp;
uint32_t timeStamp;
int msgLen;
pRecvHeader = (SRpcHeader *)pMsg;
pReplyHeader = (SRpcHeader *)msg;
memset(msg, 0, sizeof(SRpcHeader));
pReplyHeader->version = pRecvHeader->version;
pReplyHeader->msgType = (char)(pRecvHeader->msgType + 1);
pReplyHeader->tcp = 0;
pReplyHeader->spi = 0;
pReplyHeader->encrypt = 0;
pReplyHeader->tranId = pRecvHeader->tranId;
pReplyHeader->sourceId = 0;
pReplyHeader->destId = pRecvHeader->sourceId;
memcpy(pReplyHeader->meterId, pRecvHeader->meterId, tListLen(pReplyHeader->meterId));
pRsp = (STaosRsp *)pReplyHeader->content;
pRsp->code = htonl(code);
msgLen = sizeof(STaosRsp);
char *pContent = pRsp->more;
if (code == TSDB_CODE_INVALID_TIME_STAMP) {
// include a time stamp if client's time is not synchronized well
timeStamp = taosGetTimestampSec();
memcpy(pContent, &timeStamp, sizeof(timeStamp));
msgLen += sizeof(timeStamp);
}
pReplyHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
(*taosSendData[pRpc->type])(ip, port, msg, msgLen, chandle);
return;
}
void rpcProcessIdleTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param;
if (pConn->signature != param) {
tError("idle timer pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param);
return;
}
SRpcInfo * pRpc = pConn->pRpc;
if (pConn->pIdleTimer != tmrId) {
tTrace("%s pConn:%p, idle timer:%p already processed", pRpc->label, pConn, tmrId);
return;
}
tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn);
rpcCloseConn(pConn);
}
static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle) {
SRpcHeader *pHeader = (SRpcHeader *)data;
SRpcInfo *pRpc = (SRpcInfo *)shandle;
......@@ -863,7 +748,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) {
SRpcReqContext *pContext = pConn->pContext;
pConn->pContext = NULL;
taosAddConnToIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId);
taosAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId);
if (code == TSDB_CODE_NO_MASTER) {
pContext->code = code;
......@@ -875,70 +760,73 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) {
}
}
SRpcConn *rpcGetConnToServer(void *shandle, SRpcIpSet ipSet) {
SRpcInfo *pRpc = (SRpcInfo *)shandle;
static void rpcSendQuickRsp(SRpcConn *pConn, char code) {
char msg[RPC_MSG_OVERHEAD + sizeof(STaosRsp)];
SRpcHeader *pHeader;
int msgLen;
STaosRsp *pRsp;
SRpcConn *pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ipStr[ipSet.index], ipSet.port, pRpc->meterId);
pRsp = (STaosRsp *)rpcContFromHeader(msg);
pRsp->code = htonl(code);
msgLen = sizeof(STaosRsp);
if ( pConn == NULL ) {
pConn = rpcOpenConn(pRpc, ipSet.ipStr[ipSet.index], ipSet.port);
}
// set msg header
memset(msg, 0, sizeof(SRpcHeader));
pHeader = (SRpcHeader *)msg;
pHeader->version = 1;
pHeader->msgType = pConn->inType+1;
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pHeader->tranId = pConn->inTranId;
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pHeader->uid = 0;
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
return pConn;
rpcSendDataToPeer(pConn, msg, msgLen);
}
int taosAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHeader *pHeader = (SRpcHeader *)msg;
if (pConn->spi) {
// add auth part
pHeader->spi = pConn->spi;
SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen);
pDigest->timeStamp = htonl(taosGetTimestampSec());
msgLen += sizeof(SRpcDigest);
pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
rpcBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
} else {
pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
return msgLen;
}
static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle) {
SRpcHeader *pRecvHeader, *pReplyHeader;
char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(STaosRsp)];
STaosRsp *pRsp;
uint32_t timeStamp;
int msgLen;
static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen) {
int writtenLen = 0;
SRpcInfo *pRpc = pConn->pRpc;
SRpcHeader *pHeader = (SRpcHeader *)data;
pRecvHeader = (SRpcHeader *)pMsg;
pReplyHeader = (SRpcHeader *)msg;
assert(data);
assert(dataLen>0);
assert(pHeader->msgType > 0);
memset(msg, 0, sizeof(SRpcHeader));
pReplyHeader->version = pRecvHeader->version;
pReplyHeader->msgType = (char)(pRecvHeader->msgType + 1);
pReplyHeader->tcp = 0;
pReplyHeader->spi = 0;
pReplyHeader->encrypt = 0;
pReplyHeader->tranId = pRecvHeader->tranId;
pReplyHeader->sourceId = 0;
pReplyHeader->destId = pRecvHeader->sourceId;
memcpy(pReplyHeader->meterId, pRecvHeader->meterId, tListLen(pReplyHeader->meterId));
dataLen = taosAddAuthPart(pConn, data, dataLen);
pRsp = (STaosRsp *)pReplyHeader->content;
pRsp->code = htonl(code);
msgLen = sizeof(STaosRsp);
char *pContent = pRsp->more;
if ( rpcIsReq(pHeader->msgType)) {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr,
pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
} else {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, pConn->peerPort,
(uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
if (code == TSDB_CODE_INVALID_TIME_STAMP) {
// include a time stamp if client's time is not synchronized well
timeStamp = taosGetTimestampSec();
memcpy(pContent, &timeStamp, sizeof(timeStamp));
msgLen += sizeof(timeStamp);
}
writtenLen = (*taosSendData[pRpc->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle);
pReplyHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
(*taosSendData[pRpc->connType])(ip, port, msg, msgLen, chandle);
if (writtenLen != dataLen) {
tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn,
dataLen, writtenLen, strerror(errno));
}
tDump(data, dataLen);
return;
}
void rpcSendReqToOneServer(SRpcConn *pConn, SRpcReqContext *pContext) {
static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext) {
SRpcHeader *pHeader = rpcHeaderFromCont(pContext->pCont);
SRpcInfo *pRpc = pConn->pRpc;
char *msg = (char *)pHeader;
......@@ -970,66 +858,37 @@ void rpcSendReqToOneServer(SRpcConn *pConn, SRpcReqContext *pContext) {
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
}
void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int contLen, void *ahandle) {
SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcConn *pConn;
SRpcReqContext *pContext;
contLen = rpcCompressRpcMsg(pCont, contLen);
pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHeader)-sizeof(SRpcReqContext));
pContext->ahandle = ahandle;
pContext->pRpc = (SRpcInfo *)shandle;
pContext->ipSet = ipSet;
pContext->contLen = contLen;
pContext->pCont = pCont;
pContext->msgType = type;
pConn = rpcGetConnToServer(shandle, ipSet);
pContext->code = terrno;
if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
rpcSendReqToOneServer(pConn, pContext);
return;
}
void rpcSendResponse(void *handle, void *pCont, int contLen) {
int msgLen = 0;
SRpcConn *pConn = (SRpcConn *)handle;
static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen) {
int writtenLen = 0;
SRpcInfo *pRpc = pConn->pRpc;
SRpcHeader *pHeader = rpcHeaderFromCont(pCont);
char *msg = (char *)pHeader;
contLen = rpcCompressRpcMsg(pCont, contLen);
msgLen = rpcMsgLenFromCont(contLen);
pthread_mutex_lock(&pRpc->mutex);
// set msg header
pHeader->version = 1;
pHeader->msgType = pConn->inType+1;
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pHeader->tranId = pConn->inTranId;
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pHeader->uid = 0;
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
SRpcHeader *pHeader = (SRpcHeader *)data;
// set pConn parameters
pConn->inType = 0;
rpcFreeMsg(pConn->pRspMsg);
pConn->pRspMsg = msg;
pConn->rspMsgLen = msgLen;
assert(data);
assert(dataLen>0);
assert(pHeader->msgType > 0);
if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
dataLen = rpcAddAuthPart(pConn, data, dataLen);
pthread_mutex_unlock(&pRpc->mutex);
if ( rpcIsReq(pHeader->msgType)) {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr,
pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
} else {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, pConn->peerPort,
(uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
}
rpcSendDataToPeer(pConn, msg, msgLen);
writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle);
return;
if (writtenLen != dataLen) {
tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn,
dataLen, writtenLen, strerror(errno));
}
tDump(data, dataLen);
}
static void rpcProcessConnError(void *param, void *id) {
......@@ -1050,11 +909,11 @@ static void rpcProcessConnError(void *param, void *id) {
pContext->ipSet.index++;
pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps;
SRpcConn *pConn = rpcGetConnToServer(pContext->pRpc, pContext->ipSet);
SRpcConn *pConn = rpcSetConnToServer(pContext->pRpc, pContext->ipSet);
pContext->code = terrno;
if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
taosSendReqToOneServer(pConn, pContext);
rpcSendReqToServer(pConn, pContext);
}
}
......@@ -1092,6 +951,103 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
if (reportDisc) rpcProcessConnError(pConn->pContext, NULL);
}
static void rpcProcessIdleTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param;
SRpcInfo *pRpc = pConn->pRpc;
assert(pRpc);
if (pConn->pIdleTimer != tmrId) {
tTrace("%s pConn:%p, idle timer:%p already processed", pRpc->label, pConn, tmrId);
return;
}
tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn);
rpcCloseConn(pConn);
}
static void rpcFreeMsg(void *msg) {
char *req = ((char *)msg) - sizeof(SRpcReqContext);
free(req);
}
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
SRpcHeader *pHeader = rpcHeaderFromCont(pCont);
int32_t overhead = sizeof(int32_t) * 2;
int32_t finalLen = 0;
if (!NEEDTO_COMPRESSS_MSG(contLen)) {
return contLen;
}
char *buf = malloc (contLen + overhead+8); // 16 extra bytes
if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno));
return contLen;
}
int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
/*
* only the compressed size is less than the value of contLen - overhead, the compression is applied
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
*/
if (compLen < contLen - overhead) {
//tDump(pCont, contLen);
int32_t *pLen = (int32_t *)pCont;
*pLen = 0; // first 4 bytes must be zero
pLen = (int32_t *)(pCont + sizeof(int32_t));
*pLen = htonl(contLen); // contLen is encoded in second 4 bytes
memcpy(pCont + overhead, buf, compLen);
pHeader->comp = 1;
tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen);
finalLen = compLen + overhead;
//tDump(pCont, contLen);
} else {
finalLen = contLen;
}
free(buf);
return finalLen;
}
static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) {
int overhead = sizeof(int32_t) * 2;
SRpcHeader *pNewHeader = NULL;
uint8_t *pCont = pHeader->content;
if (pHeader->comp) {
// decompress the content
assert(GET_INT32_VAL(pHeader->content) == 0);
// contLen is original message length before compression applied
int contLen = htonl(GET_INT32_VAL(pCont + sizeof(int32_t)));
// prepare the temporary buffer to decompress message
char *buf = rpcMallocCont(contLen);
if (buf) {
pNewHeader = rpcHeaderFromCont(buf);
int compLen = rpcContLenFromMsg(pHeader->msgLen) - overhead;
int32_t originalLen = LZ4_decompress_safe((const char*)(pCont + overhead), buf, compLen, contLen);
assert(originalLen == contLen);
memcpy(pNewHeader, pHeader, sizeof(SRpcHeader));
pNewHeader->msgLen = rpcMsgLenFromCont(originalLen);
free(pHeader); // free the compressed message buffer
pHeader = pNewHeader;
} else {
tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno));
}
}
return pHeader;
}
static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
MD5_CTX context;
int ret = -1;
......@@ -1120,3 +1076,60 @@ static int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t
return 0;
}
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHeader *pHeader = (SRpcHeader *)msg;
if (pConn->spi) {
// add auth part
pHeader->spi = pConn->spi;
SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen);
pDigest->timeStamp = htonl(taosGetTimestampSec());
msgLen += sizeof(SRpcDigest);
pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
rpcBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
} else {
pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
return msgLen;
}
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHeader *pHeader = (SRpcHeader *)msg;
SRpcInfo *pRpc = pConn->pRpc;
int code = 0;
if (pConn->spi == 0 ) return 0;
if (pHeader->spi == pConn->spi) {
// authentication
SRpcDigest *pDigest = (SRpcDigest *)((char *)pHeader + msgLen - sizeof(SRpcDigest));
int32_t delta;
delta = (int32_t)htonl(pDigest->timeStamp);
delta -= (int32_t)taosGetTimestampSec();
if (abs(delta) > 900) {
tWarn("%s pConn:%p, time diff:%d is too big, msg discarded, timestamp:%d", pRpc->label, pConn,
delta, htonl(pDigest->timeStamp));
code = TSDB_CODE_INVALID_TIME_STAMP;
} else {
if (rpcAuthenticateMsg((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn);
code = TSDB_CODE_AUTH_FAILURE;
} else {
pHeader->msgLen -= sizeof(SRpcDigest);
}
}
} else {
// if it is request or response with code 0, msg shall be discarded
if (rpcIsReq(pHeader->msgType) || (pHeader->content[0] == 0)) {
tTrace("%s pConn:%p, auth spi not matched, msg discarded", pRpc->label, pConn);
code = TSDB_CODE_AUTH_FAILURE;
}
}
return code;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册