diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 094c2972abe1c5372ad9a757f7a44bbbc1e4f9a2..d8f8d7bdfb22e43c7ba45dcdd76903e5b8ac54ae 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -35,37 +35,45 @@ extern "C" { extern int tsRpcHeadSize; typedef struct { - char *localIp; // local IP used - uint16_t localPort; // local port - char *label; // for debug purpose - int numOfThreads; // number of threads to handle connections - void *(*fp)(char type, void *pCont, int contLen, void *handle, int index); // function to process the incoming msg - int sessions; // number of sessions allowed - int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS - int idleTime; // milliseconds, 0 means idle timer is disabled - char *meterId; // meter ID - char spi; // security parameter index - char encrypt; // encrypt algorithm - char *secret; // key for authentication - char *ckey; // ciphering key - int (*afp) (char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // call back to retrieve auth info -} SRpcInit; - -typedef struct { - int16_t index; - int16_t numOfIps; + int8_t index; + int8_t numOfIps; uint16_t port; uint32_t ip[TSDB_MAX_MPEERS]; - char ipStr[TSDB_MAX_MPEERS][TSDB_IPv4ADDR_LEN]; } SRpcIpSet; +typedef struct { + char *localIp; // local IP used + uint16_t localPort; // local port + char *label; // for debug purpose + int numOfThreads; // number of threads to handle connections + int sessions; // number of sessions allowed + int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS + int idleTime; // milliseconds, 0 means idle timer is disabled + + // the following is for client security only + char *meterId; // meter ID + char spi; // security parameter index + char encrypt; // encrypt algorithm + char *secret; // key for authentication + char *ckey; // ciphering key + + // call back to process incoming msg + void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); + + // call back to process notify the ipSet changes + void (*ufp)(void *ahandle, SRpcIpSet ipSet); + + // call back to retrieve the client auth info + int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); +} SRpcInit; + void *rpcOpen(SRpcInit *pRpc); void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void rpcSendRequest(void *thandle, SRpcIpSet ipSet, char msgType, void *pCont, int contLen, void *ahandle); -void rpcSendResponse(void *pConn, void *pCont, int contLen); -void rpcSendSimpleRsp(void *pConn, int32_t code); +void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen); +void rpcSendRedirectRsp(void *pConn, SRpcIpSet ipSet); #ifdef __cplusplus diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index dd881f069278938b8fa539114548354a93d98300..bb2c466324c73871deb4685a06c957d95ae35e58 100755 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -33,15 +33,15 @@ #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest)) #define rpcHeaderFromCont(cont) ((SRpcHeader *) (cont - sizeof(SRpcHeader))) -#define rpcContFromHeader(msg) ( msg + sizeof(SRpcHeader)) -#define rpcMsgLenFromCont(contLen) ( contLen + sizeof(SRpcHeader)) +#define rpcContFromHeader(msg) (msg + sizeof(SRpcHeader)) +#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHeader)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHeader)) #define rpcIsReq(type) (type & 1U) typedef struct { int sessions; int numOfThreads; - int idleTime; // milliseconds; + int idleTime; // milliseconds; char localIp[TSDB_IPv4ADDR_LEN]; uint16_t localPort; int connType; @@ -53,27 +53,29 @@ typedef struct { char *secret; // key for authentication char *ckey; // ciphering key - void *(*fp)(char type, void *pCont, int contLen, void *handle, int index); - int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // FP to retrieve auth info - struct _RpcConn *connList; - void *idPool; - void *tmrCtrl; - void *hash; + void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); + int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); + void (*ufp)(void *ahandle, SRpcIpSet ipSet); + + void *idPool; // handle to ID pool + void *tmrCtrl; // handle to timer + void *hash; // handle returned by hash utility void *shandle; // returned handle from lower layer during initialization void *pCache; // connection cache - pthread_mutex_t mutex; + pthread_mutex_t mutex; + struct _RpcConn *connList; // connection list } SRpcInfo; typedef struct { SRpcIpSet ipSet; - void *ahandle; - SRpcInfo *pRpc; - char msgType; - char *pCont; - int contLen; - int numOfRetry; - int32_t code; - char msg[]; + void *ahandle; // handle provided by app + SRpcInfo *pRpc; // associated SRpcInfo + char msgType; // message type + char *pCont; // content provided by app + int contLen; // content length + int numOfRetry; // number of retry for different servers + int32_t code; // error code + char msg[0]; // RpcHeader starts from here } SRpcReqContext; typedef struct _RpcConn { @@ -124,6 +126,7 @@ typedef struct { char empty[1]; // reserved uint8_t msgType; // message type int32_t msgLen; // message length including the header iteslf + int32_t code; uint8_t content[0]; // message body starts from here } SRpcHeader; @@ -174,25 +177,25 @@ void (*taosCloseConn[])(void *chandle) = { }; static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort); -static void rpcCloseConn(void *thandle); -static SRpcConn *rpcSetConnToServer(void *shandle, SRpcIpSet ipSet); +static void rpcCloseConn(void *thandle); +static SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet); static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr); -static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext); +static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, char code); static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle); -static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen); +static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); -static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle); +static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle); static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); static void rpcProcessProgressTimer(void *param, void *tmrId); -static void rpcFreeMsg(void *msg); +static void rpcFreeOutMsg(void *msg); static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader); static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); @@ -208,7 +211,6 @@ void *rpcOpen(SRpcInit *pInit) { if (pRpc == NULL) return NULL; strcpy(pRpc->label, pInit->label); - pRpc->fp = pInit->fp; pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; pRpc->numOfThreads = pInit->numOfThreads; @@ -224,10 +226,12 @@ void *rpcOpen(SRpcInit *pInit) { pRpc->spi = pInit->spi; strcpy(pRpc->secret, pInit->secret); strcpy(pRpc->ckey, pInit->ckey); + pRpc->ufp = pInit->ufp; + pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; pRpc->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, - pRpc->numOfThreads, rpcProcessDataFromPeer, pRpc); + pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); if (pRpc->shandle == NULL) { tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort); rpcClose(pRpc); @@ -318,7 +322,6 @@ void rpcFreeCont(void *cont) { void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int contLen, void *ahandle) { SRpcInfo *pRpc = (SRpcInfo *)shandle; - SRpcConn *pConn; SRpcReqContext *pContext; contLen = rpcCompressRpcMsg(pCont, contLen); @@ -330,22 +333,23 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int pContext->pCont = pCont; pContext->msgType = type; - pConn = rpcSetConnToServer(shandle, ipSet); - pContext->code = terrno; - if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); - - rpcSendReqToServer(pConn, pContext); + rpcSendReqToServer(pRpc, pContext); return; } -void rpcSendResponse(void *handle, void *pCont, int contLen) { +void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { int msgLen = 0; SRpcConn *pConn = (SRpcConn *)handle; SRpcInfo *pRpc = pConn->pRpc; SRpcHeader *pHeader = rpcHeaderFromCont(pCont); char *msg = (char *)pHeader; + if ( pCont == NULL ) { + pCont = rpcMallocCont(0); + contLen = 0; + } + contLen = rpcCompressRpcMsg(pCont, contLen); msgLen = rpcMsgLenFromCont(contLen); @@ -367,13 +371,14 @@ void rpcSendResponse(void *handle, void *pCont, int contLen) { pHeader->sourceId = pConn->ownId; pHeader->destId = pConn->peerId; pHeader->uid = 0; + pHeader->code = htonl(code); memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); // set pConn parameters pConn->inType = 0; // response message is released until new response is sent - rpcFreeMsg(pConn->pRspMsg); + rpcFreeOutMsg(pConn->pRspMsg); pConn->pRspMsg = msg; pConn->rspMsgLen = msgLen; if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; @@ -381,28 +386,21 @@ void rpcSendResponse(void *handle, void *pCont, int contLen) { pthread_mutex_unlock(&pRpc->mutex); taosTmrStopA(&pConn->pTimer); - rpcSendDataToPeer(pConn, msg, msgLen); + rpcSendMsgToPeer(pConn, msg, msgLen); return; } -void rpcSendSimpleRsp(void *thandle, int32_t code) { +void rpcSendRedirectRsp(void *thandle, SRpcIpSet ipSet) { char *pMsg; - STaosRsp *pRsp; - int msgLen = sizeof(STaosRsp); - - if (thandle == NULL) { - tError("connection is gone, response could not be sent"); - return; - } + int msgLen = sizeof(SRpcIpSet); pMsg = rpcMallocCont(msgLen); if (pMsg == NULL) return; - pRsp = (STaosRsp *)pMsg; - pRsp->code = htonl(code); + memcpy(pMsg, &ipSet, sizeof(ipSet)); - rpcSendResponse(thandle, pMsg, msgLen); + rpcSendResponse(thandle, TSDB_CODE_REDIRECT, pMsg, msgLen); return; } @@ -442,34 +440,33 @@ static void rpcCloseConn(void *thandle) { pthread_mutex_lock(&pRpc->mutex); - if (taosCloseConn[pRpc->connType]) (*taosCloseConn[pRpc->connType])(pConn->chandle); + if (pConn->meterId[0]) { + pConn->meterId[0] = 0; + if (taosCloseConn[pRpc->connType]) (*taosCloseConn[pRpc->connType])(pConn->chandle); + + taosTmrStopA(&pConn->pTimer); + taosTmrStopA(&pConn->pIdleTimer); + + if ( pRpc->connType == TAOS_CONN_UDPS || TAOS_CONN_TCPS) { + char hashstr[40] = {0}; + sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId); + taosDeleteStrHash(pRpc->hash, hashstr); + rpcFreeOutMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg + pConn->pRspMsg = NULL; + pConn->inType = 0; + pConn->inTranId = 0; + } else { + pConn->outType = 0; + pConn->outTranId = 0; + pConn->pReqMsg = NULL; + } - taosTmrStopA(&pConn->pTimer); - taosTmrStopA(&pConn->pIdleTimer); + taosFreeId(pRpc->idPool, pConn->sid); + pConn->pContext = NULL; - if ( pRpc->connType == TAOS_CONN_UDPS || TAOS_CONN_TCPS) { - char hashstr[40] = {0}; - sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId); - taosDeleteStrHash(pRpc->hash, hashstr); - rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg + tTrace("%s pConn:%p, rpc connection is closed", pRpc->label, pConn); } - taosFreeId(pRpc->idPool, pConn->sid); - - // reset the link parameters - pConn->meterId[0] = 0; - pConn->outType = 0; - pConn->inType = 0; - pConn->inTranId = 0; - pConn->outTranId = 0; - pConn->pReqMsg = NULL; - pConn->reqMsgLen = 0; - pConn->pRspMsg = NULL; - pConn->rspMsgLen = 0; - pConn->pContext = NULL; - - tTrace("%s pConn:%p, rpc connection is closed", pRpc->label, pConn); - pthread_mutex_unlock(&pRpc->mutex); } @@ -553,13 +550,14 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *has return pConn; } -SRpcConn *rpcSetConnToServer(void *shandle, SRpcIpSet ipSet) { - SRpcInfo *pRpc = (SRpcInfo *)shandle; - - SRpcConn *pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId); +SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) { + SRpcConn *pConn; + pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId); if ( pConn == NULL ) { - pConn = rpcOpenConn(pRpc, ipSet.ipStr[ipSet.index], ipSet.port); + char ipstr[20] = {0}; + tinet_ntoa(ipstr, ipSet.ip[ipSet.index]); + pConn = rpcOpenConn(pRpc, ipstr, ipSet.port); } return pConn; @@ -585,7 +583,7 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { } else if (pConn->inType == 0) { tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->inTranId); - rpcSendDataToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response + rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response } else { tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHeader->msgType]); } @@ -658,13 +656,14 @@ static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int d SRpcHeader *pHeader = (SRpcHeader *)data; sid = htonl(pHeader->destId); + pHeader->code = htonl(pHeader->code); + pHeader->msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen); if (pHeader->msgType >= TSDB_MSG_TYPE_MAX || pHeader->msgType <= 0) { tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHeader->msgType); return TSDB_CODE_INVALID_MSG_TYPE; } - pHeader->msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen); if (dataLen != pHeader->msgLen) { tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid, taosMsg[pHeader->msgType], dataLen, pHeader->msgLen); @@ -708,7 +707,7 @@ static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int d return code; } -static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle) { +static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle) { SRpcHeader *pHeader = (SRpcHeader *)data; SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcConn *pConn = NULL; @@ -764,42 +763,33 @@ static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16 static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { SRpcInfo *pRpc = pConn->pRpc; - int msgLen = rpcContFromHeader(pHeader->msgLen); - pHeader = rpcDecompressRpcMsg(pHeader); + int contLen = rpcContLenFromMsg(pHeader->msgLen); + uint8_t *pCont = pHeader->content; if ( rpcIsReq(pHeader->msgType) ) { taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); - (*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pConn, 0); + (*(pRpc->cfp))(pHeader->msgType, pCont, contLen, pConn, 0); } else { // it's a response - STaosRsp *pRsp = (STaosRsp *)pHeader->content; - int32_t code = htonl(pRsp->code); - + int32_t code = pHeader->code; SRpcReqContext *pContext = pConn->pContext; pConn->pContext = NULL; - taosAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId); - if (code == TSDB_CODE_NO_MASTER) { - pContext->code = code; - taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); + if (code == TSDB_CODE_REDIRECT) { + memcpy(&pContext->ipSet, pHeader->content, sizeof(pContext->ipSet)); + rpcSendReqToServer(pRpc, pContext); } else { - rpcFreeMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg - (*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pContext->ahandle, pContext->ipSet.index); + rpcFreeOutMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg + (*(pRpc->cfp))(pHeader->msgType, pCont, contLen, pContext->ahandle, pContext->ipSet.index); } } } static void rpcSendQuickRsp(SRpcConn *pConn, char code) { - char msg[RPC_MSG_OVERHEAD + sizeof(STaosRsp)]; + char msg[RPC_MSG_OVERHEAD]; SRpcHeader *pHeader; - int msgLen; - STaosRsp *pRsp; - - pRsp = (STaosRsp *)rpcContFromHeader(msg); - pRsp->code = htonl(code); - msgLen = sizeof(STaosRsp); // set msg header memset(msg, 0, sizeof(SRpcHeader)); @@ -814,14 +804,14 @@ static void rpcSendQuickRsp(SRpcConn *pConn, char code) { pHeader->destId = pConn->peerId; pHeader->uid = 0; memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); + pHeader->code = htonl(code); - rpcSendDataToPeer(pConn, msg, msgLen); + rpcSendMsgToPeer(pConn, msg, 0); } static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle) { SRpcHeader *pRecvHeader, *pReplyHeader; - char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(STaosRsp)]; - STaosRsp *pRsp; + char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(uint32_t) ]; uint32_t timeStamp; int msgLen; @@ -839,13 +829,12 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint pReplyHeader->destId = pRecvHeader->sourceId; memcpy(pReplyHeader->meterId, pRecvHeader->meterId, tListLen(pReplyHeader->meterId)); - pRsp = (STaosRsp *)pReplyHeader->content; - pRsp->code = htonl(code); - msgLen = sizeof(STaosRsp); - char *pContent = pRsp->more; + pReplyHeader->code = htonl(code); + msgLen = sizeof(SRpcHeader); if (code == TSDB_CODE_INVALID_TIME_STAMP) { // include a time stamp if client's time is not synchronized well + uint8_t *pContent = pReplyHeader->content; timeStamp = taosGetTimestampSec(); memcpy(pContent, &timeStamp, sizeof(timeStamp)); msgLen += sizeof(timeStamp); @@ -857,13 +846,19 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint return; } -static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext) { +static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { SRpcHeader *pHeader = rpcHeaderFromCont(pContext->pCont); - SRpcInfo *pRpc = pConn->pRpc; char *msg = (char *)pHeader; int msgLen = rpcMsgLenFromCont(pContext->contLen); char msgType = pContext->msgType; + SRpcConn *pConn = rpcSetConnToServer(pRpc, pContext->ipSet); + if (pConn == NULL) { + pContext->code = terrno; + taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); + return; + } + pthread_mutex_lock(&pRpc->mutex); // set the message header @@ -889,37 +884,37 @@ static void rpcSendReqToServer(SRpcConn *pConn, SRpcReqContext *pContext) { pthread_mutex_unlock(&pRpc->mutex); - rpcSendDataToPeer(pConn, msg, msgLen); + rpcSendMsgToPeer(pConn, msg, msgLen); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); } - -static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen) { + +static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { int writtenLen = 0; SRpcInfo *pRpc = pConn->pRpc; - SRpcHeader *pHeader = (SRpcHeader *)data; + SRpcHeader *pHeader = (SRpcHeader *)msg; - dataLen = rpcAddAuthPart(pConn, data, dataLen); + msgLen = rpcAddAuthPart(pConn, msg, msgLen); if ( rpcIsReq(pHeader->msgType)) { if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d", pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, - pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); + pConn->peerPort, msgLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); } else { if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, pConn->peerPort, - (uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); + (uint8_t)pHeader->content[0], msgLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); } - writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle); + writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHeader, msgLen, pConn->chandle); - if (writtenLen != dataLen) { + if (writtenLen != msgLen) { tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, - dataLen, writtenLen, strerror(errno)); + msgLen, writtenLen, strerror(errno)); } - tDump(data, dataLen); + tDump(msg, msgLen); } static void rpcProcessConnError(void *param, void *id) { @@ -927,34 +922,20 @@ static void rpcProcessConnError(void *param, void *id) { SRpcInfo *pRpc = pContext->pRpc; if ( pContext->numOfRetry >= pContext->ipSet.numOfIps ) { - rpcFreeMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg - char *rsp = calloc(1, RPC_MSG_OVERHEAD + sizeof(STaosRsp)); - if ( rsp ) { - STaosRsp *pRsp = (STaosRsp *)(rsp+sizeof(SRpcHeader)); - pRsp->code = pContext->code; - (*(pRpc->fp))(pContext->msgType+1, pRsp, sizeof(STaosRsp), pContext->ahandle, 0); - } else { - tError("%s failed to malloc RSP", pRpc->label); - } + rpcFreeOutMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg + (*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code); } else { // move to next IP pContext->ipSet.index++; pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps; - - SRpcConn *pConn = rpcSetConnToServer(pContext->pRpc, pContext->ipSet); - pContext->code = terrno; - if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); - - rpcSendReqToServer(pConn, pContext); + rpcSendReqToServer(pRpc, pContext); } } static void rpcProcessRetryTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; - int reportDisc = 0; - SRpcInfo *pRpc = pConn->pRpc; - if (pRpc == NULL) return; // it means it is already released + int reportDisc = 0; pthread_mutex_lock(&pRpc->mutex); @@ -966,7 +947,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { if (pConn->retry < 4) { tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); - rpcSendDataToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); + rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer<retry, pConn, pRpc->tmrCtrl, &pConn->pTimer); } else { // close the connection @@ -990,18 +971,13 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { static void rpcProcessIdleTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; SRpcInfo *pRpc = pConn->pRpc; - assert(pRpc); - pthread_mutex_lock(&pRpc->mutex); - - if (pConn->inType == 0 && pConn->meterId[0]) { + if (pConn->meterId[0]) { tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn); rpcCloseConn(pConn); } else { tTrace("%s pConn:%p, idle timer:%p not processed", pRpc->label, pConn, tmrId); } - - pthread_mutex_unlock(&pRpc->mutex); } static void rpcProcessProgressTimer(void *param, void *tmrId) { @@ -1021,22 +997,27 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { pthread_mutex_unlock(&pRpc->mutex); } -static void rpcFreeMsg(void *msg) { +static void rpcFreeOutMsg(void *msg) { if ( msg == NULL ) return; char *req = ((char *)msg) - sizeof(SRpcReqContext); free(req); } +typedef struct { + int32_t reserved; + int32_t contLen; +} SRpcComp; + static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { SRpcHeader *pHeader = rpcHeaderFromCont(pCont); - int32_t overhead = sizeof(int32_t) * 2; int32_t finalLen = 0; + int overhead = sizeof(SRpcComp); if (!NEEDTO_COMPRESSS_MSG(contLen)) { return contLen; } - char *buf = malloc (contLen + overhead+8); // 16 extra bytes + char *buf = malloc (contLen + overhead + 8); // 8 extra bytes if (buf == NULL) { tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno)); return contLen; @@ -1049,20 +1030,15 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message */ if (compLen < contLen - overhead) { - //tDump(pCont, contLen); - int32_t *pLen = (int32_t *)pCont; - - *pLen = 0; // first 4 bytes must be zero - pLen = (int32_t *)(pCont + sizeof(int32_t)); - - *pLen = htonl(contLen); // contLen is encoded in second 4 bytes + SRpcComp *pComp = (SRpcComp *)pCont; + pComp->reserved = 0; + pComp->contLen = htonl(contLen); memcpy(pCont + overhead, buf, compLen); pHeader->comp = 1; tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); finalLen = compLen + overhead; - //tDump(pCont, contLen); } else { finalLen = contLen; } @@ -1072,16 +1048,15 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { } static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) { - int overhead = sizeof(int32_t) * 2; + int overhead = sizeof(SRpcComp); SRpcHeader *pNewHeader = NULL; uint8_t *pCont = pHeader->content; + SRpcComp *pComp = (SRpcComp *)pHeader->content; if (pHeader->comp) { // decompress the content - assert(GET_INT32_VAL(pHeader->content) == 0); - - // contLen is original message length before compression applied - int contLen = htonl(GET_INT32_VAL(pCont + sizeof(int32_t))); + assert(pComp->reserved == 0); + int contLen = htonl(pComp->contLen); // prepare the temporary buffer to decompress message char *buf = rpcMallocCont(contLen);