diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 6878d9f3b15bc74b07f2fa7f82ff868b878bd2e1..3a0ea69246e5fa8ea30e5b4eda08b177dda375e9 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -43,7 +43,7 @@ typedef struct { uint16_t localPort; // local port char *label; // for debug purpose int numOfThreads; // number of threads to handle connections - void *(*fp)(char type, char *pCont, int contLen, void *handle, int index); // function to process the incoming msg + void *(*fp)(char type, void *pCont, int contLen, void *handle, int index); // function to process the incoming msg int sessions; // number of sessions allowed int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int idleTime; // milliseconds, 0 means idle timer is disabled @@ -58,13 +58,14 @@ typedef struct { typedef struct { int16_t index; int16_t numOfIps; - uint32_t ip[TSDB_MAX_REPLICA]; + uint16_t port; + char ipStr[TSDB_MAX_MPEERS][40]; } SRpcIpSet; void *rpcOpen(SRpcInit *pRpc); void rpcClose(void *); -char *rpcMallocCont(int contLen); -void rpcFreeCont(char *pCont); +void *rpcMallocCont(int contLen); +void rpcFreeCont(void *pCont); void rpcSendRequest(void *thandle, SRpcIpSet ipSet, char msgType, void *pCont, int contLen, void *ahandle); void rpcSendResponse(void *pConn, void *pCont, int contLen); void rpcSendSimpleRsp(void *pConn, int32_t code); diff --git a/src/rpc/src/tcache.c b/src/rpc/src/tcache.c index 666d069a58c936e9028b46f9e6244923ac4be993..d8f9c6e5ecd3ca2a57154dc32fb507aef9ad44d5 100644 --- a/src/rpc/src/tcache.c +++ b/src/rpc/src/tcache.c @@ -18,10 +18,10 @@ #include "tglobalcfg.h" #include "tlog.h" #include "tmempool.h" -#include "tsclient.h" #include "ttime.h" #include "ttimer.h" #include "tutil.h" +#include "tcache.h" typedef struct _c_hash_t { uint32_t ip; diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index a4a93f14ea9e19b24674238eef1687baa91c697a..77438e7399cfb298517eec54269a275cfa8ac612 100755 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -14,6 +14,7 @@ */ #include "os.h" +#include "tcache.h" #include "shash.h" #include "taosmsg.h" #include "tidpool.h" @@ -21,7 +22,6 @@ #include "tmd5.h" #include "tmempool.h" #include "trpc.h" -#include "tsdb.h" #include "tsocket.h" #include "ttcpclient.h" #include "ttcpserver.h" @@ -32,7 +32,7 @@ #include "lz4.h" #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest)) -#define rpcHeaderFromCont(cont) ((STaosHeader *) (cont - sizeof(SRpcHeader))) +#define rpcHeaderFromCont(cont) ((SRpcHeader *) (cont - sizeof(SRpcHeader))) #define rpcContFromHeader(msg) ( msg + sizeof(SRpcHeader)) #define rpcMsgLenFromCont(contLen) ( contLen + sizeof(SRpcHeader)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHeader)) @@ -43,7 +43,9 @@ typedef struct { int numOfThreads; int type; int idleTime; // milliseconds; + uint32_t localIp; uint16_t localPort; + int connType; char label[12]; char *meterId; // meter ID @@ -52,9 +54,9 @@ typedef struct { char *secret; // key for authentication char *ckey; // ciphering key - void *(*fp)(char *, void *ahandle, void *thandle); // FP to call the application + void *(*fp)(char type, void *pCont, int contLen, void *handle, int index); int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // FP to retrieve auth info - SRpcConn *connList; + struct _RpcConn *connList; void *idPool; void *tmrCtrl; void *hash; @@ -64,6 +66,18 @@ typedef struct { } SRpcInfo; typedef struct { + SRpcIpSet ipSet; + void *ahandle; + SRpcInfo *pRpc; + char msgType; + char *pCont; + int contLen; + int numOfRetry; + int32_t code; + char msg[]; +} SRpcReqContext; + +typedef struct _RpcConn { void *signature; int sid; // session ID uint32_t ownId; // own link ID @@ -94,19 +108,9 @@ typedef struct { char *pReqMsg; // including header int reqMsgLen; SRpcInfo *pRpc; + SRpcReqContext *pContext; } SRpcConn; -typedef struct { - SRpcIpSet ipSet; - void *ahandle; - SRpcInfo *pRpc; - char type; - char *pCont; - int contLen; - int numOfRetry; - char msg[]; -} SRpcReqContext; - typedef struct { char version : 4; char comp : 4; @@ -173,15 +177,18 @@ void (*taosCloseConn[])(void *chandle) = { taosCloseTcpClientConnection }; -int rpcResendRspToPeer(SRpcConn *pConn); -void rpcProcessRetryTimer(void *, void *); -void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle); -int rpcSendDataToPeer(SRpcConn *pConn, char *data, int dataLen); -int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); -int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); +static void rpcProcessRetryTimer(void *, void *); +static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle); +static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen); +static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); +static int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); +static void rpcCloseConn(void *thandle); +static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr); +static void rpcProcessConnError(void *param, void *id); +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader); static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { - SRpcHeader* pHeader = rpcHeaderFromCont(pCont); + SRpcHeader *pHeader = rpcHeaderFromCont(pCont); int32_t overhead = sizeof(int32_t) * 2; int32_t finalLen = 0; @@ -227,7 +234,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) { int overhead = sizeof(int32_t) * 2; SRpcHeader *pNewHeader = NULL; - char *pCont = pHeader->content; + uint8_t *pCont = pHeader->content; if (pHeader->comp) { // decompress the content @@ -257,7 +264,7 @@ static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) { return pHeader; } -char *rpcMallocCont(int size) { +void *rpcMallocCont(int size) { char *pMsg = NULL; size += RPC_MSG_OVERHEAD; @@ -270,17 +277,17 @@ char *rpcMallocCont(int size) { return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHeader); } -void rpcFreeCont(char *cont) { - char *msg = cont - sizeof(SRpcHeader); +void rpcFreeCont(void *cont) { + char *msg = ((char *)cont) - sizeof(SRpcHeader); free(msg); } -static void rpcFreeMsg(char *msg) { - msg -= sizeof(SRpcReqContext); - free(msg); +static void rpcFreeMsg(void *msg) { + char *req = ((char *)msg) - sizeof(SRpcReqContext); + free(req); } -void rpcSendSimpleRsp(void *thandle, int_32 code) { +void rpcSendSimpleRsp(void *thandle, int32_t code) { char *pMsg; STaosRsp *pRsp; int msgLen = sizeof(STaosRsp); @@ -296,7 +303,7 @@ void rpcSendSimpleRsp(void *thandle, int_32 code) { pRsp = (STaosRsp *)pMsg; pRsp->code = htonl(code); - taosSendResponse(thandle, pMsg, msgLen); + rpcSendResponse(thandle, pMsg, msgLen); return; } @@ -305,6 +312,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, char code) { char msg[RPC_MSG_OVERHEAD + sizeof(STaosRsp)]; SRpcHeader *pHeader; int msgLen; + STaosRsp *pRsp; pRsp = (STaosRsp *)rpcContFromHeader(msg); pRsp->code = htonl(code); @@ -324,7 +332,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, char code) { pHeader->uid = 0; memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); - rpcSendDataToPeer(pConn, (char *)msg, msgLen); + rpcSendDataToPeer(pConn, msg, msgLen); } void *rpcOpen(SRpcInit *pInit) { @@ -347,54 +355,54 @@ void *rpcOpen(SRpcInit *pInit) { pRpc->localPort = pInit->localPort; pRpc->afp = pInit->afp; - pRpc->sessions = pInit->session; + pRpc->sessions = pInit->sessions; strcpy(pRpc->meterId, pInit->meterId); pRpc->spi = pInit->spi; strcpy(pRpc->secret, pInit->secret); strcpy(pRpc->ckey, pInit->ckey); pRpc->afp = pInit->afp; - pRpc->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->numOfThreads, - taosProcessDataFromPeer, pRpc); + pRpc->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, + pRpc->numOfThreads, rpcProcessDataFromPeer, pRpc); if (pRpc->shandle == NULL) { tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort); - taosCloseRpc(pRpc); + rpcClose(pRpc); return NULL; } - size_t size = sizeof(SRpcConn) * sessions; + size_t size = sizeof(SRpcConn) * pRpc->sessions; pRpc->connList = (SRpcConn *)calloc(1, size); if (pRpc->connList == NULL) { tError("%s failed to allocate memory for taos connections, size:%d", pRpc->label, size); - taosCloseRpc(pRpc); + rpcClose(pRpc); return NULL; } - pRpc->idPool = taosInitIdPool(sessions); + pRpc->idPool = taosInitIdPool(pRpc->sessions); if (pRpc->idPool == NULL) { tError("%s failed to init ID pool", pRpc->label); - taosCloseRpc(pRpc); + rpcClose(pRpc); return NULL; } - pRpc->tmrCtrl = taosTmrInit(sessions * 2 + 1, 50, 10000, pRpc->label); + pRpc->tmrCtrl = taosTmrInit(pRpc->sessions*2 + 1, 50, 10000, pRpc->label); if (pRpc->tmrCtrl == NULL) { tError("%s failed to init timers", pRpc->label); - taosCloseRpc(pRpc); + rpcClose(pRpc); return NULL; } - pRpc->hash = taosInitStrHash(sessions, sizeof(pRpc), taosHashString); + pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); if (pRpc->hash == NULL) { tError("%s failed to init string hash", pRpc->label); - taosCloseRpc(pRpc); + rpcClose(pRpc); return NULL; } - pRpc->pCahche = taosOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000); + pRpc->pCache = taosOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000); if ( pRpc->pCache == NULL ) { tError("%s failed to init connection cache", pRpc->label); - taosCloseRpc(pRpc); + rpcClose(pRpc); return NULL; } @@ -411,8 +419,8 @@ void rpcClose(void *param) { (*taosCleanUpConn[pRpc->type])(pRpc->shandle); for (int i = 0; i < pRpc->sessions; ++i) { - if (pRpc->connList[i].signature != NULL) { - taosCloseRpcConn((void *)(pRpc->connList + i)); + if (pRpc->connList[i].meterId[0]) { + rpcCloseConn((void *)(pRpc->connList + i)); } } @@ -426,32 +434,30 @@ void rpcClose(void *param) { tfree(pRpc); } -static SRpcConn *rpcOpenConn(SRpcConnInit *pInit) { +static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) { SRpcConn *pConn; - SRpcInfo *pRpc = (SRpcInfo *)pInit->shandle; - if ( (uint8_t)(rpcGetConn(pInit->sid, pInit->meterId, pRpc, &pConn, 1, NULL)) != 0 ) + if ( (uint8_t)(rpcGetConn(0, pRpc->meterId, pRpc, &pConn, 1, NULL)) != 0 ) return NULL; - if (pConn->peerId == 0) pConn->peerId = pRpc->peerId; - strcpy(pConn->peerIpstr, pInit->peerIp); - pConn->peerIp = inet_addr(pInit->peerIp); - pConn->peerPort = pInit->peerPort; - pConn->ahandle = pInit->ahandle; + strcpy(pConn->peerIpstr, peerIpStr); + pConn->peerIp = inet_addr(peerIpStr); + pConn->peerPort = peerPort; pConn->spi = pRpc->spi; pConn->encrypt = pRpc->encrypt; if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN); + strcpy(pConn->meterId, pRpc->meterId); // if it is client, it shall set up connection first if (taosOpenConn[pRpc->type]) { pConn->chandle = (*taosOpenConn[pRpc->type])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort); if (pConn->chandle) { tTrace("%s pConn:%p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label, - pConn, pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort); + pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort); } else { tError("%s pConn:%p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn, - pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort); - terrorno = TSDB_CODE_NETWORK_UNAVAIL; + pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort); + terrno = TSDB_CODE_NETWORK_UNAVAIL; rpcCloseConn(pConn); pConn = NULL; } @@ -462,32 +468,28 @@ static SRpcConn *rpcOpenConn(SRpcConnInit *pInit) { static void rpcCloseConn(void *thandle) { SRpcConn *pConn = (SRpcConn *)thandle; - if (pConn == NULL) return; + assert(pConn); SRpcInfo *pRpc = pConn->pRpc; - if (pConn->signature != thandle || pRpc == NULL) return; + assert(pRpc); pthread_mutex_lock(&pRpc->mutex); - pConn->signature = NULL; - if (taosCloseConn[pRpc->type]) (*taosCloseConn[pRpc->type])(pConn->chandle); taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pIdleTimer); rpcFreeMsg(pConn->pRspMsg); - rpcFreeMsg(pConn-pReqMsg); + rpcFreeMsg(pConn->pReqMsg); char hashstr[40] = {0}; sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId); taosDeleteStrHash(pRpc->hash, hashstr); - tTrace("%s pConn:%p, TAOS connection closed", pRpc->label, pConn->sid, - pConn->meterId, pConn); - int freeId = pConn->sid; - memset(pConn, 0, sizeof(SRpcConn)); + if (pRpc->idPool) taosFreeId(pRpc->idPool, pConn->sid); - if (pRpc->idPool) taosFreeId(pRpc->idPool, freeId); + tTrace("%s pConn:%p, TAOS connection closed", pRpc->label, pConn); + memset(pConn, 0, sizeof(SRpcConn)); pthread_mutex_unlock(&pRpc->mutex); } @@ -495,8 +497,6 @@ static void rpcCloseConn(void *thandle) { static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr) { SRpcConn * pConn = NULL; - if (pRpc == NULL) return -1; - if (sid == 0) { if (req) { int osid = sid; @@ -513,14 +513,13 @@ static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, } } else { sid = pConn->sid; - tTrace("%s sid:%d id:%s, session is already there", pRpc->label, pConn->sid, - pConn->meterId); + tTrace("%s sid:%d id:%s, session is already there", pRpc->label, pConn->sid, pConn->meterId); } } else { return TSDB_CODE_UNEXPECTED_RESPONSE; } } else { - if (pRpc->connList[sid].signature == NULL) { + if (pRpc->connList[sid].meterId[0] == 0) { tError("%s sid:%d session is already released", pRpc->label, sid); return TSDB_CODE_INVALID_VALUE; } @@ -528,9 +527,8 @@ static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, pConn = pRpc->connList + sid; - if (pConn->signature == NULL) { + if (pConn->meterId[0] == 0) { memset(pConn, 0, sizeof(SRpcConn)); - pConn->signature = pConn; memcpy(pConn->meterId, meterId, tListLen(pConn->meterId)); pConn->pRpc = pRpc; pConn->sid = sid; @@ -546,13 +544,6 @@ static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, } } - if ((pRpc->type == TAOS_CONN_UDPC || pRpc->type == TAOS_CONN_UDPS) && pRpc->numOfThreads > 1 && - pRpc->localPort) { - // UDP server, assign to new connection - pRpc->index = (pRpc->index + 1) % pRpc->numOfThreads; - pConn->localPort = (int16_t)(pRpc->localPort + pRpc->index); - } - taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); tTrace("%s pConn:%p, TAOS connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid); } else { @@ -569,8 +560,8 @@ static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { SRpcHeader *pHeader = (SRpcHeader *)msg; - SRpcInfo *pRpc = pConn->pRpc; - int code = 0; + SRpcInfo *pRpc = pConn->pRpc; + int code = 0; if (pConn->spi == 0 ) return 0; @@ -586,10 +577,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { delta, htonl(pDigest->timeStamp)); code = TSDB_CODE_INVALID_TIME_STAMP; } else { - if (rpcAuthenticateMsg((uint8_t *)pHeader, dataLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { - char ipstr[24]; - tinet_ntoa(ipstr, ip); - mLError("id:%s from %s, authentication failed", pHeader->meterId, ipstr); + if (rpcAuthenticateMsg((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn); code = TSDB_CODE_AUTH_FAILURE; } else { @@ -608,7 +596,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { } static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { - int code = 0; + SRpcInfo *pRpc= pConn->pRpc; if (pConn->peerId == 0) { pConn->peerId = pHeader->sourceId; @@ -627,7 +615,7 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { } else if (pConn->inType == 0) { tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->inTranId); - rpcResendRspToPeer(pConn); + rpcSendDataToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response } else { tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHeader->msgType]); } @@ -637,8 +625,8 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { } if (pConn->inType != 0) { - tTrace("%s pConn:%p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn, - pConn->inTranId, pHeader->tranId); + tTrace("%s pConn:%p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn, + pConn->inTranId, pHeader->tranId); return TSDB_CODE_LAST_SESSION_NOT_FINISHED; } @@ -649,6 +637,7 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { } static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) { + SRpcInfo *pRpc = pConn->pRpc; pConn->peerId = pHeader->sourceId; if (pConn->outType == 0 || pConn->pContext == NULL) { @@ -664,7 +653,7 @@ static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) { } if (*pHeader->content == TSDB_CODE_NOT_READY) { - return = TSDB_CODE_ALREADY_PROCESSED; + return TSDB_CODE_ALREADY_PROCESSED; } taosTmrStopA(&pConn->pTimer); @@ -685,18 +674,18 @@ static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) { pConn->tretry = 0; pConn->outType = 0; pConn->pReqMsg = NULL; - pConn->pReqMsgLen = 0; + pConn->reqMsgLen = 0; } -static int rpcProcessHeader(SRpcHeader *pHeader, SRpcConn **ppConn, SRpcInfo *pRpc, int dataLen, uint32_t ip, - uint16_t port, void *chandle) { - int sid, code = 0; +static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int dataLen, uint32_t ip) { + int32_t sid, code = 0; SRpcConn * pConn = NULL; - int msgLen; char hashstr[40] = {0}; *ppConn = NULL; - uint32_t sid = htonl(pHeader->destId); + SRpcHeader *pHeader = (SRpcHeader *)data; + + sid = htonl(pHeader->destId); if (pHeader->msgType >= TSDB_MSG_TYPE_MAX || pHeader->msgType <= 0) { tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHeader->msgType); @@ -706,7 +695,7 @@ static int rpcProcessHeader(SRpcHeader *pHeader, SRpcConn **ppConn, SRpcInfo *pR pHeader->msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen); if (dataLen != pHeader->msgLen) { tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid, - taosMsg[pHeader->msgType], dataLen, msgLen); + taosMsg[pHeader->msgType], dataLen, pHeader->msgLen); return TSDB_CODE_INVALID_MSG_LEN; } @@ -724,18 +713,7 @@ static int rpcProcessHeader(SRpcHeader *pHeader, SRpcConn **ppConn, SRpcInfo *pR *ppConn = pConn; sid = pConn->sid; - if (pConn->peerIp != ip) { - pConn->peerIp = ip; - char ipstr[20] = {0}; - tinet_ntoa(ipstr, ip); - strcpy(pConn->peerIpstr, ipstr); - } - if (pHeader->uid) pConn->peerUid = pHeader->uid; - if (port) pConn->peerPort = port; - if (pHeader->port) // port maybe changed by the peer - pConn->peerPort = pHeader->port; - if (chandle) pConn->chandle = chandle; if (pHeader->tcp) { tTrace("%s pConn:%p, content will be transfered via TCP", pRpc->label, pConn); @@ -759,7 +737,7 @@ static int rpcProcessHeader(SRpcHeader *pHeader, SRpcConn **ppConn, SRpcInfo *pR return code; } -void rpcSendErrorMsgToPeer(char *pMsg, int32 code, uint_32 ip, uint_16 port, void *chandle) { +void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle) { SRpcHeader *pRecvHeader, *pReplyHeader; char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(STaosRsp)]; STaosRsp *pRsp; @@ -793,7 +771,7 @@ void rpcSendErrorMsgToPeer(char *pMsg, int32 code, uint_32 ip, uint_16 port, voi } pReplyHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); - (*taosSendData[pRpc->type])(ip, port, pReply, msgLen, chandle); + (*taosSendData[pRpc->type])(ip, port, msg, msgLen, chandle); return; } @@ -815,26 +793,33 @@ void rpcProcessIdleTimer(void *param, void *tmrId) { rpcCloseConn(pConn); } -void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, - void *chandle) { +static void *rpcProcessDataFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle) { SRpcHeader *pHeader = (SRpcHeader *)data; - uint8_t code; - SRpcConn *pConn = (SRpcConn *)thandle; SRpcInfo *pRpc = (SRpcInfo *)shandle; - int msgLen; + SRpcConn *pConn = NULL; + uint8_t code = 0; tDump(data, dataLen); - if (ip == 0 && taosCloseConn[pRpc->type] && pConn) { - // it means the connection is broken, it only happens for TCP - tTrace("%s pConn:%p, underlying link is gone%p", pRpc->label, pConn); - pContext->terrno = TSDB_CODE_NETWORK_UNAVAIL; - taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl); - return NULL; + pthread_mutex_lock(&pRpc->mutex); + + code = rpcProcessHeader(pRpc, &pConn, data, dataLen, ip); + + if (pConn) { + // update connection info + pConn->chandle = chandle; + if (pConn->peerIp != ip) { + pConn->peerIp = ip; + char ipstr[20] = {0}; + tinet_ntoa(ipstr, ip); + strcpy(pConn->peerIpstr, ipstr); + } + + if (port) pConn->peerPort = port; + if (pHeader->port) // port maybe changed by the peer + pConn->peerPort = pHeader->port; } - pthread_mutex_lock(&pRpc->mutex); - code = rpcProcessHeader(pHeader, &pConn, pRpc, dataLen, ip, port, chandle); pthread_mutex_unlock(&pRpc->mutex); if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { @@ -850,7 +835,7 @@ void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port if (code != TSDB_CODE_ALREADY_PROCESSED) { if (code != 0) { // parsing error if ( rpcIsReq(pHeader->msgType) ) { - taosSendErrorMsgToPeer(data, code, ip, port, chandle); + rpcSendErrorMsgToPeer(pRpc, data, code, ip, port, chandle); tTrace("%s pConn:%p, %s is sent with error code:%u", pRpc->label, pConn, taosMsg[pHeader->msgType+1], code); } } else { // parsing OK @@ -862,17 +847,17 @@ void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port return pConn; } -void taosProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { SRpcInfo *pRpc = pConn->pRpc; - int msgLen = rpcContLenFromHeader(pHeader->msgLen); + int msgLen = rpcContFromHeader(pHeader->msgLen); pHeader = rpcDecompressRpcMsg(pHeader); - if ( rpcIsReq(msgType) ) { - (*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pConn); + if ( rpcIsReq(pHeader->msgType) ) { + (*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pConn, 0); } else { // it's a response - STaosRsp *pRsp = (STaosRsp *)msg; + STaosRsp *pRsp = (STaosRsp *)pHeader->content; int32_t code = htonl(pRsp->code); SRpcReqContext *pContext = pConn->pContext; @@ -880,12 +865,12 @@ void taosProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { taosAddConnToIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId); - if (code == TSDB_CODE_NOT_MASTER) { - pContext->terrno = code; - taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl); + if (code == TSDB_CODE_NO_MASTER) { + pContext->code = code; + taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); } else { - rpcFreeMsg(rpcGetMsgFromCont(pContext->cont)); // free the request msg - (*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pContext->ahandle); + rpcFreeMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg + (*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pContext->ahandle, pContext->ipSet.index); } } } @@ -893,20 +878,10 @@ void taosProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { SRpcConn *rpcGetConnToServer(void *shandle, SRpcIpSet ipSet) { SRpcInfo *pRpc = (SRpcInfo *)shandle; - SRpcConn *pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[index], pRpc->peerPort, pRpc->meterId); + SRpcConn *pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ipStr[ipSet.index], ipSet.port, pRpc->meterId); if ( pConn == NULL ) { - SRpcConnInit connInit; - memset(&connInit, 0, sizeof(connInit)); - connInit.sid = 0; - connInit.spi = pRpc->spi; - connInit.encrypt = pRpc->encrypt; - connInit.meterId = pRpc->user; - connInit.peerId = 0; - connInit.shandle = pRpc; - connInit.peerIp = ipstr; - connInit.peerPort = pRpc->peerPort; - pConn = rpcOpenConn(&connInit); + pConn = rpcOpenConn(pRpc, ipSet.ipStr[ipSet.index], ipSet.port); } return pConn; @@ -930,11 +905,14 @@ int taosAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { return msgLen; } -int rpcSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) { +static void rpcSendDataToPeer(SRpcConn *pConn, void *data, int dataLen) { int writtenLen = 0; SRpcInfo *pRpc = pConn->pRpc; SRpcHeader *pHeader = (SRpcHeader *)data; - int code = 0; + + assert(data); + assert(dataLen>0); + assert(pHeader->msgType > 0); dataLen = taosAddAuthPart(pConn, data, dataLen); @@ -955,19 +933,16 @@ int rpcSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) { if (writtenLen != dataLen) { tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, dataLen, writtenLen, strerror(errno)); - code = -1; } tDump(data, dataLen); - - return code; } void rpcSendReqToOneServer(SRpcConn *pConn, SRpcReqContext *pContext) { - - char *pHeader = rpcHeaderFromCont(pContext->pCont); - SRpcHeader *msg = (char *)pHeader; - int msgLen = rpcGetMsgLen(pContext->contLen); + SRpcHeader *pHeader = rpcHeaderFromCont(pContext->pCont); + SRpcInfo *pRpc = pConn->pRpc; + char *msg = (char *)pHeader; + int msgLen = rpcMsgLenFromCont(pContext->contLen); char msgType = pContext->msgType; // set the message header @@ -987,19 +962,16 @@ void rpcSendReqToOneServer(SRpcConn *pConn, SRpcReqContext *pContext) { // set the connection parameters pConn->outType = msgType; pConn->outTranId = pHeader->tranId; - pConn->pMsgNode = pMsgNode; pConn->pReqMsg = msg; pConn->reqMsgLen = msgLen; - pConn->context = pContext; + pConn->pContext = pContext; - if ( rpcSendDataToPeer(pConn, msg, msgLen) < 0 ) { - taosReportError(pConn->pContext, terrno); - } else { - taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); - } + rpcSendDataToPeer(pConn, msg, msgLen); + taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); } -void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, char *pCont, int contLen, void *ahandle) { +void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int contLen, void *ahandle) { + SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcConn *pConn; SRpcReqContext *pContext; @@ -1008,22 +980,23 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, char *pCont, int pContext->ahandle = ahandle; pContext->pRpc = (SRpcInfo *)shandle; pContext->ipSet = ipSet; - pContext->contLen = contLen + pContext->contLen = contLen; pContext->pCont = pCont; - pContext->type = type; + pContext->msgType = type; pConn = rpcGetConnToServer(shandle, ipSet); - pContext->terrno = terrno; - if (pConn == NULL) taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl); + pContext->code = terrno; + if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); rpcSendReqToOneServer(pConn, pContext); return; } -void rpcSendResponse(SRpcConn *pConn, char *pCont, int contLen) { +void rpcSendResponse(void *handle, void *pCont, int contLen) { int msgLen = 0; - SRpcConn *pConn; + SRpcConn *pConn = (SRpcConn *)handle; + SRpcInfo *pRpc = pConn->pRpc; SRpcHeader *pHeader = rpcHeaderFromCont(pCont); char *msg = (char *)pHeader; @@ -1052,38 +1025,23 @@ void rpcSendResponse(SRpcConn *pConn, char *pCont, int contLen) { if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; - pthread_mutex_lock(&pRpc->mutex); + pthread_mutex_unlock(&pRpc->mutex); rpcSendDataToPeer(pConn, msg, msgLen); return; } -static void rpcResendRspToPeer(SRpcConn *pConn) { - - if (pConn->pRspMsg == NULL || pConn->rspMsgLen <= 0 || pConn->rspMsgLen <= sizeof(SRpcHeader)) { - tError("%s pConn:%p, rsp is null", pRpc->label); - return; - } - - SRpcHeader *pHeader = (SRpcHeader *)pConn->pRspMsg; - if (pHeader->msgType <= 0) { - tError("%s pConn:%p, msgType is messed up, rspLen:%d, msgType:%d", pRpc->label, pConn, pHeader->msgLen, pHeader->msgType); - return; - } - - rpcSendDataToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); -} - static void rpcProcessConnError(void *param, void *id) { - SRpcReqContext *pContext = (SRpcContext *)param; + SRpcReqContext *pContext = (SRpcReqContext *)param; + SRpcInfo *pRpc = pContext->pRpc; if ( pContext->numOfRetry >= pContext->ipSet.numOfIps ) { char *rsp = calloc(1, RPC_MSG_OVERHEAD + sizeof(STaosRsp)); if ( rsp ) { - STaosRsp *pRsp = (rsp+sizeof(SRpcHeader)); - pRsp->code = pContext->terrno; - (*(pRpc->fp))(pContext->msgType+1, pRsp, sizeof(STaosRsp), pContext->ahandle); + STaosRsp *pRsp = (STaosRsp *)(rsp+sizeof(SRpcHeader)); + pRsp->code = pContext->code; + (*(pRpc->fp))(pContext->msgType+1, pRsp, sizeof(STaosRsp), pContext->ahandle, 0); } else { tError("%s failed to malloc RSP", pRpc->label); } @@ -1092,9 +1050,9 @@ static void rpcProcessConnError(void *param, void *id) { pContext->ipSet.index++; pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps; - pConn = rpcGetConnToServer(pContext->pRpc, pContext->ipSet); - pContext->terrno = terrno; - if (pConn == NULL) taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl); + SRpcConn *pConn = rpcGetConnToServer(pContext->pRpc, pContext->ipSet); + pContext->code = terrno; + if (pConn == NULL) taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); taosSendReqToOneServer(pConn, pContext); } @@ -1104,22 +1062,13 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; int reportDisc = 0; - if (pConn->signature != param) { - tError("pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param); - return; - } - SRpcInfo *pRpc = pConn->pRpc; - - if (pConn->pTimer != tmrId) { - tTrace("%s pConn:%p, timer:%p already processed%", pRpc->label, pConn); - return; - } + assert(pRpc); pthread_mutex_lock(&pRpc->mutex); if (pConn->outType == 0) { - tTrace("%s pConn:%p, outtype is zero", pRpc->label, pConn); + tTrace("%s pConn:%p, outtype is zero, it is already processed", pRpc->label, pConn); } else { tTrace("%s pConn:%p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]); pConn->pTimer = NULL; @@ -1128,21 +1077,19 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { if (pConn->retry < 4) { tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); - if (pConn->pReqMsg && pConn->pReqMsgLen > 0) { - rpcSendDataToPeer(pConn, pReqMsg, pReqMsgLen); - } + rpcSendDataToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); } else { // close the connection tTrace("%s pConn:%p, failed to send msg:%s to %s:%hu", pRpc->label, pConn, - taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn); + taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); reportDisc = 1; } } pthread_mutex_unlock(&pRpc->mutex); - pConn->terrno = TSDB_CODE_NETWORK_UNAVAIL; - if (reportDisc) taosProcessConnError(pConn->pContext, NULL); + pConn->pContext->code = TSDB_CODE_NETWORK_UNAVAIL; + if (reportDisc) rpcProcessConnError(pConn->pContext, NULL); } static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {