diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 3a0ea69246e5fa8ea30e5b4eda08b177dda375e9..f62d8056b7c0fb595328a92719b397a4342e8490 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -59,7 +59,8 @@ typedef struct { int16_t index; int16_t numOfIps; uint16_t port; - char ipStr[TSDB_MAX_MPEERS][40]; + uint32_t ip[TSDB_MAX_MPEERS]; + char ipStr[TSDB_MAX_MPEERS][TSDB_IPv4ADDR_LEN]; } SRpcIpSet; void *rpcOpen(SRpcInit *pRpc); diff --git a/src/rpc/inc/tcache.h b/src/rpc/inc/tconncache.h similarity index 82% rename from src/rpc/inc/tcache.h rename to src/rpc/inc/tconncache.h index 4c6acec096c01db64b09c4f0d18f404b8825f7b6..f297dab85062cfb5c3188650a49782ce177c3e3e 100644 --- a/src/rpc/inc/tcache.h +++ b/src/rpc/inc/tconncache.h @@ -13,23 +13,20 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_TSCCACHE_H -#define TDENGINE_TSCCACHE_H +#ifndef TDENGINE_CONN_CACHE_H +#define TDENGINE_CONN_CACHE_H #ifdef __cplusplus extern "C" { #endif void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer); - -void taosCloseConnCache(void *handle); - -void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user); - +void taosCloseConnCache(void *handle); +void taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user); void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user); #ifdef __cplusplus } #endif -#endif // TDENGINE_TSCACHE_H +#endif // TDENGINE_CONN_CACHE_H diff --git a/src/rpc/src/tcache.c b/src/rpc/src/tconncache.c similarity index 94% rename from src/rpc/src/tcache.c rename to src/rpc/src/tconncache.c index d8f9c6e5ecd3ca2a57154dc32fb507aef9ad44d5..2d1a61e4d3156b12aa70fbc7fdb321ce4ed5fc66 100644 --- a/src/rpc/src/tcache.c +++ b/src/rpc/src/tconncache.c @@ -21,7 +21,7 @@ #include "ttime.h" #include "ttimer.h" #include "tutil.h" -#include "tcache.h" +#include "tconncache.h" typedef struct _c_hash_t { uint32_t ip; @@ -64,7 +64,6 @@ int taosHashConn(void *handle, uint32_t ip, uint16_t port, char *user) { } void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64_t time) { - if (pNode == NULL) return; if (time < pObj->keepTimer + pNode->time) return; SConnHash *pPrev = pNode->prev, *pNext; @@ -86,7 +85,7 @@ void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64 pObj->connHashList[hash] = NULL; } -void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) { +void taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) { int hash; SConnHash * pNode; SConnCache *pObj; @@ -94,12 +93,8 @@ void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, uint64_t time = taosGetTimestampMs(); pObj = (SConnCache *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return NULL; - - if (data == NULL) { - tscTrace("data:%p ip:%p:%d not valid, not added in cache", data, ip, port); - return NULL; - } + assert(pObj); + assert(data); hash = taosHashConn(pObj, ip, port, user); pNode = (SConnHash *)taosMemPoolMalloc(pObj->connHashMemPool); @@ -123,7 +118,7 @@ void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, tscTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pObj->count[hash]); - return pObj; + return; } void taosCleanConnCache(void *handle, void *tmrId) { @@ -155,7 +150,7 @@ void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) void * pData = NULL; pObj = (SConnCache *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return NULL; + assert(pObj); uint64_t time = taosGetTimestampMs(); diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index 77438e7399cfb298517eec54269a275cfa8ac612..ffa308c9b16abbd56b44ad8575ab77f108144f3c 100755 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -14,14 +14,12 @@ */ #include "os.h" -#include "tcache.h" #include "shash.h" #include "taosmsg.h" #include "tidpool.h" #include "tlog.h" #include "tmd5.h" #include "tmempool.h" -#include "trpc.h" #include "tsocket.h" #include "ttcpclient.h" #include "ttcpserver.h" @@ -30,6 +28,8 @@ #include "tudp.h" #include "tutil.h" #include "lz4.h" +#include "tconncache.h" +#include "trpc.h" #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest)) #define rpcHeaderFromCont(cont) ((SRpcHeader *) (cont - sizeof(SRpcHeader))) @@ -41,9 +41,8 @@ typedef struct { int sessions; int numOfThreads; - int type; int idleTime; // milliseconds; - uint32_t localIp; + char localIp[TSDB_IPv4ADDR_LEN]; uint16_t localPort; int connType; char label[12]; @@ -78,7 +77,6 @@ typedef struct { } SRpcReqContext; typedef struct _RpcConn { - void *signature; int sid; // session ID uint32_t ownId; // own link ID uint32_t peerId; // peer link ID @@ -177,163 +175,27 @@ void (*taosCloseConn[])(void *chandle) = { taosCloseTcpClientConnection }; -static void rpcProcessRetryTimer(void *, void *); -static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle); -static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen); -static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); -static int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); +static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort); static void rpcCloseConn(void *thandle); -static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr); -static void rpcProcessConnError(void *param, void *id); -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader); - -static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { - SRpcHeader *pHeader = rpcHeaderFromCont(pCont); - int32_t overhead = sizeof(int32_t) * 2; - int32_t finalLen = 0; - - if (!NEEDTO_COMPRESSS_MSG(contLen)) { - return contLen; - } - - char *buf = malloc (contLen + overhead+8); // 16 extra bytes - if (buf == NULL) { - tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno)); - return contLen; - } - - int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); - - /* - * only the compressed size is less than the value of contLen - overhead, the compression is applied - * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message - */ - if (compLen < contLen - overhead) { - //tDump(pCont, contLen); - int32_t *pLen = (int32_t *)pCont; - - *pLen = 0; // first 4 bytes must be zero - pLen = (int32_t *)(pCont + sizeof(int32_t)); - - *pLen = htonl(contLen); // contLen is encoded in second 4 bytes - memcpy(pCont + overhead, buf, compLen); - - pHeader->comp = 1; - tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); - - finalLen = compLen + overhead; - //tDump(pCont, contLen); - } else { - finalLen = contLen; - } - - free(buf); - return finalLen; -} - -static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) { - int overhead = sizeof(int32_t) * 2; - SRpcHeader *pNewHeader = NULL; - uint8_t *pCont = pHeader->content; - - if (pHeader->comp) { - // decompress the content - assert(GET_INT32_VAL(pHeader->content) == 0); - - // contLen is original message length before compression applied - int contLen = htonl(GET_INT32_VAL(pCont + sizeof(int32_t))); - - // prepare the temporary buffer to decompress message - char *buf = rpcMallocCont(contLen); - - if (buf) { - pNewHeader = rpcHeaderFromCont(buf); - int compLen = rpcContLenFromMsg(pHeader->msgLen) - overhead; - int32_t originalLen = LZ4_decompress_safe((const char*)(pCont + overhead), buf, compLen, contLen); - assert(originalLen == contLen); - - memcpy(pNewHeader, pHeader, sizeof(SRpcHeader)); - pNewHeader->msgLen = rpcMsgLenFromCont(originalLen); - free(pHeader); // free the compressed message buffer - pHeader = pNewHeader; - } else { - tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno)); - } - } - - return pHeader; -} - -void *rpcMallocCont(int size) { - char *pMsg = NULL; - - size += RPC_MSG_OVERHEAD; - pMsg = (char *)calloc(1, (size_t)size); - if (pMsg == NULL) { - tError("failed to malloc msg, size:%d", size); - return NULL; - } - - return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHeader); -} - -void rpcFreeCont(void *cont) { - char *msg = ((char *)cont) - sizeof(SRpcHeader); - free(msg); -} - -static void rpcFreeMsg(void *msg) { - char *req = ((char *)msg) - sizeof(SRpcReqContext); - free(req); -} - -void rpcSendSimpleRsp(void *thandle, int32_t code) { - char *pMsg; - STaosRsp *pRsp; - int msgLen = sizeof(STaosRsp); - - if (thandle == NULL) { - tError("connection is gone, response could not be sent"); - return; - } - - pMsg = rpcMallocCont(msgLen); - if (pMsg == NULL) return; - - pRsp = (STaosRsp *)pMsg; - pRsp->code = htonl(code); - - rpcSendResponse(thandle, pMsg, msgLen); - - return; -} +static SRpcConn *rpcSetConnToServer(void *shandle, SRpcIpSet ipSet); +static int rpcGetConnObj(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr); -static void rpcSendQuickRsp(SRpcConn *pConn, char code) { - char msg[RPC_MSG_OVERHEAD + sizeof(STaosRsp)]; - SRpcHeader *pHeader; - int msgLen; - STaosRsp *pRsp; - - pRsp = (STaosRsp *)rpcContFromHeader(msg); - pRsp->code = htonl(code); - msgLen = sizeof(STaosRsp); +static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext); +static void rpcSendQuickRsp(SRpcConn *pConn, char code); +static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle); +static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen); - // set msg header - memset(msg, 0, sizeof(SRpcHeader)); - pHeader = (SRpcHeader *)msg; - pHeader->version = 1; - pHeader->msgType = pConn->inType+1; - pHeader->spi = 0; - pHeader->tcp = 0; - pHeader->encrypt = 0; - pHeader->tranId = pConn->inTranId; - pHeader->sourceId = pConn->ownId; - pHeader->destId = pConn->peerId; - pHeader->uid = 0; - memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); +static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle); +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader); +static void rpcProcessConnError(void *param, void *id); +static void rpcProcessRetryTimer(void *, void *); +static void rpcProcessIdleTimer(void *param, void *tmrId); - rpcSendDataToPeer(pConn, msg, msgLen); -} +static void rpcFreeMsg(void *msg); +static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); +static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader); +static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); +static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen); void *rpcOpen(SRpcInit *pInit) { SRpcInfo *pRpc; @@ -346,13 +208,14 @@ void *rpcOpen(SRpcInit *pInit) { strcpy(pRpc->label, pInit->label); pRpc->fp = pInit->fp; - pRpc->type = pInit->connType; + pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; pRpc->numOfThreads = pInit->numOfThreads; if (pRpc->numOfThreads > TSDB_MAX_RPC_THREADS) { pRpc->numOfThreads = TSDB_MAX_RPC_THREADS; } + strcpy(pRpc->localIp, pInit->localIp); pRpc->localPort = pInit->localPort; pRpc->afp = pInit->afp; pRpc->sessions = pInit->sessions; @@ -416,7 +279,7 @@ void *rpcOpen(SRpcInit *pInit) { void rpcClose(void *param) { SRpcInfo *pRpc = (SRpcInfo *)param; - (*taosCleanUpConn[pRpc->type])(pRpc->shandle); + (*taosCleanUpConn[pRpc->connType])(pRpc->shandle); for (int i = 0; i < pRpc->sessions; ++i) { if (pRpc->connList[i].meterId[0]) { @@ -434,10 +297,111 @@ void rpcClose(void *param) { tfree(pRpc); } +void *rpcMallocCont(int size) { + char *pMsg = NULL; + + size += RPC_MSG_OVERHEAD; + pMsg = (char *)calloc(1, (size_t)size); + if (pMsg == NULL) { + tError("failed to malloc msg, size:%d", size); + return NULL; + } + + return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHeader); +} + +void rpcFreeCont(void *cont) { + char *msg = ((char *)cont) - sizeof(SRpcHeader); + free(msg); +} + +void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int contLen, void *ahandle) { + SRpcInfo *pRpc = (SRpcInfo *)shandle; + SRpcConn *pConn; + SRpcReqContext *pContext; + + contLen = rpcCompressRpcMsg(pCont, contLen); + pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHeader)-sizeof(SRpcReqContext)); + pContext->ahandle = ahandle; + pContext->pRpc = (SRpcInfo *)shandle; + pContext->ipSet = ipSet; + pContext->contLen = contLen; + pContext->pCont = pCont; + pContext->msgType = type; + + pConn = rpcSetConnToServer(shandle, ipSet); + pContext->code = terrno; + if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); + + rpcSendReqToServer(pConn, pContext); + + return; +} + +void rpcSendResponse(void *handle, void *pCont, int contLen) { + int msgLen = 0; + SRpcConn *pConn = (SRpcConn *)handle; + SRpcInfo *pRpc = pConn->pRpc; + SRpcHeader *pHeader = rpcHeaderFromCont(pCont); + char *msg = (char *)pHeader; + + contLen = rpcCompressRpcMsg(pCont, contLen); + msgLen = rpcMsgLenFromCont(contLen); + + pthread_mutex_lock(&pRpc->mutex); + + // set msg header + pHeader->version = 1; + pHeader->msgType = pConn->inType+1; + pHeader->spi = 0; + pHeader->tcp = 0; + pHeader->encrypt = 0; + pHeader->tranId = pConn->inTranId; + pHeader->sourceId = pConn->ownId; + pHeader->destId = pConn->peerId; + pHeader->uid = 0; + memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); + + // set pConn parameters + pConn->inType = 0; + rpcFreeMsg(pConn->pRspMsg); + pConn->pRspMsg = msg; + pConn->rspMsgLen = msgLen; + + if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; + + pthread_mutex_unlock(&pRpc->mutex); + + rpcSendDataToPeer(pConn, msg, msgLen); + + return; +} + +void rpcSendSimpleRsp(void *thandle, int32_t code) { + char *pMsg; + STaosRsp *pRsp; + int msgLen = sizeof(STaosRsp); + + if (thandle == NULL) { + tError("connection is gone, response could not be sent"); + return; + } + + pMsg = rpcMallocCont(msgLen); + if (pMsg == NULL) return; + + pRsp = (STaosRsp *)pMsg; + pRsp->code = htonl(code); + + rpcSendResponse(thandle, pMsg, msgLen); + + return; +} + static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) { SRpcConn *pConn; - if ( (uint8_t)(rpcGetConn(0, pRpc->meterId, pRpc, &pConn, 1, NULL)) != 0 ) + if ( (uint8_t)(rpcGetConnObj(0, pRpc->meterId, pRpc, &pConn, 1, NULL)) != 0 ) return NULL; strcpy(pConn->peerIpstr, peerIpStr); @@ -449,8 +413,8 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) strcpy(pConn->meterId, pRpc->meterId); // if it is client, it shall set up connection first - if (taosOpenConn[pRpc->type]) { - pConn->chandle = (*taosOpenConn[pRpc->type])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort); + if (taosOpenConn[pRpc->connType]) { + pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort); if (pConn->chandle) { tTrace("%s pConn:%p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label, pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort); @@ -475,7 +439,7 @@ static void rpcCloseConn(void *thandle) { pthread_mutex_lock(&pRpc->mutex); - if (taosCloseConn[pRpc->type]) (*taosCloseConn[pRpc->type])(pConn->chandle); + if (taosCloseConn[pRpc->connType]) (*taosCloseConn[pRpc->connType])(pConn->chandle); taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pIdleTimer); @@ -494,7 +458,7 @@ static void rpcCloseConn(void *thandle) { pthread_mutex_unlock(&pRpc->mutex); } -static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr) { +static int rpcGetConnObj(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr) { SRpcConn * pConn = NULL; if (sid == 0) { @@ -558,41 +522,16 @@ static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, return TSDB_CODE_SUCCESS; } -static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { - SRpcHeader *pHeader = (SRpcHeader *)msg; - SRpcInfo *pRpc = pConn->pRpc; - int code = 0; +SRpcConn *rpcSetConnToServer(void *shandle, SRpcIpSet ipSet) { + SRpcInfo *pRpc = (SRpcInfo *)shandle; - if (pConn->spi == 0 ) return 0; + SRpcConn *pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId); - if (pHeader->spi == pConn->spi) { - // authentication - SRpcDigest *pDigest = (SRpcDigest *)((char *)pHeader + msgLen - sizeof(SRpcDigest)); - - int32_t delta; - delta = (int32_t)htonl(pDigest->timeStamp); - delta -= (int32_t)taosGetTimestampSec(); - if (abs(delta) > 900) { - tWarn("%s pConn:%p, time diff:%d is too big, msg discarded, timestamp:%d", pRpc->label, pConn, - delta, htonl(pDigest->timeStamp)); - code = TSDB_CODE_INVALID_TIME_STAMP; - } else { - if (rpcAuthenticateMsg((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { - tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn); - code = TSDB_CODE_AUTH_FAILURE; - } else { - pHeader->msgLen -= sizeof(SRpcDigest); - } - } - } else { - // if it is request or response with code 0, msg shall be discarded - if (rpcIsReq(pHeader->msgType) || (pHeader->content[0] == 0)) { - tTrace("%s pConn:%p, auth spi not matched, msg discarded", pRpc->label, pConn); - code = TSDB_CODE_AUTH_FAILURE; - } - } + if ( pConn == NULL ) { + pConn = rpcOpenConn(pRpc, ipSet.ipStr[ipSet.index], ipSet.port); + } - return code; + return pConn; } static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { @@ -611,7 +550,7 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { if (pConn->inTranId == pHeader->tranId) { if (pConn->inType == pHeader->msgType) { tTrace("%s pConn:%p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHeader->msgType]); - taosSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); + rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); } else if (pConn->inType == 0) { tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->inTranId); @@ -675,6 +614,8 @@ static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) { pConn->outType = 0; pConn->pReqMsg = NULL; pConn->reqMsgLen = 0; + + return TSDB_CODE_SUCCESS; } static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int dataLen, uint32_t ip) { @@ -707,7 +648,7 @@ static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int d if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHeader->uid, pHeader->sourceId); - code = rpcGetConn(sid, pHeader->meterId, pRpc, &pConn, rpcIsReq(pHeader->msgType), hashstr); + code = rpcGetConnObj(sid, pHeader->meterId, pRpc, &pConn, rpcIsReq(pHeader->msgType), hashstr); if (code != TSDB_CODE_SUCCESS) return code; *ppConn = pConn; @@ -737,62 +678,6 @@ static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int d return code; } -void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle) { - SRpcHeader *pRecvHeader, *pReplyHeader; - char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(STaosRsp)]; - STaosRsp *pRsp; - uint32_t timeStamp; - int msgLen; - - pRecvHeader = (SRpcHeader *)pMsg; - pReplyHeader = (SRpcHeader *)msg; - - memset(msg, 0, sizeof(SRpcHeader)); - pReplyHeader->version = pRecvHeader->version; - pReplyHeader->msgType = (char)(pRecvHeader->msgType + 1); - pReplyHeader->tcp = 0; - pReplyHeader->spi = 0; - pReplyHeader->encrypt = 0; - pReplyHeader->tranId = pRecvHeader->tranId; - pReplyHeader->sourceId = 0; - pReplyHeader->destId = pRecvHeader->sourceId; - memcpy(pReplyHeader->meterId, pRecvHeader->meterId, tListLen(pReplyHeader->meterId)); - - pRsp = (STaosRsp *)pReplyHeader->content; - pRsp->code = htonl(code); - msgLen = sizeof(STaosRsp); - char *pContent = pRsp->more; - - if (code == TSDB_CODE_INVALID_TIME_STAMP) { - // include a time stamp if client's time is not synchronized well - timeStamp = taosGetTimestampSec(); - memcpy(pContent, &timeStamp, sizeof(timeStamp)); - msgLen += sizeof(timeStamp); - } - - pReplyHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); - (*taosSendData[pRpc->type])(ip, port, msg, msgLen, chandle); - - return; -} - -void rpcProcessIdleTimer(void *param, void *tmrId) { - SRpcConn *pConn = (SRpcConn *)param; - if (pConn->signature != param) { - tError("idle timer pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param); - return; - } - - SRpcInfo * pRpc = pConn->pRpc; - if (pConn->pIdleTimer != tmrId) { - tTrace("%s pConn:%p, idle timer:%p already processed", pRpc->label, pConn, tmrId); - return; - } - - tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn); - rpcCloseConn(pConn); -} - static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle) { SRpcHeader *pHeader = (SRpcHeader *)data; SRpcInfo *pRpc = (SRpcInfo *)shandle; @@ -863,7 +748,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { SRpcReqContext *pContext = pConn->pContext; pConn->pContext = NULL; - taosAddConnToIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId); + taosAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId); if (code == TSDB_CODE_NO_MASTER) { pContext->code = code; @@ -875,70 +760,73 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { } } -SRpcConn *rpcGetConnToServer(void *shandle, SRpcIpSet ipSet) { - SRpcInfo *pRpc = (SRpcInfo *)shandle; +static void rpcSendQuickRsp(SRpcConn *pConn, char code) { + char msg[RPC_MSG_OVERHEAD + sizeof(STaosRsp)]; + SRpcHeader *pHeader; + int msgLen; + STaosRsp *pRsp; - SRpcConn *pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ipStr[ipSet.index], ipSet.port, pRpc->meterId); + pRsp = (STaosRsp *)rpcContFromHeader(msg); + pRsp->code = htonl(code); + msgLen = sizeof(STaosRsp); - if ( pConn == NULL ) { - pConn = rpcOpenConn(pRpc, ipSet.ipStr[ipSet.index], ipSet.port); - } + // set msg header + memset(msg, 0, sizeof(SRpcHeader)); + pHeader = (SRpcHeader *)msg; + pHeader->version = 1; + pHeader->msgType = pConn->inType+1; + pHeader->spi = 0; + pHeader->tcp = 0; + pHeader->encrypt = 0; + pHeader->tranId = pConn->inTranId; + pHeader->sourceId = pConn->ownId; + pHeader->destId = pConn->peerId; + pHeader->uid = 0; + memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); - return pConn; + rpcSendDataToPeer(pConn, msg, msgLen); } -int taosAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { - SRpcHeader *pHeader = (SRpcHeader *)msg; - - if (pConn->spi) { - // add auth part - pHeader->spi = pConn->spi; - SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen); - pDigest->timeStamp = htonl(taosGetTimestampSec()); - msgLen += sizeof(SRpcDigest); - pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); - rpcBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); - } else { - pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); - } - - return msgLen; -} +static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle) { + SRpcHeader *pRecvHeader, *pReplyHeader; + char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(STaosRsp)]; + STaosRsp *pRsp; + uint32_t timeStamp; + int msgLen; -static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen) { - int writtenLen = 0; - SRpcInfo *pRpc = pConn->pRpc; - SRpcHeader *pHeader = (SRpcHeader *)data; + pRecvHeader = (SRpcHeader *)pMsg; + pReplyHeader = (SRpcHeader *)msg; - assert(data); - assert(dataLen>0); - assert(pHeader->msgType > 0); + memset(msg, 0, sizeof(SRpcHeader)); + pReplyHeader->version = pRecvHeader->version; + pReplyHeader->msgType = (char)(pRecvHeader->msgType + 1); + pReplyHeader->tcp = 0; + pReplyHeader->spi = 0; + pReplyHeader->encrypt = 0; + pReplyHeader->tranId = pRecvHeader->tranId; + pReplyHeader->sourceId = 0; + pReplyHeader->destId = pRecvHeader->sourceId; + memcpy(pReplyHeader->meterId, pRecvHeader->meterId, tListLen(pReplyHeader->meterId)); - dataLen = taosAddAuthPart(pConn, data, dataLen); + pRsp = (STaosRsp *)pReplyHeader->content; + pRsp->code = htonl(code); + msgLen = sizeof(STaosRsp); + char *pContent = pRsp->more; - if ( rpcIsReq(pHeader->msgType)) { - if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d", - pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, - pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); - } else { - if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", - pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, pConn->peerPort, - (uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); + if (code == TSDB_CODE_INVALID_TIME_STAMP) { + // include a time stamp if client's time is not synchronized well + timeStamp = taosGetTimestampSec(); + memcpy(pContent, &timeStamp, sizeof(timeStamp)); + msgLen += sizeof(timeStamp); } - writtenLen = (*taosSendData[pRpc->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle); + pReplyHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); + (*taosSendData[pRpc->connType])(ip, port, msg, msgLen, chandle); - if (writtenLen != dataLen) { - tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, - dataLen, writtenLen, strerror(errno)); - } - - tDump(data, dataLen); + return; } -void rpcSendReqToOneServer(SRpcConn *pConn, SRpcReqContext *pContext) { +static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext) { SRpcHeader *pHeader = rpcHeaderFromCont(pContext->pCont); SRpcInfo *pRpc = pConn->pRpc; char *msg = (char *)pHeader; @@ -970,66 +858,37 @@ void rpcSendReqToOneServer(SRpcConn *pConn, SRpcReqContext *pContext) { taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); } -void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int contLen, void *ahandle) { - SRpcInfo *pRpc = (SRpcInfo *)shandle; - SRpcConn *pConn; - SRpcReqContext *pContext; - - contLen = rpcCompressRpcMsg(pCont, contLen); - pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHeader)-sizeof(SRpcReqContext)); - pContext->ahandle = ahandle; - pContext->pRpc = (SRpcInfo *)shandle; - pContext->ipSet = ipSet; - pContext->contLen = contLen; - pContext->pCont = pCont; - pContext->msgType = type; - - pConn = rpcGetConnToServer(shandle, ipSet); - pContext->code = terrno; - if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); - - rpcSendReqToOneServer(pConn, pContext); - - return; -} - -void rpcSendResponse(void *handle, void *pCont, int contLen) { - int msgLen = 0; - SRpcConn *pConn = (SRpcConn *)handle; +static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen) { + int writtenLen = 0; SRpcInfo *pRpc = pConn->pRpc; - SRpcHeader *pHeader = rpcHeaderFromCont(pCont); - char *msg = (char *)pHeader; - - contLen = rpcCompressRpcMsg(pCont, contLen); - msgLen = rpcMsgLenFromCont(contLen); - - pthread_mutex_lock(&pRpc->mutex); - - // set msg header - pHeader->version = 1; - pHeader->msgType = pConn->inType+1; - pHeader->spi = 0; - pHeader->tcp = 0; - pHeader->encrypt = 0; - pHeader->tranId = pConn->inTranId; - pHeader->sourceId = pConn->ownId; - pHeader->destId = pConn->peerId; - pHeader->uid = 0; - memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); + SRpcHeader *pHeader = (SRpcHeader *)data; - // set pConn parameters - pConn->inType = 0; - rpcFreeMsg(pConn->pRspMsg); - pConn->pRspMsg = msg; - pConn->rspMsgLen = msgLen; + assert(data); + assert(dataLen>0); + assert(pHeader->msgType > 0); - if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; + dataLen = rpcAddAuthPart(pConn, data, dataLen); - pthread_mutex_unlock(&pRpc->mutex); + if ( rpcIsReq(pHeader->msgType)) { + if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) + tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d", + pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, + pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); + } else { + if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) + tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", + pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, pConn->peerPort, + (uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); + } - rpcSendDataToPeer(pConn, msg, msgLen); + writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle); - return; + if (writtenLen != dataLen) { + tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, + dataLen, writtenLen, strerror(errno)); + } + + tDump(data, dataLen); } static void rpcProcessConnError(void *param, void *id) { @@ -1050,11 +909,11 @@ static void rpcProcessConnError(void *param, void *id) { pContext->ipSet.index++; pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps; - SRpcConn *pConn = rpcGetConnToServer(pContext->pRpc, pContext->ipSet); + SRpcConn *pConn = rpcSetConnToServer(pContext->pRpc, pContext->ipSet); pContext->code = terrno; if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); - taosSendReqToOneServer(pConn, pContext); + rpcSendReqToServer(pConn, pContext); } } @@ -1092,6 +951,103 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { if (reportDisc) rpcProcessConnError(pConn->pContext, NULL); } +static void rpcProcessIdleTimer(void *param, void *tmrId) { + SRpcConn *pConn = (SRpcConn *)param; + SRpcInfo *pRpc = pConn->pRpc; + + assert(pRpc); + + if (pConn->pIdleTimer != tmrId) { + tTrace("%s pConn:%p, idle timer:%p already processed", pRpc->label, pConn, tmrId); + return; + } + + tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn); + rpcCloseConn(pConn); +} + +static void rpcFreeMsg(void *msg) { + char *req = ((char *)msg) - sizeof(SRpcReqContext); + free(req); +} + +static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { + SRpcHeader *pHeader = rpcHeaderFromCont(pCont); + int32_t overhead = sizeof(int32_t) * 2; + int32_t finalLen = 0; + + if (!NEEDTO_COMPRESSS_MSG(contLen)) { + return contLen; + } + + char *buf = malloc (contLen + overhead+8); // 16 extra bytes + if (buf == NULL) { + tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno)); + return contLen; + } + + int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); + + /* + * only the compressed size is less than the value of contLen - overhead, the compression is applied + * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message + */ + if (compLen < contLen - overhead) { + //tDump(pCont, contLen); + int32_t *pLen = (int32_t *)pCont; + + *pLen = 0; // first 4 bytes must be zero + pLen = (int32_t *)(pCont + sizeof(int32_t)); + + *pLen = htonl(contLen); // contLen is encoded in second 4 bytes + memcpy(pCont + overhead, buf, compLen); + + pHeader->comp = 1; + tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); + + finalLen = compLen + overhead; + //tDump(pCont, contLen); + } else { + finalLen = contLen; + } + + free(buf); + return finalLen; +} + +static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) { + int overhead = sizeof(int32_t) * 2; + SRpcHeader *pNewHeader = NULL; + uint8_t *pCont = pHeader->content; + + if (pHeader->comp) { + // decompress the content + assert(GET_INT32_VAL(pHeader->content) == 0); + + // contLen is original message length before compression applied + int contLen = htonl(GET_INT32_VAL(pCont + sizeof(int32_t))); + + // prepare the temporary buffer to decompress message + char *buf = rpcMallocCont(contLen); + + if (buf) { + pNewHeader = rpcHeaderFromCont(buf); + int compLen = rpcContLenFromMsg(pHeader->msgLen) - overhead; + int32_t originalLen = LZ4_decompress_safe((const char*)(pCont + overhead), buf, compLen, contLen); + assert(originalLen == contLen); + + memcpy(pNewHeader, pHeader, sizeof(SRpcHeader)); + pNewHeader->msgLen = rpcMsgLenFromCont(originalLen); + free(pHeader); // free the compressed message buffer + pHeader = pNewHeader; + } else { + tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno)); + } + } + + return pHeader; +} + static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) { MD5_CTX context; int ret = -1; @@ -1120,3 +1076,60 @@ static int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t return 0; } + +static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { + SRpcHeader *pHeader = (SRpcHeader *)msg; + + if (pConn->spi) { + // add auth part + pHeader->spi = pConn->spi; + SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen); + pDigest->timeStamp = htonl(taosGetTimestampSec()); + msgLen += sizeof(SRpcDigest); + pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); + rpcBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); + } else { + pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); + } + + return msgLen; +} + +static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { + SRpcHeader *pHeader = (SRpcHeader *)msg; + SRpcInfo *pRpc = pConn->pRpc; + int code = 0; + + if (pConn->spi == 0 ) return 0; + + if (pHeader->spi == pConn->spi) { + // authentication + SRpcDigest *pDigest = (SRpcDigest *)((char *)pHeader + msgLen - sizeof(SRpcDigest)); + + int32_t delta; + delta = (int32_t)htonl(pDigest->timeStamp); + delta -= (int32_t)taosGetTimestampSec(); + if (abs(delta) > 900) { + tWarn("%s pConn:%p, time diff:%d is too big, msg discarded, timestamp:%d", pRpc->label, pConn, + delta, htonl(pDigest->timeStamp)); + code = TSDB_CODE_INVALID_TIME_STAMP; + } else { + if (rpcAuthenticateMsg((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { + tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn); + code = TSDB_CODE_AUTH_FAILURE; + } else { + pHeader->msgLen -= sizeof(SRpcDigest); + } + } + } else { + // if it is request or response with code 0, msg shall be discarded + if (rpcIsReq(pHeader->msgType) || (pHeader->content[0] == 0)) { + tTrace("%s pConn:%p, auth spi not matched, msg discarded", pRpc->label, pConn); + code = TSDB_CODE_AUTH_FAILURE; + } + } + + return code; +} + +