diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 504e56519b23781bad74584aaa20c1834f09e0b2..8294bd779b47e3cce8c3194352bfe639ad9ef276 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -13,126 +13,126 @@ * along with this program. If not, see . */ +#include "hash.h" +#include "lz4.h" #include "os.h" +#include "rpcCache.h" +#include "rpcHead.h" +#include "rpcLog.h" +#include "rpcTcp.h" +#include "rpcUdp.h" +#include "taoserror.h" +#include "taosmsg.h" +#include "tglobal.h" #include "tidpool.h" #include "tmd5.h" #include "tmempool.h" -#include "ttimer.h" -#include "tutil.h" -#include "lz4.h" #include "tref.h" -#include "taoserror.h" -#include "tsocket.h" -#include "tglobal.h" -#include "taosmsg.h" #include "trpc.h" -#include "hash.h" -#include "rpcLog.h" -#include "rpcUdp.h" -#include "rpcCache.h" -#include "rpcTcp.h" -#include "rpcHead.h" +#include "tsocket.h" +#include "ttimer.h" +#include "tutil.h" -#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) -#define rpcHeadFromCont(cont) ((SRpcHead *) ((char*)cont - sizeof(SRpcHead))) +#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) +#define rpcHeadFromCont(cont) ((SRpcHead *)((char *)cont - sizeof(SRpcHead))) #define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) #define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) #define rpcIsReq(type) (type & 1U) typedef struct { - int sessions; // number of sessions allowed - int numOfThreads; // number of threads to process incoming messages - int idleTime; // milliseconds; + int sessions; // number of sessions allowed + int numOfThreads; // number of threads to process incoming messages + int idleTime; // milliseconds; uint16_t localPort; int8_t connType; - int index; // for UDP server only, round robin for multiple threads + int index; // for UDP server only, round robin for multiple threads char label[TSDB_LABEL_LEN]; - char user[TSDB_UNI_LEN]; // meter ID - char spi; // security parameter index - char encrypt; // encrypt algorithm - char secret[TSDB_KEY_LEN]; // secret for the link - char ckey[TSDB_KEY_LEN]; // ciphering key - - void (*cfp)(SRpcMsg *, SRpcEpSet *); - int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); - - int32_t refCount; - void *idPool; // handle to ID pool - void *tmrCtrl; // handle to timer - SHashObj *hash; // handle returned by hash utility - void *tcphandle;// returned handle from TCP initialization - void *udphandle;// returned handle from UDP initialization - void *pCache; // connection cache + char user[TSDB_UNI_LEN]; // meter ID + char spi; // security parameter index + char encrypt; // encrypt algorithm + char secret[TSDB_KEY_LEN]; // secret for the link + char ckey[TSDB_KEY_LEN]; // ciphering key + + void (*cfp)(SRpcMsg *, SRpcEpSet *); + int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); + + int32_t refCount; + void *idPool; // handle to ID pool + void *tmrCtrl; // handle to timer + SHashObj *hash; // handle returned by hash utility + void *tcphandle; // returned handle from TCP initialization + void *udphandle; // returned handle from UDP initialization + void *pCache; // connection cache pthread_mutex_t mutex; struct SRpcConn *connList; // connection list } SRpcInfo; typedef struct SSendInfo { - void *pContext; - void *pConn; - void *pFdObj; + void *pContext; + void *pConn; + void *pFdObj; SOCKET fd; } SSendInfo; typedef struct { - SRpcInfo *pRpc; // associated SRpcInfo - SRpcEpSet epSet; // ip list provided by app - void *ahandle; // handle provided by app - struct SRpcConn *pConn; // pConn allocated - char msgType; // message type - uint8_t *pCont; // content provided by app - int32_t contLen; // content length - int32_t code; // error code - int16_t numOfTry; // number of try for different servers - int8_t oldInUse; // server EP inUse passed by app - int8_t redirect; // flag to indicate redirect - int8_t connType; // connection type - int64_t rid; // refId returned by taosAddRef - SRpcMsg *pRsp; // for synchronous API - tsem_t *pSem; // for synchronous API - SRpcEpSet *pSet; // for synchronous API - SSendInfo sendInfo; // save last send information - char msg[0]; // RpcHead starts from here + SRpcInfo *pRpc; // associated SRpcInfo + SRpcEpSet epSet; // ip list provided by app + void *ahandle; // handle provided by app + struct SRpcConn *pConn; // pConn allocated + char msgType; // message type + uint8_t *pCont; // content provided by app + int32_t contLen; // content length + int32_t code; // error code + int16_t numOfTry; // number of try for different servers + int8_t oldInUse; // server EP inUse passed by app + int8_t redirect; // flag to indicate redirect + int8_t connType; // connection type + int64_t rid; // refId returned by taosAddRef + SRpcMsg *pRsp; // for synchronous API + tsem_t *pSem; // for synchronous API + SRpcEpSet *pSet; // for synchronous API + SSendInfo sendInfo; // save last send information + char msg[0]; // RpcHead starts from here } SRpcReqContext; typedef struct SRpcConn { - char info[48];// debug info: label + pConn + ahandle - int sid; // session ID - uint32_t ownId; // own link ID - uint32_t peerId; // peer link ID - char user[TSDB_UNI_LEN]; // user ID for the link - char spi; // security parameter index - char encrypt; // encryption, 0:1 - char secret[TSDB_KEY_LEN]; // secret for the link - char ckey[TSDB_KEY_LEN]; // ciphering key - char secured; // if set to 1, no authentication - uint16_t localPort; // for UDP only - uint32_t linkUid; // connection unique ID assigned by client - uint32_t peerIp; // peer IP - uint16_t peerPort; // peer port - char peerFqdn[TSDB_FQDN_LEN]; // peer FQDN or ip string - uint16_t tranId; // outgoing transcation ID, for build message - uint16_t outTranId; // outgoing transcation ID - uint16_t inTranId; // transcation ID for incoming msg - uint8_t outType; // message type for outgoing request - uint8_t inType; // message type for incoming request - void *chandle; // handle passed by TCP/UDP connection layer - void *ahandle; // handle provided by upper app layter - int retry; // number of retry for sending request - int tretry; // total retry - void *pTimer; // retry timer to monitor the response - void *pIdleTimer; // idle timer - char *pRspMsg; // response message including header - int rspMsgLen; // response messag length - char *pReqMsg; // request message including header - int reqMsgLen; // request message length - SRpcInfo *pRpc; // the associated SRpcInfo - int8_t connType; // connection type - int64_t lockedBy; // lock for connection - SRpcReqContext *pContext; // request context - int64_t rid; // probe msg use rid get pContext + char info[48]; // debug info: label + pConn + ahandle + int sid; // session ID + uint32_t ownId; // own link ID + uint32_t peerId; // peer link ID + char user[TSDB_UNI_LEN]; // user ID for the link + char spi; // security parameter index + char encrypt; // encryption, 0:1 + char secret[TSDB_KEY_LEN]; // secret for the link + char ckey[TSDB_KEY_LEN]; // ciphering key + char secured; // if set to 1, no authentication + uint16_t localPort; // for UDP only + uint32_t linkUid; // connection unique ID assigned by client + uint32_t peerIp; // peer IP + uint16_t peerPort; // peer port + char peerFqdn[TSDB_FQDN_LEN]; // peer FQDN or ip string + uint16_t tranId; // outgoing transcation ID, for build message + uint16_t outTranId; // outgoing transcation ID + uint16_t inTranId; // transcation ID for incoming msg + uint8_t outType; // message type for outgoing request + uint8_t inType; // message type for incoming request + void *chandle; // handle passed by TCP/UDP connection layer + void *ahandle; // handle provided by upper app layter + int retry; // number of retry for sending request + int tretry; // total retry + void *pTimer; // retry timer to monitor the response + void *pIdleTimer; // idle timer + char *pRspMsg; // response message including header + int rspMsgLen; // response messag length + char *pReqMsg; // request message including header + int reqMsgLen; // request message length + SRpcInfo *pRpc; // the associated SRpcInfo + int8_t connType; // connection type + int64_t lockedBy; // lock for connection + SRpcReqContext *pContext; // request context + int64_t rid; // probe msg use rid get pContext } SRpcConn; int tsRpcMaxUdpSize = 15000; // bytes @@ -144,41 +144,29 @@ int tsRpcOverhead; static int tsRpcRefId = -1; static int32_t tsRpcNum = 0; -//static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT; +// static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT; // server:0 client:1 tcp:2 udp:0 -#define RPC_CONN_UDPS 0 -#define RPC_CONN_UDPC 1 -#define RPC_CONN_TCPS 2 -#define RPC_CONN_TCPC 3 +#define RPC_CONN_UDPS 0 +#define RPC_CONN_UDPC 1 +#define RPC_CONN_TCPS 2 +#define RPC_CONN_TCPC 3 void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = { - taosInitUdpConnection, - taosInitUdpConnection, - taosInitTcpServer, - taosInitTcpClient -}; + taosInitUdpConnection, taosInitUdpConnection, taosInitTcpServer, taosInitTcpClient}; -void (*taosCleanUpConn[])(void *thandle) = { - taosCleanUpUdpConnection, - taosCleanUpUdpConnection, - taosCleanUpTcpServer, - taosCleanUpTcpClient -}; +void (*taosCleanUpConn[])(void *thandle) = {taosCleanUpUdpConnection, taosCleanUpUdpConnection, taosCleanUpTcpServer, + taosCleanUpTcpClient}; void (*taosStopConn[])(void *thandle) = { - taosStopUdpConnection, - taosStopUdpConnection, + taosStopUdpConnection, + taosStopUdpConnection, taosStopTcpServer, taosStopTcpClient, }; int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = { - taosSendUdpData, - taosSendUdpData, - taosSendTcpData, - taosSendTcpData -}; + taosSendUdpData, taosSendUdpData, taosSendTcpData, taosSendTcpData}; void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = { taosOpenUdpConnection, @@ -187,12 +175,7 @@ void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port taosOpenTcpClientConnection, }; -void (*taosCloseConn[])(void *chandle) = { - NULL, - NULL, - taosCloseTcpConnection, - taosCloseTcpConnection -}; +void (*taosCloseConn[])(void *chandle) = {NULL, NULL, taosCloseTcpConnection, taosCloseTcpConnection}; static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType); static void rpcCloseConn(void *thandle); @@ -214,15 +197,45 @@ static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); static void rpcProcessProgressTimer(void *param, void *tmrId); -static void rpcFreeMsg(void *msg); -static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); +static void rpcFreeMsg(void *msg); +static int32_t rpcCompressRpcMsg(char *pCont, int32_t contLen); static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead); -static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); -static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen); -static void rpcLockConn(SRpcConn *pConn); -static void rpcUnlockConn(SRpcConn *pConn); -static void rpcAddRef(SRpcInfo *pRpc); -static void rpcDecRef(SRpcInfo *pRpc); +static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); +static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen); +static void rpcLockConn(SRpcConn *pConn); +static void rpcUnlockConn(SRpcConn *pConn); +static void rpcAddRef(SRpcInfo *pRpc); +static void rpcDecRef(SRpcInfo *pRpc); + +static bool rpcGenUID(uint32_t *first, uint32_t *second) { + static uint64_t hashId = 0; + static uint32_t tranId = 0; + + if (hashId == 0) { + char uid[128] = {0}; + taosGetSystemUid(uid); + hashId = MurmurHash3_32(uid, strlen(uid)); + } + uint64_t id = 0; + while (true) { + int64_t ts = taosGetTimestampMs(); + uint64_t pid = taosGetPId(); + int32_t val = atomic_add_fetch_32(&tranId, 1); + if (val >= 0xFFFF) { + atomic_store_32(&tranId, 0); + } + + id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); + if (id) { + break; + } + } + + *first = (id >> 32) & 0xFFFFFFFF; + *second = id & 0xFFFFFFFF; + + return true; +} static void rpcFree(void *p) { tTrace("free mem: %p", p); @@ -230,40 +243,40 @@ static void rpcFree(void *p) { } int32_t rpcInit(void) { - tsProgressTimer = tsRpcTimer/2; - tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer; - tsRpcHeadSize = RPC_MSG_OVERHEAD; + tsProgressTimer = tsRpcTimer / 2; + tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsProgressTimer; + tsRpcHeadSize = RPC_MSG_OVERHEAD; tsRpcOverhead = sizeof(SRpcReqContext); tsRpcRefId = taosOpenRef(200, rpcFree); return 0; } - + void rpcCleanup(void) { taosCloseRef(tsRpcRefId); tsRpcRefId = -1; } - + void *rpcOpen(const SRpcInit *pInit) { SRpcInfo *pRpc; - //pthread_once(&tsRpcInit, rpcInit); + // pthread_once(&tsRpcInit, rpcInit); pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) return NULL; - if(pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); + if (pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); pRpc->connType = pInit->connType; if (pRpc->connType == TAOS_CONN_CLIENT) { pRpc->numOfThreads = pInit->numOfThreads; } else { - pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads; + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; } pRpc->idleTime = pInit->idleTime; pRpc->localPort = pInit->localPort; pRpc->afp = pInit->afp; - pRpc->sessions = pInit->sessions+1; + pRpc->sessions = pInit->sessions + 1; if (pInit->user) tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user)); if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret)); if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey)); @@ -282,14 +295,14 @@ void *rpcOpen(const SRpcInit *pInit) { return NULL; } - pRpc->idPool = taosInitIdPool(pRpc->sessions-1); + pRpc->idPool = taosInitIdPool(pRpc->sessions - 1); if (pRpc->idPool == NULL) { tError("%s failed to init ID pool", pRpc->label); rpcClose(pRpc); return NULL; } - pRpc->tmrCtrl = taosTmrInit(pRpc->sessions*2 + 1, 50, 10000, pRpc->label); + pRpc->tmrCtrl = taosTmrInit(pRpc->sessions * 2 + 1, 50, 10000, pRpc->label); if (pRpc->tmrCtrl == NULL) { tError("%s failed to init timers", pRpc->label); rpcClose(pRpc); @@ -304,8 +317,8 @@ void *rpcOpen(const SRpcInit *pInit) { return NULL; } } else { - pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime * 20); - if ( pRpc->pCache == NULL ) { + pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime * 20); + if (pRpc->pCache == NULL) { tError("%s failed to init connection cache", pRpc->label); rpcClose(pRpc); return NULL; @@ -314,10 +327,10 @@ void *rpcOpen(const SRpcInit *pInit) { pthread_mutex_init(&pRpc->mutex, NULL); - pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, - pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); - pRpc->udphandle = (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, - pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); + pRpc->tcphandle = (*taosInitConn[pRpc->connType | RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads, + rpcProcessMsgFromPeer, pRpc); + pRpc->udphandle = + (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) { tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort); @@ -337,7 +350,7 @@ void rpcClose(void *param) { (*taosStopConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); (*taosStopConn[pRpc->connType])(pRpc->udphandle); - // close all connections + // close all connections for (int i = 0; i < pRpc->sessions; ++i) { if (pRpc->connList && pRpc->connList[i].user[0]) { rpcCloseConn((void *)(pRpc->connList + i)); @@ -378,8 +391,8 @@ void *rpcReallocCont(void *ptr, int contLen) { if (ptr == NULL) return rpcMallocCont(contLen); char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead); - if (contLen == 0 ) { - free(start); + if (contLen == 0) { + free(start); return NULL; } @@ -388,7 +401,7 @@ void *rpcReallocCont(void *ptr, int contLen) { if (start == NULL) { tError("failed to realloc cont, size:%d", size); return NULL; - } + } return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } @@ -398,7 +411,7 @@ TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int6 SRpcReqContext *pContext; int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); - pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); + pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext)); pContext->ahandle = pMsg->ahandle; pContext->pRpc = (SRpcInfo *)shandle; pContext->epSet = *pEpSet; @@ -407,17 +420,17 @@ TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int6 pContext->msgType = pMsg->msgType; pContext->oldInUse = pEpSet->inUse; - pContext->connType = RPC_CONN_UDPC; + pContext->connType = RPC_CONN_UDPC; if (contLen > tsRpcMaxUdpSize || tsRpcForceTcp) pContext->connType = RPC_CONN_TCPC; - // connection type is application specific. + // connection type is application specific. // for TDengine, all the query, show commands shall have TCP connection char type = pMsg->msgType; - if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE - || type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP - || type == TSDB_MSG_TYPE_CM_TABLES_META || type == TSDB_MSG_TYPE_CM_TABLE_META - || type == TSDB_MSG_TYPE_CM_SHOW || type == TSDB_MSG_TYPE_DM_STATUS || type == TSDB_MSG_TYPE_CM_ALTER_TABLE) + if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE || type == TSDB_MSG_TYPE_FETCH || + type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_CM_TABLES_META || + type == TSDB_MSG_TYPE_CM_TABLE_META || type == TSDB_MSG_TYPE_CM_SHOW || type == TSDB_MSG_TYPE_DM_STATUS || + type == TSDB_MSG_TYPE_CM_ALTER_TABLE) pContext->connType = RPC_CONN_TCPC; pContext->rid = taosAddRef(tsRpcRefId, pContext); @@ -427,26 +440,26 @@ TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int6 } void rpcSendResponse(const SRpcMsg *pRsp) { - int msgLen = 0; - SRpcConn *pConn = (SRpcConn *)pRsp->handle; - SRpcMsg rpcMsg = *pRsp; - SRpcMsg *pMsg = &rpcMsg; - SRpcInfo *pRpc = pConn->pRpc; + int msgLen = 0; + SRpcConn *pConn = (SRpcConn *)pRsp->handle; + SRpcMsg rpcMsg = *pRsp; + SRpcMsg *pMsg = &rpcMsg; + SRpcInfo *pRpc = pConn->pRpc; - if ( pMsg->pCont == NULL ) { + if (pMsg->pCont == NULL) { pMsg->pCont = rpcMallocCont(0); pMsg->contLen = 0; } - SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont); - char *msg = (char *)pHead; + SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont); + char *msg = (char *)pHead; pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); msgLen = rpcMsgLenFromCont(pMsg->contLen); rpcLockConn(pConn); - if ( pConn->inType == 0 || pConn->user[0] == 0 ) { + if (pConn->inType == 0 || pConn->user[0] == 0) { tError("%s, connection is already released, rsp wont be sent", pConn->info); rpcUnlockConn(pConn); rpcFreeCont(pMsg->pCont); @@ -456,7 +469,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) { // set msg header pHead->version = 1; - pHead->msgType = pConn->inType+1; + pHead->msgType = pConn->inType + 1; pHead->spi = pConn->spi; pHead->encrypt = pConn->encrypt; pHead->tranId = pConn->inTranId; @@ -465,13 +478,13 @@ void rpcSendResponse(const SRpcMsg *pRsp) { pHead->linkUid = pConn->linkUid; pHead->port = htons(pConn->localPort); pHead->code = htonl(pMsg->code); - pHead->ahandle = (uint64_t) pConn->ahandle; - + pHead->ahandle = (uint64_t)pConn->ahandle; + // set pConn parameters pConn->inType = 0; // response message is released until new response is sent - rpcFreeMsg(pConn->pRspMsg); + rpcFreeMsg(pConn->pRspMsg); pConn->pRspMsg = msg; pConn->rspMsgLen = msgLen; if (pMsg->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) pConn->inTranId--; @@ -484,23 +497,22 @@ void rpcSendResponse(const SRpcMsg *pRsp) { rpcSendMsgToPeer(pConn, msg, msgLen); // if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured - if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY) - pConn->secured = 1; // connection shall be secured + if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY) pConn->secured = 1; // connection shall be secured if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); pConn->pReqMsg = NULL; pConn->reqMsgLen = 0; rpcUnlockConn(pConn); - rpcDecRef(pRpc); // decrease the referene count + rpcDecRef(pRpc); // decrease the referene count return; } void rpcSendRedirectRsp(void *thandle, const SRpcEpSet *pEpSet) { - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; memset(&rpcMsg, 0, sizeof(rpcMsg)); - + rpcMsg.contLen = sizeof(SRpcEpSet); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); if (rpcMsg.pCont == NULL) return; @@ -516,7 +528,7 @@ void rpcSendRedirectRsp(void *thandle, const SRpcEpSet *pEpSet) { } int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { - SRpcConn *pConn = (SRpcConn *)thandle; + SRpcConn *pConn = (SRpcConn *)thandle; if (pConn->user[0] == 0) return -1; pInfo->clientIp = pConn->peerIp; @@ -529,11 +541,11 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { SRpcReqContext *pContext; - pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); + pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext)); memset(pRsp, 0, sizeof(SRpcMsg)); - - tsem_t sem; + + tsem_t sem; tsem_init(&sem, 0, 0); pContext->pSem = &sem; pContext->pRsp = pRsp; @@ -550,13 +562,13 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) // this API is used by server app to keep an APP context in case connection is broken int rpcReportProgress(void *handle, char *pCont, int contLen) { SRpcConn *pConn = (SRpcConn *)handle; - int code = 0; + int code = 0; rpcLockConn(pConn); if (pConn->user[0]) { // pReqMsg and reqMsgLen is re-used to store the context from app server - pConn->pReqMsg = pCont; + pConn->pReqMsg = pCont; pConn->reqMsgLen = contLen; } else { tDebug("%s, rpc connection is already released", pConn->info); @@ -569,7 +581,6 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) { } void rpcCancelRequest(int64_t rid) { - SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid); if (pContext == NULL) return; @@ -579,7 +590,7 @@ void rpcCancelRequest(int64_t rid) { } static void rpcFreeMsg(void *msg) { - if ( msg ) { + if (msg) { char *temp = (char *)msg - sizeof(SRpcReqContext); free(temp); tTrace("free mem: %p", temp); @@ -591,14 +602,14 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, uint32_t peerIp = taosGetIpv4FromFqdn(peerFqdn); if (peerIp == 0xFFFFFFFF) { - tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); - terrno = TSDB_CODE_RPC_FQDN_ERROR; + tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); + terrno = TSDB_CODE_RPC_FQDN_ERROR; return NULL; } - pConn = rpcAllocateClientConn(pRpc); + pConn = rpcAllocateClientConn(pRpc); - if (pConn) { + if (pConn) { tstrncpy(pConn->peerFqdn, peerFqdn, sizeof(pConn->peerFqdn)); pConn->peerIp = peerIp; pConn->peerPort = peerPort; @@ -606,7 +617,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, pConn->connType = connType; if (taosOpenConn[connType]) { - void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle; + void *shandle = (connType & RPC_CONN_TCP) ? pRpc->tcphandle : pRpc->udphandle; pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort); if (pConn->chandle == NULL) { tError("failed to connect to:0x%x:%d", pConn->peerIp, pConn->peerPort); @@ -631,13 +642,14 @@ static void rpcReleaseConn(SRpcConn *pConn) { taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pIdleTimer); - if ( pRpc->connType == TAOS_CONN_SERVER) { - char hashstr[40] = {0}; - size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); + if (pRpc->connType == TAOS_CONN_SERVER) { + char hashstr[40] = {0}; + size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, + pConn->connType); taosHashRemove(pRpc->hash, hashstr, size); - rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg + rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg pConn->pRspMsg = NULL; - + // if server has ever reported progress, free content if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); // do not use rpcFreeMsg } else { @@ -645,17 +657,17 @@ static void rpcReleaseConn(SRpcConn *pConn) { if (pConn->outType && pConn->pReqMsg) { SRpcReqContext *pContext = pConn->pContext; if (pContext) { - if (pContext->pRsp) { - // for synchronous API, post semaphore to unblock app + if (pContext->pRsp) { + // for synchronous API, post semaphore to unblock app pContext->pRsp->code = TSDB_CODE_RPC_APP_ERROR; pContext->pRsp->pCont = NULL; pContext->pRsp->contLen = 0; tsem_post(pContext->pSem); } - pContext->pConn = NULL; + pContext->pConn = NULL; taosRemoveRef(tsRpcRefId, pContext->rid); } else { - assert(0); + assert(0); } } } @@ -684,8 +696,7 @@ static void rpcCloseConn(void *thandle) { rpcLockConn(pConn); - if (pConn->user[0]) - rpcReleaseConn(pConn); + if (pConn->user[0]) rpcReleaseConn(pConn); rpcUnlockConn(pConn); } @@ -693,6 +704,9 @@ static void rpcCloseConn(void *thandle) { static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { SRpcConn *pConn = NULL; + uint32_t transId, linkUid; + rpcGenUID(&transId, &linkUid); + int sid = taosAllocateId(pRpc->idPool); if (sid <= 0) { tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions); @@ -702,9 +716,9 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { pConn->pRpc = pRpc; pConn->sid = sid; - pConn->tranId = (uint16_t)(taosRand() & 0xFFFF); + pConn->tranId = transId; pConn->ownId = htonl(pConn->sid); - pConn->linkUid = (uint32_t)((int64_t)pConn + (int64_t)getpid() + (int64_t)pConn->tranId); + pConn->linkUid = linkUid; pConn->spi = pRpc->spi; pConn->encrypt = pRpc->encrypt; if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN); @@ -719,8 +733,9 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { char hashstr[40] = {0}; SRpcHead *pHead = (SRpcHead *)pRecv->msg; - size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); - + size_t size = + snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); + // check if it is already allocated SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size)); if (ppConn) pConn = *ppConn; @@ -769,22 +784,23 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { } taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); - tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s", pRpc->label, pConn, pConn->linkUid, sid, hashstr); + tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s", pRpc->label, pConn, pConn->linkUid, sid, + hashstr); } return pConn; } static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { - SRpcConn *pConn = NULL; + SRpcConn *pConn = NULL; SRpcHead *pHead = (SRpcHead *)pRecv->msg; if (sid) { pConn = pRpc->connList + sid; if (pConn->user[0] == 0) pConn = NULL; - } + } - if (pConn == NULL) { + if (pConn == NULL) { if (pRpc->connType == TAOS_CONN_SERVER) { pConn = rpcAllocateServerConn(pRpc, pRecv); } else { @@ -805,14 +821,15 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { } static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { - SRpcConn *pConn; - SRpcInfo *pRpc = pContext->pRpc; - SRpcEpSet *pEpSet = &pContext->epSet; + SRpcConn *pConn; + SRpcInfo *pRpc = pContext->pRpc; + SRpcEpSet *pEpSet = &pContext->epSet; - pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); - if ( pConn == NULL || pConn->user[0] == 0) { + pConn = + rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); + if (pConn == NULL || pConn->user[0] == 0) { pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); - } + } if (pConn) { pConn->tretry = 0; @@ -827,55 +844,52 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { } static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { - - if (pConn->peerId == 0) { - pConn->peerId = pHead->sourceId; - } else { - if (pConn->peerId != pHead->sourceId) { - tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, - pConn->peerId, pHead->sourceId); - return TSDB_CODE_RPC_INVALID_VALUE; - } + if (pConn->peerId == 0) { + pConn->peerId = pHead->sourceId; + } else { + if (pConn->peerId != pHead->sourceId) { + tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, pConn->peerId, pHead->sourceId); + return TSDB_CODE_RPC_INVALID_VALUE; } + } - if (pConn->inTranId == pHead->tranId) { - if (pConn->inType == pHead->msgType) { - if (pHead->code == 0) { - tDebug("%s, %s is retransmitted", pConn->info, taosMsg[pHead->msgType]); - rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS); - } else { - // do nothing, it is heart beat from client - } - } else if (pConn->inType == 0) { - tDebug("%s, %s is already processed, tranId:%d", pConn->info, taosMsg[pHead->msgType], pConn->inTranId); - rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response + if (pConn->inTranId == pHead->tranId) { + if (pConn->inType == pHead->msgType) { + if (pHead->code == 0) { + tDebug("%s, %s is retransmitted", pConn->info, taosMsg[pHead->msgType]); + rpcSendQuickRsp(pConn, TSDB_CODE_RPC_ACTION_IN_PROGRESS); } else { - tDebug("%s, mismatched message %s and tranId", pConn->info, taosMsg[pHead->msgType]); + // do nothing, it is heart beat from client } - - // do not reply any message - return TSDB_CODE_RPC_ALREADY_PROCESSED; + } else if (pConn->inType == 0) { + tDebug("%s, %s is already processed, tranId:%d", pConn->info, taosMsg[pHead->msgType], pConn->inTranId); + rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response + } else { + tDebug("%s, mismatched message %s and tranId", pConn->info, taosMsg[pHead->msgType]); } - if (pConn->inType != 0) { - tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, - pConn->inTranId, pHead->tranId); - return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED; - } + // do not reply any message + return TSDB_CODE_RPC_ALREADY_PROCESSED; + } - if (rpcContLenFromMsg(pHead->msgLen) <= 0) { - tDebug("%s, message body is empty, ignore", pConn->info); - return TSDB_CODE_RPC_APP_ERROR; - } + if (pConn->inType != 0) { + tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, pConn->inTranId, pHead->tranId); + return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED; + } + + if (rpcContLenFromMsg(pHead->msgLen) <= 0) { + tDebug("%s, message body is empty, ignore", pConn->info); + return TSDB_CODE_RPC_APP_ERROR; + } - pConn->inTranId = pHead->tranId; - pConn->inType = pHead->msgType; + pConn->inTranId = pHead->tranId; + pConn->inType = pHead->msgType; - // start the progress timer to monitor the response from server app - if (pConn->connType != RPC_CONN_TCPS) - pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl); - - return 0; + // start the progress timer to monitor the response from server app + if (pConn->connType != RPC_CONN_TCPS) + pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl); + + return 0; } static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { @@ -900,7 +914,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { if (pHead->code == TSDB_CODE_RPC_AUTH_REQUIRED && pRpc->spi) { tDebug("%s, authentication shall be restarted", pConn->info); pConn->secured = 0; - rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); + rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); if (pConn->connType != RPC_CONN_TCPC) pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); return TSDB_CODE_RPC_ALREADY_PROCESSED; @@ -910,7 +924,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { tDebug("%s, mismatched linkUid, link shall be restarted", pConn->info); pConn->secured = 0; ((SRpcHead *)pConn->pReqMsg)->destId = 0; - rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); + rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); if (pConn->connType != RPC_CONN_TCPC) pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); return TSDB_CODE_RPC_ALREADY_PROCESSED; @@ -936,25 +950,25 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { pConn->reqMsgLen = 0; SRpcReqContext *pContext = pConn->pContext; - if (pHead->code == TSDB_CODE_RPC_REDIRECT) { + if (pHead->code == TSDB_CODE_RPC_REDIRECT) { if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SRpcEpSet)) { // if EpSet is not included in the msg, treat it as NOT_READY - pHead->code = TSDB_CODE_RPC_NOT_READY; + pHead->code = TSDB_CODE_RPC_NOT_READY; } else { pContext->redirect++; if (pContext->redirect > TSDB_MAX_REPLICA) { - pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; tWarn("%s, too many redirects, quit", pConn->info); } } - } + } return TSDB_CODE_SUCCESS; } static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) { - int32_t sid; - SRpcConn *pConn = NULL; + int32_t sid; + SRpcConn *pConn = NULL; SRpcHead *pHead = (SRpcHead *)pRecv->msg; @@ -963,20 +977,23 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); - terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE; return NULL; + terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE; + return NULL; } if (sid < 0 || sid >= pRpc->sessions) { - tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, - pRpc->sessions, taosMsg[pHead->msgType]); - terrno = TSDB_CODE_RPC_INVALID_SESSION_ID; return NULL; + tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, pRpc->sessions, + taosMsg[pHead->msgType]); + terrno = TSDB_CODE_RPC_INVALID_SESSION_ID; + return NULL; } // compatibility between old version client and new version server, since 2.4.0.0 - if (rpcIsReq(pHead->msgType)){ - if((htonl(pHead->msgVer) >> 16 != tsVersion >> 24) || - ((htonl(pHead->msgVer) >> 16 == tsVersion >> 24) && htonl(pHead->msgVer) < ((2 << 16) | (4 << 8)))){ - tError("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion, taosMsg[pHead->msgType]); + if (rpcIsReq(pHead->msgType)) { + if ((htonl(pHead->msgVer) >> 16 != tsVersion >> 24) || + ((htonl(pHead->msgVer) >> 16 == tsVersion >> 24) && htonl(pHead->msgVer) < ((2 << 16) | (4 << 8)))) { + tError("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion, + taosMsg[pHead->msgType]); terrno = TSDB_CODE_RPC_INVALID_VERSION; return NULL; } @@ -984,11 +1001,11 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont pConn = rpcGetConnObj(pRpc, sid, pRecv); if (pConn == NULL) { - tDebug("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno)); + tDebug("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno)); return NULL; - } + } - if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN || pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { + if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN || pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { return pConn; } @@ -1004,7 +1021,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont pConn->peerIp = pRecv->ip; pConn->peerPort = pRecv->port; - if (pHead->port) pConn->peerPort = htons(pHead->port); + if (pHead->port) pConn->peerPort = htons(pHead->port); terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen); @@ -1016,16 +1033,16 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont // decrypt here } - if ( rpcIsReq(pHead->msgType) ) { + if (rpcIsReq(pHead->msgType)) { pConn->connType = pRecv->connType; terrno = rpcProcessReqHead(pConn, pHead); // stop idle timer - taosTmrStopA(&pConn->pIdleTimer); + taosTmrStopA(&pConn->pIdleTimer); - // client shall send the request within tsRpcTime again for UDP, double it + // client shall send the request within tsRpcTime again for UDP, double it if (pConn->connType != RPC_CONN_TCPS) - pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*20, pConn, pRpc->tmrCtrl); + pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer * 20, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); *ppContext = pConn->pContext; @@ -1038,11 +1055,11 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont } static void doRpcReportBrokenLinkToServer(void *param, void *id) { - SRpcMsg *pRpcMsg = (SRpcMsg *)(param); - SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle); - SRpcInfo *pRpc = pConn->pRpc; - (*(pRpc->cfp))(pRpcMsg, NULL); - free(pRpcMsg); + SRpcMsg *pRpcMsg = (SRpcMsg *)(param); + SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle); + SRpcInfo *pRpc = pConn->pRpc; + (*(pRpc->cfp))(pRpcMsg, NULL); + free(pRpcMsg); } static void rpcReportBrokenLinkToServer(SRpcConn *pConn) { SRpcInfo *pRpc = pConn->pRpc; @@ -1053,12 +1070,12 @@ static void rpcReportBrokenLinkToServer(SRpcConn *pConn) { tDebug("%s, notify the server app, connection is gone", pConn->info); SRpcMsg *rpcMsg = malloc(sizeof(SRpcMsg)); - rpcMsg->pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server - rpcMsg->contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length + rpcMsg->pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server + rpcMsg->contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length rpcMsg->ahandle = pConn->ahandle; rpcMsg->handle = pConn; rpcMsg->msgType = pConn->inType; - rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pConn->pReqMsg = NULL; pConn->reqMsgLen = 0; if (pRpc->cfp) { @@ -1082,8 +1099,8 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { pConn->pReqMsg = NULL; taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); } - - if (pConn->inType) rpcReportBrokenLinkToServer(pConn); + + if (pConn->inType) rpcReportBrokenLinkToServer(pConn); rpcReleaseConn(pConn); rpcUnlockConn(pConn); @@ -1091,12 +1108,12 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { // process probe msg , return true is probe msg, false is not probe msg static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { - SRpcHead *pHead = (SRpcHead *)pRecv->msg; + SRpcHead *pHead = (SRpcHead *)pRecv->msg; uint64_t ahandle = pHead->ahandle; if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) { // response to - char msg[RPC_MSG_OVERHEAD]; - SRpcHead *pRspHead; + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pRspHead; // set msg header memset(msg, 0, sizeof(SRpcHead)); @@ -1105,13 +1122,13 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { pRspHead->msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP; pRspHead->version = 1; pRspHead->ahandle = pHead->ahandle; - pRspHead->tranId = pHead->tranId; - pRspHead->code = 0; + pRspHead->tranId = pHead->tranId; + pRspHead->code = 0; pRspHead->linkUid = pHead->linkUid; rpcLockConn(pConn); pRspHead->sourceId = pConn->ownId; - pRspHead->destId = pConn->peerId; + pRspHead->destId = pConn->peerId; memcpy(pRspHead->user, pHead->user, tListLen(pHead->user)); bool ret = rpcSendMsgToPeer(pConn, pRspHead, sizeof(SRpcHead)); @@ -1120,19 +1137,20 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { rpcUnlockConn(pConn); rpcFreeMsg(pRecv->msg); } else if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { - if(pConn) { + if (pConn) { rpcLockConn(pConn); // get req content SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, pConn->rid); - + if (pContext) { rpcProcessIncomingMsg(pConn, pHead, pContext); taosReleaseRef(tsRpcRefId, pConn->rid); } else { - tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pContext is NULL. pConn->rid=0x%" PRIX64, ahandle, pConn->rid); + tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pContext is NULL. pConn->rid=0x%" PRIX64, ahandle, + pConn->rid); rpcFreeMsg(pRecv->msg); } - + rpcUnlockConn(pConn); } else { tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pConn is NULL.", ahandle); @@ -1142,14 +1160,14 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { } static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { - SRpcHead *pHead = (SRpcHead *)pRecv->msg; - SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle; - SRpcConn *pConn = (SRpcConn *)pRecv->thandle; + SRpcHead *pHead = (SRpcHead *)pRecv->msg; + SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle; + SRpcConn *pConn = (SRpcConn *)pRecv->thandle; tDump(pRecv->msg, pRecv->msgLen); // underlying UDP layer does not know it is server or client - pRecv->connType = pRecv->connType | pRpc->connType; + pRecv->connType = pRecv->connType | pRpc->connType; if (pRecv->msg == NULL) { rpcProcessBrokenLink(pConn); @@ -1159,7 +1177,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { terrno = 0; SRpcReqContext *pContext; pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); - + // deal probe msg if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN || pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { rpcProcessProbeMsg(pRecv, pConn); @@ -1178,54 +1196,53 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { int32_t code = terrno; if (code != TSDB_CODE_RPC_ALREADY_PROCESSED) { - if (code != 0) { // parsing error + if (code != 0) { // parsing error if (rpcIsReq(pHead->msgType)) { rpcSendErrorMsgToPeer(pRecv, code); if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE) { rpcCloseConn(pConn); } - if (pHead->msgType + 1 > 1 && pHead->msgType+1 < TSDB_MSG_TYPE_MAX) { - tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); + if (pHead->msgType + 1 > 1 && pHead->msgType + 1 < TSDB_MSG_TYPE_MAX) { + tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, + taosMsg[pHead->msgType + 1], code); } else { - tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], code); - } - } - } else { // msg is passed to app only parsing is ok + tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, + taosMsg[pHead->msgType], code); + } + } + } else { // msg is passed to app only parsing is ok rpcProcessIncomingMsg(pConn, pHead, pContext); } } - if (code) rpcFreeMsg(pRecv->msg); // parsing failed, msg shall be freed + if (code) rpcFreeMsg(pRecv->msg); // parsing failed, msg shall be freed return pConn; } static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { - SRpcInfo *pRpc = pContext->pRpc; + SRpcInfo *pRpc = pContext->pRpc; pContext->pConn = NULL; - if (pContext->pRsp) { + if (pContext->pRsp) { // for synchronous API memcpy(pContext->pSet, &pContext->epSet, sizeof(SRpcEpSet)); memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); tsem_post(pContext->pSem); } else { - // for asynchronous API + // for asynchronous API SRpcEpSet *pEpSet = NULL; - if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) - pEpSet = &pContext->epSet; + if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) pEpSet = &pContext->epSet; - (*pRpc->cfp)(pMsg, pEpSet); + (*pRpc->cfp)(pMsg, pEpSet); } // free the request message - if(pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN && pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN_RSP) { - taosRemoveRef(tsRpcRefId, pContext->rid); + if (pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN && pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN_RSP) { + taosRemoveRef(tsRpcRefId, pContext->rid); } - } static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { - SRpcInfo *pRpc = pConn->pRpc; SRpcMsg rpcMsg; @@ -1233,9 +1250,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen); rpcMsg.pCont = pHead->content; rpcMsg.msgType = pHead->msgType; - rpcMsg.code = pHead->code; - - if ( rpcIsReq(pHead->msgType) ) { + rpcMsg.code = pHead->code; + + if (rpcIsReq(pHead->msgType)) { rpcMsg.ahandle = pConn->ahandle; rpcMsg.handle = pConn; rpcAddRef(pRpc); // add the refCount for requests @@ -1249,22 +1266,23 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { // probe msg rpcNotifyClient(pContext, &rpcMsg); - return ; + return; } - // reset pConn NULL + // reset pConn NULL pContext->pConn = NULL; // for UDP, port may be changed by server, the port in epSet shall be used for cache if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], + pConn->connType); } else { rpcCloseConn(pConn); } if (pHead->code == TSDB_CODE_RPC_REDIRECT) { pContext->numOfTry = 0; - SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content; + SRpcEpSet *pEpSet = (SRpcEpSet *)pHead->content; if (pEpSet->numOfEps > 0) { memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet)); tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps, @@ -1277,7 +1295,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte } rpcSendReqToServer(pRpc, pContext); rpcFreeCont(rpcMsg.pCont); - } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || pHead->code == TSDB_CODE_DND_EXITING) { + } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || + pHead->code == TSDB_CODE_DND_EXITING) { pContext->code = pHead->code; rpcProcessConnError(pContext, NULL); rpcFreeCont(rpcMsg.pCont); @@ -1288,14 +1307,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte } static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { - char msg[RPC_MSG_OVERHEAD]; - SRpcHead *pHead; + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pHead; // set msg header memset(msg, 0, sizeof(SRpcHead)); pHead = (SRpcHead *)msg; pHead->version = 1; - pHead->msgType = pConn->inType+1; + pHead->msgType = pConn->inType + 1; pHead->spi = pConn->spi; pHead->encrypt = 0; pHead->tranId = pConn->inTranId; @@ -1307,12 +1326,12 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { pHead->code = htonl(code); rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead)); - pConn->secured = 1; // connection shall be secured + pConn->secured = 1; // connection shall be secured } static void rpcSendReqHead(SRpcConn *pConn) { - char msg[RPC_MSG_OVERHEAD]; - SRpcHead *pHead; + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pHead; // set msg header memset(msg, 0, sizeof(SRpcHead)); @@ -1334,10 +1353,10 @@ static void rpcSendReqHead(SRpcConn *pConn) { } static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { - SRpcHead *pRecvHead, *pReplyHead; - char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t) ]; - uint32_t timeStamp; - int msgLen; + SRpcHead *pRecvHead, *pReplyHead; + char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t)]; + uint32_t timeStamp; + int msgLen; pRecvHead = (SRpcHead *)pRecv->msg; pReplyHead = (SRpcHead *)msg; @@ -1367,14 +1386,14 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen); (*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, msg, msgLen, pRecv->chandle); - return; + return; } static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { - SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); - char *msg = (char *)pHead; - int msgLen = rpcMsgLenFromCont(pContext->contLen); - char msgType = pContext->msgType; + SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); + char *msg = (char *)pHead; + int msgLen = rpcMsgLenFromCont(pContext->contLen); + char msgType = pContext->msgType; pContext->numOfTry++; SRpcConn *pConn = rpcSetupConnToServer(pContext); @@ -1389,13 +1408,13 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pConn->ahandle = pContext->ahandle; rpcLockConn(pConn); - // set the message header + // set the message header pHead->version = 1; pHead->msgVer = htonl(tsVersion >> 8); pHead->msgType = msgType; pHead->encrypt = 0; pConn->tranId++; - if ( pConn->tranId == 0 ) pConn->tranId++; + if (pConn->tranId == 0) pConn->tranId++; pHead->tranId = pConn->tranId; pHead->sourceId = pConn->ownId; pHead->destId = pConn->peerId; @@ -1410,13 +1429,12 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pConn->pReqMsg = msg; pConn->reqMsgLen = msgLen; pConn->pContext = pContext; - if(pContext) - pConn->rid = pContext->rid; + if (pContext) pConn->rid = pContext->rid; - // save - pContext->sendInfo.pConn = pConn; + // save + pContext->sendInfo.pConn = pConn; pContext->sendInfo.pFdObj = pConn->chandle; - pContext->sendInfo.fd = taosGetFdID(pConn->chandle); + pContext->sendInfo.fd = taosGetFdID(pConn->chandle); bool ret = rpcSendMsgToPeer(pConn, msg, msgLen); if (pConn->connType != RPC_CONN_TCPC) @@ -1424,7 +1442,7 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { rpcUnlockConn(pConn); - if(ret == BOOL_FALSE) { + if (ret == BOOL_FALSE) { // try next ip again pContext->code = terrno; // in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query @@ -1436,31 +1454,29 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { } static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { - int writtenLen = 0; - SRpcHead *pHead = (SRpcHead *)msg; - bool ret = true; + int writtenLen = 0; + SRpcHead *pHead = (SRpcHead *)msg; + bool ret = true; msgLen = rpcAddAuthPart(pConn, msg, msgLen); - if ( rpcIsReq(pHead->msgType)) { - tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", - pConn->info, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort, - msgLen, pHead->sourceId, pHead->destId, pHead->tranId); + if (rpcIsReq(pHead->msgType)) { + tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", pConn->info, taosMsg[pHead->msgType], + pConn->peerFqdn, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } else { - if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured - tDebug("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", - pConn->info, taosMsg[pHead->msgType], pConn->peerIp, pConn->peerPort, - htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId); + if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured + tDebug("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pConn->info, taosMsg[pHead->msgType], + pConn->peerIp, pConn->peerPort, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } - //tTrace("connection type is: %d", pConn->connType); + // tTrace("connection type is: %d", pConn->connType); writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); if (writtenLen != msgLen) { tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); ret = false; } - + tDump(msg, msgLen); return ret; } @@ -1468,24 +1484,26 @@ static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { static void rpcProcessConnError(void *param, void *id) { SRpcReqContext *pContext = (SRpcReqContext *)param; SRpcInfo *pRpc = pContext->pRpc; - SRpcMsg rpcMsg; - + SRpcMsg rpcMsg; + if (pRpc == NULL) { return; } - + if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) { - rpcMsg.msgType = pContext->msgType+1; + rpcMsg.msgType = pContext->msgType + 1; rpcMsg.ahandle = pContext->ahandle; rpcMsg.code = pContext->code; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; - tWarn("%s %p, connection error. notify client query over. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType); + tWarn("%s %p, connection error. notify client query over. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, + pContext->numOfTry, pContext->msgType); rpcNotifyClient(pContext, &rpcMsg); } else { // move to next IP - tWarn("%s %p, connection error. retry to send request again. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType); + tWarn("%s %p, connection error. retry to send request again. numOfTry=%d msgType=%d", pRpc->label, + pContext->ahandle, pContext->numOfTry, pContext->msgType); pContext->epSet.inUse++; pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps; rpcSendReqToServer(pRpc, pContext); @@ -1505,11 +1523,12 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { if (pConn->retry < 4) { tDebug("%s, re-send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); - rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); + rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); } else { // close the connection - tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); + tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, + pConn->peerPort); if (pConn->pContext) { pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pConn->pContext->pConn = NULL; @@ -1532,7 +1551,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { if (pConn->user[0]) { tDebug("%s, close the connection since no activity", pConn->info); - if (pConn->inType) rpcReportBrokenLinkToServer(pConn); + if (pConn->inType) rpcReportBrokenLinkToServer(pConn); rpcReleaseConn(pConn); } else { tDebug("%s, idle timer:%p not processed", pConn->info, tmrId); @@ -1558,34 +1577,34 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { rpcUnlockConn(pConn); } -static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { - SRpcHead *pHead = rpcHeadFromCont(pCont); - int32_t finalLen = 0; - int overhead = sizeof(SRpcComp); - +static int32_t rpcCompressRpcMsg(char *pCont, int32_t contLen) { + SRpcHead *pHead = rpcHeadFromCont(pCont); + int32_t finalLen = 0; + int overhead = sizeof(SRpcComp); + if (!NEEDTO_COMPRESSS_MSG(contLen)) { return contLen; } - - char *buf = malloc (contLen + overhead + 8); // 8 extra bytes + + char *buf = malloc(contLen + overhead + 8); // 8 extra bytes if (buf == NULL) { tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen); return contLen; } - + int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead); - + /* * only the compressed size is less than the value of contLen - overhead, the compression is applied * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message */ if (compLen > 0 && compLen < contLen - overhead) { SRpcComp *pComp = (SRpcComp *)pCont; - pComp->reserved = 0; - pComp->contLen = htonl(contLen); + pComp->reserved = 0; + pComp->contLen = htonl(contLen); memcpy(pCont + overhead, buf, compLen); - + pHead->comp = 1; tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen); finalLen = compLen + overhead; @@ -1598,29 +1617,29 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { } static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { - int overhead = sizeof(SRpcComp); - SRpcHead *pNewHead = NULL; - uint8_t *pCont = pHead->content; - SRpcComp *pComp = (SRpcComp *)pHead->content; + int overhead = sizeof(SRpcComp); + SRpcHead *pNewHead = NULL; + uint8_t *pCont = pHead->content; + SRpcComp *pComp = (SRpcComp *)pHead->content; if (pHead->comp) { // decompress the content assert(pComp->reserved == 0); int contLen = htonl(pComp->contLen); - + // prepare the temporary buffer to decompress message char *temp = (char *)malloc(contLen + RPC_MSG_OVERHEAD); - pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext - + pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext + if (pNewHead) { int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; - int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char *)pNewHead->content, compLen, contLen); + int origLen = LZ4_decompress_safe((char *)(pCont + overhead), (char *)pNewHead->content, compLen, contLen); assert(origLen == contLen); - + memcpy(pNewHead, pHead, sizeof(SRpcHead)); pNewHead->msgLen = rpcMsgLenFromCont(origLen); - rpcFreeMsg(pHead); // free the compressed message buffer - pHead = pNewHead; + rpcFreeMsg(pHead); // free the compressed message buffer + pHead = pNewHead; tTrace("decomp malloc mem:%p", temp); } else { tError("failed to allocate memory to decompress msg, contLen:%d", contLen); @@ -1632,7 +1651,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) { T_MD5_CTX context; - int ret = -1; + int ret = -1; tMD5Init(&context); tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); @@ -1680,25 +1699,25 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { SRpcHead *pHead = (SRpcHead *)msg; int code = 0; - if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){ - // secured link, or no authentication + if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) { + // secured link, or no authentication pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); // tTrace("%s, secured link, no auth is required", pConn->info); return 0; } - if ( !rpcIsReq(pHead->msgType) ) { + if (!rpcIsReq(pHead->msgType)) { // for response, if code is auth failure, it shall bypass the auth process code = htonl(pHead->code); if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || - code == TSDB_CODE_RPC_INVALID_VERSION || - code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_INVALID_USER || code == TSDB_CODE_RPC_NOT_READY) { + code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED || + code == TSDB_CODE_MND_INVALID_USER || code == TSDB_CODE_RPC_NOT_READY) { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); return 0; } } - + code = 0; if (pHead->spi == pConn->spi) { // authentication @@ -1711,12 +1730,12 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); code = TSDB_CODE_RPC_INVALID_TIME_STAMP; } else { - if (rpcAuthenticateMsg(pHead, msgLen-TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { + if (rpcAuthenticateMsg(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { tDebug("%s, authentication failed, msg discarded", pConn->info); code = TSDB_CODE_RPC_AUTH_FAILURE; } else { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); - if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client + if (!rpcIsReq(pHead->msgType)) pConn->secured = 1; // link is secured for client // tTrace("%s, message is authenticated", pConn->info); } } @@ -1745,13 +1764,9 @@ static void rpcUnlockConn(SRpcConn *pConn) { } } -static void rpcAddRef(SRpcInfo *pRpc) -{ - atomic_add_fetch_32(&pRpc->refCount, 1); -} +static void rpcAddRef(SRpcInfo *pRpc) { atomic_add_fetch_32(&pRpc->refCount, 1); } -static void rpcDecRef(SRpcInfo *pRpc) -{ +static void rpcDecRef(SRpcInfo *pRpc) { if (atomic_sub_fetch_32(&pRpc->refCount, 1) == 0) { rpcCloseConnCache(pRpc->pCache); taosHashCleanup(pRpc->hash); @@ -1768,23 +1783,23 @@ static void rpcDecRef(SRpcInfo *pRpc) } bool doRpcSendProbe(SRpcConn *pConn) { - char msg[RPC_MSG_OVERHEAD]; - SRpcHead *pHead; - int code = 0; + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pHead; + int code = 0; // set msg header memset(msg, 0, sizeof(SRpcHead)); pHead = (SRpcHead *)msg; - pHead->version = 1; - pHead->msgVer = htonl(tsVersion >> 8); - pHead->msgType = TSDB_MSG_TYPE_PROBE_CONN; - pHead->spi = pConn->spi; - pHead->encrypt = 0; - pHead->tranId = (uint16_t)(taosRand() & 0xFFFF); // rand + pHead->version = 1; + pHead->msgVer = htonl(tsVersion >> 8); + pHead->msgType = TSDB_MSG_TYPE_PROBE_CONN; + pHead->spi = pConn->spi; + pHead->encrypt = 0; + pHead->tranId = (uint16_t)(taosRand() & 0xFFFF); // rand pHead->sourceId = pConn->ownId; - pHead->destId = pConn->peerId; - pHead->linkUid = pConn->linkUid; - pHead->ahandle = (uint64_t)pConn->ahandle; + pHead->destId = pConn->peerId; + pHead->linkUid = pConn->linkUid; + pHead->ahandle = (uint64_t)pConn->ahandle; memcpy(pHead->user, pConn->user, tListLen(pHead->user)); pHead->code = htonl(code); @@ -1794,10 +1809,10 @@ bool doRpcSendProbe(SRpcConn *pConn) { } // send server syn -bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, bool *pReqOver) { +bool rpcSendProbe(int64_t rpcRid, void *pPrevContext, bool *pReqOver) { // return false can kill query bool ret = false; - if(rpcRid < 0) { + if (rpcRid < 0) { tError("PROBE rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid); return true; } @@ -1810,26 +1825,27 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, bool *pReqOver) { } // context same - if(pContext != pPrevContext) { + if (pContext != pPrevContext) { tError("PROBE rpcRid=0x%" PRIx64 " context diff. pContext=%p pPreContent=%p", rpcRid, pContext, pPrevContext); goto _END; } // conn same - if(pContext->pConn == NULL) { + if (pContext->pConn == NULL) { tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj is NULL. req is response and done.", rpcRid); - if(pReqOver) - *pReqOver = true; + if (pReqOver) *pReqOver = true; ret = true; goto _END; } else if (pContext->pConn != pContext->sendInfo.pConn) { - tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pContext->sendInfo.pConn); + tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, + pContext->sendInfo.pConn); goto _END; } // fdObj same if (pContext->pConn->chandle != pContext->sendInfo.pFdObj) { - tInfo("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pContext->sendInfo.pFdObj); + tInfo("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, + pContext->pConn->chandle, pContext->sendInfo.pFdObj); goto _END; } @@ -1853,11 +1869,11 @@ _END: } // after sql request send , save conn info -bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) { - if(rpcRid < 0) { +bool rpcSaveSendInfo(int64_t rpcRid, void **ppContext) { + if (rpcRid < 0) { tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid); return false; - } + } // get req content SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid); if (pContext == NULL) { @@ -1865,9 +1881,8 @@ bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) { return false; } - if (ppContext) - *ppContext = pContext; + if (ppContext) *ppContext = pContext; taosReleaseRef(tsRpcRefId, rpcRid); return true; -} \ No newline at end of file +}