From 54cd817f126dad307410c45046dbf70289e7f9fa Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Wed, 19 Feb 2020 23:48:34 +0800 Subject: [PATCH] fix many bugs --- src/rpc/inc/tconncache.h | 8 +- src/rpc/src/tconncache.c | 174 ++++++++--------- src/rpc/src/trpc.c | 392 +++++++++++++++++++------------------- src/rpc/test/rclient.c | 12 +- src/rpc/test/rserver.c | 9 +- src/util/src/tglobalcfg.c | 2 +- 6 files changed, 306 insertions(+), 291 deletions(-) diff --git a/src/rpc/inc/tconncache.h b/src/rpc/inc/tconncache.h index f297dab850..e09d29774e 100644 --- a/src/rpc/inc/tconncache.h +++ b/src/rpc/inc/tconncache.h @@ -20,10 +20,10 @@ extern "C" { #endif -void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer); -void taosCloseConnCache(void *handle); -void taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user); -void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user); +void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer); +void rpcCloseConnCache(void *handle); +void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user); +void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user); #ifdef __cplusplus } diff --git a/src/rpc/src/tconncache.c b/src/rpc/src/tconncache.c index 2d1a61e4d3..1dfdc28a44 100644 --- a/src/rpc/src/tconncache.c +++ b/src/rpc/src/tconncache.c @@ -40,13 +40,13 @@ typedef struct { int * count; int64_t keepTimer; pthread_mutex_t mutex; - void (*cleanFp)(void *); - void *tmrCtrl; - void *pTimer; + void (*cleanFp)(void *); + void *tmrCtrl; + void *pTimer; } SConnCache; -int taosHashConn(void *handle, uint32_t ip, uint16_t port, char *user) { - SConnCache *pObj = (SConnCache *)handle; +int rpcHashConn(void *handle, uint32_t ip, uint16_t port, char *user) { + SConnCache *pCache = (SConnCache *)handle; int hash = 0; // size_t user_len = strlen(user); @@ -58,109 +58,109 @@ int taosHashConn(void *handle, uint32_t ip, uint16_t port, char *user) { user++; } - hash = hash % pObj->maxSessions; + hash = hash % pCache->maxSessions; return hash; } -void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64_t time) { - if (time < pObj->keepTimer + pNode->time) return; +void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) { + if (pNode == NULL || (time < pCache->keepTimer + pNode->time) ) return; SConnHash *pPrev = pNode->prev, *pNext; while (pNode) { - (*pObj->cleanFp)(pNode->data); + (*pCache->cleanFp)(pNode->data); pNext = pNode->next; - pObj->total--; - pObj->count[hash]--; - tscTrace("%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d", pNode->data, pNode->ip, pNode->port, hash, pNode, - pObj->count[hash]); - taosMemPoolFree(pObj->connHashMemPool, (char *)pNode); + pCache->total--; + pCache->count[hash]--; + tTrace("%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d", pNode->data, pNode->ip, pNode->port, hash, pNode, + pCache->count[hash]); + taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); pNode = pNext; } if (pPrev) pPrev->next = NULL; else - pObj->connHashList[hash] = NULL; + pCache->connHashList[hash] = NULL; } -void taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) { +void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) { int hash; SConnHash * pNode; - SConnCache *pObj; + SConnCache *pCache; uint64_t time = taosGetTimestampMs(); - pObj = (SConnCache *)handle; - assert(pObj); + pCache = (SConnCache *)handle; + assert(pCache); assert(data); - hash = taosHashConn(pObj, ip, port, user); - pNode = (SConnHash *)taosMemPoolMalloc(pObj->connHashMemPool); + hash = rpcHashConn(pCache, ip, port, user); + pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool); pNode->ip = ip; pNode->port = port; pNode->data = data; pNode->prev = NULL; pNode->time = time; - pthread_mutex_lock(&pObj->mutex); + pthread_mutex_lock(&pCache->mutex); - pNode->next = pObj->connHashList[hash]; - if (pObj->connHashList[hash] != NULL) (pObj->connHashList[hash])->prev = pNode; - pObj->connHashList[hash] = pNode; + pNode->next = pCache->connHashList[hash]; + if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode; + pCache->connHashList[hash] = pNode; - pObj->total++; - pObj->count[hash]++; - taosRemoveExpiredNodes(pObj, pNode->next, hash, time); + pCache->total++; + pCache->count[hash]++; + rpcRemoveExpiredNodes(pCache, pNode->next, hash, time); - pthread_mutex_unlock(&pObj->mutex); + pthread_mutex_unlock(&pCache->mutex); - tscTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pObj->count[hash]); + tTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pCache->count[hash]); return; } -void taosCleanConnCache(void *handle, void *tmrId) { +void rpcCleanConnCache(void *handle, void *tmrId) { int hash; SConnHash * pNode; - SConnCache *pObj; + SConnCache *pCache; - pObj = (SConnCache *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return; - if (pObj->pTimer != tmrId) return; + pCache = (SConnCache *)handle; + if (pCache == NULL || pCache->maxSessions == 0) return; + if (pCache->pTimer != tmrId) return; uint64_t time = taosGetTimestampMs(); - for (hash = 0; hash < pObj->maxSessions; ++hash) { - pthread_mutex_lock(&pObj->mutex); - pNode = pObj->connHashList[hash]; - taosRemoveExpiredNodes(pObj, pNode, hash, time); - pthread_mutex_unlock(&pObj->mutex); + for (hash = 0; hash < pCache->maxSessions; ++hash) { + pthread_mutex_lock(&pCache->mutex); + pNode = pCache->connHashList[hash]; + rpcRemoveExpiredNodes(pCache, pNode, hash, time); + pthread_mutex_unlock(&pCache->mutex); } - // tscTrace("timer, total connections in cache:%d", pObj->total); - taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer); + // tTrace("timer, total connections in cache:%d", pCache->total); + taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer); } -void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) { +void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) { int hash; SConnHash * pNode; - SConnCache *pObj; + SConnCache *pCache; void * pData = NULL; - pObj = (SConnCache *)handle; - assert(pObj); + pCache = (SConnCache *)handle; + assert(pCache); uint64_t time = taosGetTimestampMs(); - hash = taosHashConn(pObj, ip, port, user); - pthread_mutex_lock(&pObj->mutex); + hash = rpcHashConn(pCache, ip, port, user); + pthread_mutex_lock(&pCache->mutex); - pNode = pObj->connHashList[hash]; + pNode = pCache->connHashList[hash]; while (pNode) { - if (time >= pObj->keepTimer + pNode->time) { - taosRemoveExpiredNodes(pObj, pNode, hash, time); + if (time >= pCache->keepTimer + pNode->time) { + rpcRemoveExpiredNodes(pCache, pNode, hash, time); pNode = NULL; break; } @@ -171,12 +171,12 @@ void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) } if (pNode) { - taosRemoveExpiredNodes(pObj, pNode->next, hash, time); + rpcRemoveExpiredNodes(pCache, pNode->next, hash, time); if (pNode->prev) { pNode->prev->next = pNode->next; } else { - pObj->connHashList[hash] = pNode->next; + pCache->connHashList[hash] = pNode->next; } if (pNode->next) { @@ -184,24 +184,24 @@ void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) } pData = pNode->data; - taosMemPoolFree(pObj->connHashMemPool, (char *)pNode); - pObj->total--; - pObj->count[hash]--; + taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); + pCache->total--; + pCache->count[hash]--; } - pthread_mutex_unlock(&pObj->mutex); + pthread_mutex_unlock(&pCache->mutex); if (pData) { - tscTrace("%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d", pData, ip, port, hash, pNode, pObj->count[hash]); + tTrace("%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d", pData, ip, port, hash, pNode, pCache->count[hash]); } return pData; } -void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) { +void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) { SConnHash **connHashList; mpool_h connHashMemPool; - SConnCache *pObj; + SConnCache *pCache; connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash)); if (connHashMemPool == 0) return NULL; @@ -212,48 +212,48 @@ void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, return NULL; } - pObj = malloc(sizeof(SConnCache)); - if (pObj == NULL) { + pCache = malloc(sizeof(SConnCache)); + if (pCache == NULL) { taosMemPoolCleanUp(connHashMemPool); free(connHashList); return NULL; } - memset(pObj, 0, sizeof(SConnCache)); + memset(pCache, 0, sizeof(SConnCache)); - pObj->count = calloc(sizeof(int), maxSessions); - pObj->total = 0; - pObj->keepTimer = keepTimer; - pObj->maxSessions = maxSessions; - pObj->connHashMemPool = connHashMemPool; - pObj->connHashList = connHashList; - pObj->cleanFp = cleanFp; - pObj->tmrCtrl = tmrCtrl; - taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer); + pCache->count = calloc(sizeof(int), maxSessions); + pCache->total = 0; + pCache->keepTimer = keepTimer; + pCache->maxSessions = maxSessions; + pCache->connHashMemPool = connHashMemPool; + pCache->connHashList = connHashList; + pCache->cleanFp = cleanFp; + pCache->tmrCtrl = tmrCtrl; + taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer); - pthread_mutex_init(&pObj->mutex, NULL); + pthread_mutex_init(&pCache->mutex, NULL); - return pObj; + return pCache; } -void taosCloseConnCache(void *handle) { - SConnCache *pObj; +void rpcCloseConnCache(void *handle) { + SConnCache *pCache; - pObj = (SConnCache *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return; + pCache = (SConnCache *)handle; + if (pCache == NULL || pCache->maxSessions == 0) return; - pthread_mutex_lock(&pObj->mutex); + pthread_mutex_lock(&pCache->mutex); - taosTmrStopA(&(pObj->pTimer)); + taosTmrStopA(&(pCache->pTimer)); - if (pObj->connHashMemPool) taosMemPoolCleanUp(pObj->connHashMemPool); + if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool); - tfree(pObj->connHashList); - tfree(pObj->count) + tfree(pCache->connHashList); + tfree(pCache->count) - pthread_mutex_unlock(&pObj->mutex); + pthread_mutex_unlock(&pCache->mutex); - pthread_mutex_destroy(&pObj->mutex); + pthread_mutex_destroy(&pCache->mutex); - memset(pObj, 0, sizeof(SConnCache)); - free(pObj); + memset(pCache, 0, sizeof(SConnCache)); + free(pCache); } diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index 0333f2b3bf..bd72ddd3cc 100755 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -32,11 +32,11 @@ #include "trpc.h" #include "taoserror.h" -#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest)) -#define rpcHeaderFromCont(cont) ((SRpcHeader *) (cont - sizeof(SRpcHeader))) -#define rpcContFromHeader(msg) (msg + sizeof(SRpcHeader)) -#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHeader)) -#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHeader)) +#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) +#define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead))) +#define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) +#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) +#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) #define rpcIsReq(type) (type & 1U) typedef struct { @@ -48,11 +48,11 @@ typedef struct { int connType; char label[12]; - char *meterId; // meter ID + char meterId[TSDB_UNI_LEN]; // meter ID char spi; // security parameter index char encrypt; // encrypt algorithm - char *secret; // key for authentication - char *ckey; // ciphering key + uint8_t secret[TSDB_KEY_LEN]; // secret for the link + uint8_t ckey[TSDB_KEY_LEN]; // ciphering key void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); @@ -78,7 +78,7 @@ typedef struct { int16_t numOfTry; // number of try for different servers int8_t oldIndex; // server IP index passed by app int8_t redirect; // flag to indicate redirect - char msg[0]; // RpcHeader starts from here + char msg[0]; // RpcHead starts from here } SRpcReqContext; typedef struct _RpcConn { @@ -114,6 +114,8 @@ typedef struct _RpcConn { SRpcReqContext *pContext; // request context } SRpcConn; +#pragma pack(push, 1) + typedef struct { char version:4; // RPC version char comp:4; // compression algorithm, 0:no compression 1:lz4 @@ -131,7 +133,7 @@ typedef struct { int32_t msgLen; // message length including the header iteslf int32_t code; uint8_t content[0]; // message body starts from here -} SRpcHeader; +} SRpcHead; typedef struct { int32_t reserved; @@ -143,6 +145,8 @@ typedef struct { uint8_t auth[TSDB_AUTH_LEN]; } SRpcDigest; +#pragma pack(pop) + int tsRpcProgressTime = 10; // milliseocnds // not configurable @@ -197,7 +201,7 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uin static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle); -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader); +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); @@ -205,7 +209,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId); static void rpcFreeOutMsg(void *msg); static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); -static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader); +static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead); static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen); @@ -218,7 +222,7 @@ void *rpcOpen(SRpcInit *pInit) { pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) return NULL; - strcpy(pRpc->label, pInit->label); + if(pInit->label) strcpy(pRpc->label, pInit->label); pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; pRpc->numOfThreads = pInit->numOfThreads; @@ -226,14 +230,14 @@ void *rpcOpen(SRpcInit *pInit) { pRpc->numOfThreads = TSDB_MAX_RPC_THREADS; } - strcpy(pRpc->localIp, pInit->localIp); + if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp); pRpc->localPort = pInit->localPort; pRpc->afp = pInit->afp; pRpc->sessions = pInit->sessions; - strcpy(pRpc->meterId, pInit->meterId); + if (pInit->meterId) strcpy(pRpc->meterId, pInit->meterId); + if (pInit->secret) strcpy(pRpc->secret, pInit->secret); + if (pInit->ckey) strcpy(pRpc->ckey, pInit->ckey); pRpc->spi = pInit->spi; - strcpy(pRpc->secret, pInit->secret); - strcpy(pRpc->ckey, pInit->ckey); pRpc->ufp = pInit->ufp; pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; @@ -275,7 +279,7 @@ void *rpcOpen(SRpcInit *pInit) { return NULL; } - pRpc->pCache = taosOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000); + pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000); if ( pRpc->pCache == NULL ) { tError("%s failed to init connection cache", pRpc->label); rpcClose(pRpc); @@ -303,7 +307,7 @@ void rpcClose(void *param) { taosCleanUpStrHash(pRpc->hash); taosTmrCleanUp(pRpc->tmrCtrl); taosIdPoolCleanUp(pRpc->idPool); - taosCloseConnCache(pRpc->pCache); + rpcCloseConnCache(pRpc->pCache); tfree(pRpc->connList); pthread_mutex_destroy(&pRpc->mutex); @@ -320,11 +324,11 @@ void *rpcMallocCont(int size) { return NULL; } - return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHeader); + return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHead); } void rpcFreeCont(void *cont) { - char *msg = ((char *)cont) - sizeof(SRpcHeader); + char *msg = ((char *)cont) - sizeof(SRpcHead); free(msg); } @@ -333,7 +337,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int SRpcReqContext *pContext; contLen = rpcCompressRpcMsg(pCont, contLen); - pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHeader)-sizeof(SRpcReqContext)); + pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); pContext->ahandle = ahandle; pContext->pRpc = (SRpcInfo *)shandle; pContext->ipSet = ipSet; @@ -348,11 +352,11 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int } void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { - int msgLen = 0; - SRpcConn *pConn = (SRpcConn *)handle; - SRpcInfo *pRpc = pConn->pRpc; - SRpcHeader *pHeader = rpcHeaderFromCont(pCont); - char *msg = (char *)pHeader; + int msgLen = 0; + SRpcConn *pConn = (SRpcConn *)handle; + SRpcInfo *pRpc = pConn->pRpc; + SRpcHead *pHead = rpcHeadFromCont(pCont); + char *msg = (char *)pHead; if ( pCont == NULL ) { pCont = rpcMallocCont(0); @@ -371,17 +375,17 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { } // set msg header - pHeader->version = 1; - pHeader->msgType = pConn->inType+1; - pHeader->spi = 0; - pHeader->tcp = 0; - pHeader->encrypt = 0; - pHeader->tranId = pConn->inTranId; - pHeader->sourceId = pConn->ownId; - pHeader->destId = pConn->peerId; - pHeader->uid = 0; - pHeader->code = htonl(code); - memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); + pHead->version = 1; + pHead->msgType = pConn->inType+1; + pHead->spi = 0; + pHead->tcp = 0; + pHead->encrypt = 0; + pHead->tranId = pConn->inTranId; + pHead->sourceId = pConn->ownId; + pHead->destId = pConn->peerId; + pHead->uid = 0; + pHead->code = htonl(code); + memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId)); // set pConn parameters pConn->inType = 0; @@ -390,7 +394,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { rpcFreeOutMsg(pConn->pRspMsg); pConn->pRspMsg = msg; pConn->rspMsgLen = msgLen; - if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; + if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; pthread_mutex_unlock(&pRpc->mutex); @@ -487,7 +491,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions); terrno = TSDB_CODE_MAX_SESSIONS; } else { - tTrace("%s sid:%d, ID allocated, used:%d, old id:%d", pRpc->label, sid, taosIdPoolNumOfUsed(pRpc->idPool)); + tTrace("%s sid:%d, ID allocated, used:%d", pRpc->label, sid, taosIdPoolNumOfUsed(pRpc->idPool)); pConn = pRpc->connList + sid; memset(pConn, 0, sizeof(SRpcConn)); @@ -562,7 +566,7 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *has SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) { SRpcConn *pConn; - pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId); + pConn = rpcGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId); if ( pConn == NULL ) { char ipstr[20] = {0}; tinet_ntoa(ipstr, ipSet.ip[ipSet.index]); @@ -572,29 +576,29 @@ SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) { return pConn; } -static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { +static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { SRpcInfo *pRpc= pConn->pRpc; if (pConn->peerId == 0) { - pConn->peerId = pHeader->sourceId; + pConn->peerId = pHead->sourceId; } else { - if (pConn->peerId != pHeader->sourceId) { + if (pConn->peerId != pHead->sourceId) { tTrace("%s pConn:%p, source Id is changed, old:0x%08x new:0x%08x", pRpc->label, pConn, - pConn->peerId, pHeader->sourceId); + pConn->peerId, pHead->sourceId); return TSDB_CODE_INVALID_VALUE; } } - if (pConn->inTranId == pHeader->tranId) { - if (pConn->inType == pHeader->msgType) { - tTrace("%s pConn:%p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHeader->msgType]); + if (pConn->inTranId == pHead->tranId) { + if (pConn->inType == pHead->msgType) { + tTrace("%s pConn:%p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); } else if (pConn->inType == 0) { tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn, - taosMsg[pHeader->msgType], pConn->inTranId); + taosMsg[pHead->msgType], pConn->inTranId); rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response } else { - tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHeader->msgType]); + tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHead->msgType]); } // do not reply any message @@ -603,40 +607,40 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { if (pConn->inType != 0) { tTrace("%s pConn:%p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn, - pConn->inTranId, pHeader->tranId); + pConn->inTranId, pHead->tranId); return TSDB_CODE_LAST_SESSION_NOT_FINISHED; } - pConn->inTranId = pHeader->tranId; - pConn->inType = pHeader->msgType; + pConn->inTranId = pHead->tranId; + pConn->inType = pHead->msgType; return 0; } -static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) { +static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { SRpcInfo *pRpc = pConn->pRpc; - pConn->peerId = pHeader->sourceId; + pConn->peerId = pHead->sourceId; if (pConn->outType == 0 || pConn->pContext == NULL) { return TSDB_CODE_UNEXPECTED_RESPONSE; } - if (pHeader->tranId != pConn->outTranId) { + if (pHead->tranId != pConn->outTranId) { return TSDB_CODE_INVALID_TRAN_ID; } - if (pHeader->msgType != pConn->outType + 1) { + if (pHead->msgType != pConn->outType + 1) { return TSDB_CODE_INVALID_RESPONSE_TYPE; } - if (*pHeader->content == TSDB_CODE_NOT_READY) { + if (*pHead->content == TSDB_CODE_NOT_READY) { return TSDB_CODE_ALREADY_PROCESSED; } taosTmrStopA(&pConn->pTimer); pConn->retry = 0; - if (*pHeader->content == TSDB_CODE_ACTION_IN_PROGRESS || pHeader->tcp) { + if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS || pHead->tcp) { if (pConn->tretry <= tsRpcMaxRetry) { pConn->tretry++; tTrace("%s pConn:%p, peer is still processing the transaction", pRpc->label, pConn); @@ -644,7 +648,7 @@ static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) { return TSDB_CODE_ALREADY_PROCESSED; } else { // peer still in processing, give up - *pHeader->content = TSDB_CODE_TOO_SLOW; + *pHead->content = TSDB_CODE_TOO_SLOW; } } @@ -656,77 +660,77 @@ static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) { return TSDB_CODE_SUCCESS; } -static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int dataLen, uint32_t ip) { +static int rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int dataLen, uint32_t ip) { int32_t sid, code = 0; SRpcConn * pConn = NULL; char hashstr[40] = {0}; *ppConn = NULL; - SRpcHeader *pHeader = (SRpcHeader *)data; + SRpcHead *pHead = (SRpcHead *)data; - sid = htonl(pHeader->destId); - pHeader->code = htonl(pHeader->code); - pHeader->msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen); + sid = htonl(pHead->destId); + pHead->code = htonl(pHead->code); + pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); - if (pHeader->msgType >= TSDB_MSG_TYPE_MAX || pHeader->msgType <= 0) { - tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHeader->msgType); + if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { + tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); return TSDB_CODE_INVALID_MSG_TYPE; } - if (dataLen != pHeader->msgLen) { + if (dataLen != pHead->msgLen) { tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid, - taosMsg[pHeader->msgType], dataLen, pHeader->msgLen); + taosMsg[pHead->msgType], dataLen, pHead->msgLen); return TSDB_CODE_INVALID_MSG_LEN; } if (sid < 0 || sid >= pRpc->sessions) { tTrace("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, - pRpc->sessions, taosMsg[pHeader->msgType]); + pRpc->sessions, taosMsg[pHead->msgType]); return TSDB_CODE_INVALID_SESSION_ID; } - if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHeader->uid, pHeader->sourceId); - pConn = rpcGetConnObj(pRpc, sid, pHeader->meterId, hashstr); + if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHead->uid, pHead->sourceId); + pConn = rpcGetConnObj(pRpc, sid, pHead->meterId, hashstr); if (pConn == NULL ) return terrno; *ppConn = pConn; sid = pConn->sid; - if (pHeader->uid) pConn->peerUid = pHeader->uid; + if (pHead->uid) pConn->peerUid = pHead->uid; - if (pHeader->tcp) { + if (pHead->tcp) { tTrace("%s pConn:%p, content will be transfered via TCP", pRpc->label, pConn); if (pConn->outType) taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); return TSDB_CODE_ALREADY_PROCESSED; } - code = rpcCheckAuthentication(pConn, (char *)pHeader, dataLen); + code = rpcCheckAuthentication(pConn, (char *)pHead, dataLen); if ( code != 0 ) return code; - if (pHeader->msgType != TSDB_MSG_TYPE_REG && pHeader->encrypt) { + if (pHead->msgType != TSDB_MSG_TYPE_REG && pHead->encrypt) { // decrypt here } - if ( rpcIsReq(pHeader->msgType) ) { - code = rpcProcessReqHeader(pConn, pHeader); + if ( rpcIsReq(pHead->msgType) ) { + code = rpcProcessReqHead(pConn, pHead); } else { - code = rpcProcessRspHeader(pConn, pHeader); + code = rpcProcessRspHead(pConn, pHead); } return code; } static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle) { - SRpcHeader *pHeader = (SRpcHeader *)data; - SRpcInfo *pRpc = (SRpcInfo *)shandle; - SRpcConn *pConn = NULL; - uint8_t code = 0; + SRpcHead *pHead = (SRpcHead *)data; + SRpcInfo *pRpc = (SRpcInfo *)shandle; + SRpcConn *pConn = NULL; + uint8_t code = 0; tDump(data, dataLen); pthread_mutex_lock(&pRpc->mutex); - code = rpcProcessHeader(pRpc, &pConn, data, dataLen, ip); + code = rpcProcessHead(pRpc, &pConn, data, dataLen, ip); if (pConn) { // update connection info @@ -739,16 +743,16 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ } if (port) pConn->peerPort = port; - if (pHeader->port) // port maybe changed by the peer - pConn->peerPort = pHeader->port; + if (pHead->port) // port maybe changed by the peer + pConn->peerPort = pHead->port; } pthread_mutex_unlock(&pRpc->mutex); - if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { + if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { tTrace("%s pConn:%p, %s received from 0x%x:%hu, parse code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", - pRpc->label, pConn, taosMsg[pHeader->msgType], ip, port, code, - dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); + pRpc->label, pConn, taosMsg[pHead->msgType], ip, port, code, + dataLen, pHead->sourceId, pHead->destId, pHead->tranId); } if (pConn && pRpc->idleTime) { @@ -757,12 +761,12 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ if (code != TSDB_CODE_ALREADY_PROCESSED) { if (code != 0) { // parsing error - if ( rpcIsReq(pHeader->msgType) ) { + if ( rpcIsReq(pHead->msgType) ) { rpcSendErrorMsgToPeer(pRpc, data, code, ip, port, chandle); - tTrace("%s pConn:%p, %s is sent with error code:%u", pRpc->label, pConn, taosMsg[pHeader->msgType+1], code); + tTrace("%s pConn:%p, %s is sent with error code:%u", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); } } else { // parsing OK - rpcProcessIncomingMsg(pConn, pHeader); + rpcProcessIncomingMsg(pConn, pHead); } } @@ -770,100 +774,100 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ return pConn; } -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { SRpcInfo *pRpc = pConn->pRpc; - pHeader = rpcDecompressRpcMsg(pHeader); - int contLen = rpcContLenFromMsg(pHeader->msgLen); - uint8_t *pCont = pHeader->content; + pHead = rpcDecompressRpcMsg(pHead); + int contLen = rpcContLenFromMsg(pHead->msgLen); + uint8_t *pCont = pHead->content; - if ( rpcIsReq(pHeader->msgType) ) { + if ( rpcIsReq(pHead->msgType) ) { taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); - (*(pRpc->cfp))(pHeader->msgType, pCont, contLen, pConn, 0); + (*(pRpc->cfp))(pHead->msgType, pCont, contLen, pConn, 0); } else { // it's a response - int32_t code = pHeader->code; + int32_t code = pHead->code; SRpcReqContext *pContext = pConn->pContext; pConn->pContext = NULL; - taosAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId); + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId); if (code == TSDB_CODE_REDIRECT) { pContext->redirect = 1; pContext->numOfTry = 0; - memcpy(&pContext->ipSet, pHeader->content, sizeof(pContext->ipSet)); + memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); rpcSendReqToServer(pRpc, pContext); } else { - rpcFreeOutMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg + rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg if ( pContext->ipSet.index != pContext->oldIndex || pContext->redirect ) (*pRpc->ufp)(pContext->ahandle, pContext->ipSet); - (*pRpc->cfp)(pHeader->msgType, pCont, contLen, pContext->ahandle, pContext->ipSet.index); + (*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, pContext->ipSet.index); } } } static void rpcSendQuickRsp(SRpcConn *pConn, char code) { - char msg[RPC_MSG_OVERHEAD]; - SRpcHeader *pHeader; + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pHead; // set msg header - memset(msg, 0, sizeof(SRpcHeader)); - pHeader = (SRpcHeader *)msg; - pHeader->version = 1; - pHeader->msgType = pConn->inType+1; - pHeader->spi = 0; - pHeader->tcp = 0; - pHeader->encrypt = 0; - pHeader->tranId = pConn->inTranId; - pHeader->sourceId = pConn->ownId; - pHeader->destId = pConn->peerId; - pHeader->uid = 0; - memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); - pHeader->code = htonl(code); + memset(msg, 0, sizeof(SRpcHead)); + pHead = (SRpcHead *)msg; + pHead->version = 1; + pHead->msgType = pConn->inType+1; + pHead->spi = 0; + pHead->tcp = 0; + pHead->encrypt = 0; + pHead->tranId = pConn->inTranId; + pHead->sourceId = pConn->ownId; + pHead->destId = pConn->peerId; + pHead->uid = 0; + memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId)); + pHead->code = htonl(code); rpcSendMsgToPeer(pConn, msg, 0); } static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle) { - SRpcHeader *pRecvHeader, *pReplyHeader; - char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(uint32_t) ]; + SRpcHead *pRecvHead, *pReplyHead; + char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t) ]; uint32_t timeStamp; int msgLen; - pRecvHeader = (SRpcHeader *)pMsg; - pReplyHeader = (SRpcHeader *)msg; + pRecvHead = (SRpcHead *)pMsg; + pReplyHead = (SRpcHead *)msg; - memset(msg, 0, sizeof(SRpcHeader)); - pReplyHeader->version = pRecvHeader->version; - pReplyHeader->msgType = (char)(pRecvHeader->msgType + 1); - pReplyHeader->tcp = 0; - pReplyHeader->spi = 0; - pReplyHeader->encrypt = 0; - pReplyHeader->tranId = pRecvHeader->tranId; - pReplyHeader->sourceId = 0; - pReplyHeader->destId = pRecvHeader->sourceId; - memcpy(pReplyHeader->meterId, pRecvHeader->meterId, tListLen(pReplyHeader->meterId)); + memset(msg, 0, sizeof(SRpcHead)); + pReplyHead->version = pRecvHead->version; + pReplyHead->msgType = (char)(pRecvHead->msgType + 1); + pReplyHead->tcp = 0; + pReplyHead->spi = 0; + pReplyHead->encrypt = 0; + pReplyHead->tranId = pRecvHead->tranId; + pReplyHead->sourceId = 0; + pReplyHead->destId = pRecvHead->sourceId; + memcpy(pReplyHead->meterId, pRecvHead->meterId, tListLen(pReplyHead->meterId)); - pReplyHeader->code = htonl(code); - msgLen = sizeof(SRpcHeader); + pReplyHead->code = htonl(code); + msgLen = sizeof(SRpcHead); if (code == TSDB_CODE_INVALID_TIME_STAMP) { // include a time stamp if client's time is not synchronized well - uint8_t *pContent = pReplyHeader->content; + uint8_t *pContent = pReplyHead->content; timeStamp = taosGetTimestampSec(); memcpy(pContent, &timeStamp, sizeof(timeStamp)); msgLen += sizeof(timeStamp); } - pReplyHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); + pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen); (*taosSendData[pRpc->connType])(ip, port, msg, msgLen, chandle); return; } static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { - SRpcHeader *pHeader = rpcHeaderFromCont(pContext->pCont); - char *msg = (char *)pHeader; - int msgLen = rpcMsgLenFromCont(pContext->contLen); - char msgType = pContext->msgType; + SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); + char *msg = (char *)pHead; + int msgLen = rpcMsgLenFromCont(pContext->contLen); + char msgType = pContext->msgType; pContext->numOfTry++; SRpcConn *pConn = rpcSetConnToServer(pRpc, pContext->ipSet); @@ -876,22 +880,22 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pthread_mutex_lock(&pRpc->mutex); // set the message header - pHeader->version = 1; - pHeader->msgType = msgType; - pHeader->tcp = 0; - pHeader->encrypt = 0; + pHead->version = 1; + pHead->msgType = msgType; + pHead->tcp = 0; + pHead->encrypt = 0; pConn->tranId++; if ( pConn->tranId == 0 ) pConn->tranId++; - pHeader->tranId = pConn->tranId; - pHeader->sourceId = pConn->ownId; - pHeader->destId = pConn->peerId; - pHeader->port = 0; - pHeader->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid()); - memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); + pHead->tranId = pConn->tranId; + pHead->sourceId = pConn->ownId; + pHead->destId = pConn->peerId; + pHead->port = 0; + pHead->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid()); + memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId)); // set the connection parameters pConn->outType = msgType; - pConn->outTranId = pHeader->tranId; + pConn->outTranId = pHead->tranId; pConn->pReqMsg = msg; pConn->reqMsgLen = msgLen; pConn->pContext = pContext; @@ -903,25 +907,25 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { } static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { - int writtenLen = 0; - SRpcInfo *pRpc = pConn->pRpc; - SRpcHeader *pHeader = (SRpcHeader *)msg; + int writtenLen = 0; + SRpcInfo *pRpc = pConn->pRpc; + SRpcHead *pHead = (SRpcHead *)msg; msgLen = rpcAddAuthPart(pConn, msg, msgLen); - if ( rpcIsReq(pHeader->msgType)) { - if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) + if ( rpcIsReq(pHead->msgType)) { + if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d", - pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, - pConn->peerPort, msgLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); + pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, + pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } else { - if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) + if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", - pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, pConn->peerPort, - (uint8_t)pHeader->content[0], msgLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); + pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, + (uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } - writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHeader, msgLen, pConn->chandle); + writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHead, msgLen, pConn->chandle); if (writtenLen != msgLen) { tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, @@ -936,7 +940,7 @@ static void rpcProcessConnError(void *param, void *id) { SRpcInfo *pRpc = pContext->pRpc; if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) { - rpcFreeOutMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg + rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg (*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code); } else { // move to next IP @@ -959,7 +963,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { pConn->retry++; if (pConn->retry < 4) { - tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label, + tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label, pConn, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer<retry, pConn, pRpc->tmrCtrl, &pConn->pTimer); @@ -1018,9 +1022,9 @@ static void rpcFreeOutMsg(void *msg) { } static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { - SRpcHeader *pHeader = rpcHeaderFromCont(pCont); - int32_t finalLen = 0; - int overhead = sizeof(SRpcComp); + SRpcHead *pHead = rpcHeadFromCont(pCont); + int32_t finalLen = 0; + int overhead = sizeof(SRpcComp); if (!NEEDTO_COMPRESSS_MSG(contLen)) { return contLen; @@ -1044,7 +1048,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { pComp->contLen = htonl(contLen); memcpy(pCont + overhead, buf, compLen); - pHeader->comp = 1; + pHead->comp = 1; tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); finalLen = compLen + overhead; @@ -1056,13 +1060,13 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { return finalLen; } -static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) { +static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { int overhead = sizeof(SRpcComp); - SRpcHeader *pNewHeader = NULL; - uint8_t *pCont = pHeader->content; - SRpcComp *pComp = (SRpcComp *)pHeader->content; + SRpcHead *pNewHead = NULL; + uint8_t *pCont = pHead->content; + SRpcComp *pComp = (SRpcComp *)pHead->content; - if (pHeader->comp) { + if (pHead->comp) { // decompress the content assert(pComp->reserved == 0); int contLen = htonl(pComp->contLen); @@ -1071,21 +1075,21 @@ static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) { char *buf = rpcMallocCont(contLen); if (buf) { - pNewHeader = rpcHeaderFromCont(buf); - int compLen = rpcContLenFromMsg(pHeader->msgLen) - overhead; + pNewHead = rpcHeadFromCont(buf); + int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; int32_t originalLen = LZ4_decompress_safe((const char*)(pCont + overhead), buf, compLen, contLen); assert(originalLen == contLen); - memcpy(pNewHeader, pHeader, sizeof(SRpcHeader)); - pNewHeader->msgLen = rpcMsgLenFromCont(originalLen); - free(pHeader); // free the compressed message buffer - pHeader = pNewHeader; + memcpy(pNewHead, pHead, sizeof(SRpcHead)); + pNewHead->msgLen = rpcMsgLenFromCont(originalLen); + free(pHead); // free the compressed message buffer + pHead = pNewHead; } else { tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno)); } } - return pHeader; + return pHead; } static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) { @@ -1103,7 +1107,7 @@ static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t return ret; } -static int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) { +static int rpcBuildAuthHead(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) { MD5_CTX context; MD5Init(&context); @@ -1118,33 +1122,33 @@ static int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t } static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { - SRpcHeader *pHeader = (SRpcHeader *)msg; + SRpcHead *pHead = (SRpcHead *)msg; if (pConn->spi) { // add auth part - pHeader->spi = pConn->spi; + pHead->spi = pConn->spi; SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen); pDigest->timeStamp = htonl(taosGetTimestampSec()); msgLen += sizeof(SRpcDigest); - pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); - rpcBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + rpcBuildAuthHead((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); } else { - pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); } return msgLen; } static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { - SRpcHeader *pHeader = (SRpcHeader *)msg; - SRpcInfo *pRpc = pConn->pRpc; - int code = 0; + SRpcHead *pHead = (SRpcHead *)msg; + SRpcInfo *pRpc = pConn->pRpc; + int code = 0; if (pConn->spi == 0 ) return 0; - if (pHeader->spi == pConn->spi) { + if (pHead->spi == pConn->spi) { // authentication - SRpcDigest *pDigest = (SRpcDigest *)((char *)pHeader + msgLen - sizeof(SRpcDigest)); + SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest)); int32_t delta; delta = (int32_t)htonl(pDigest->timeStamp); @@ -1154,16 +1158,16 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { delta, htonl(pDigest->timeStamp)); code = TSDB_CODE_INVALID_TIME_STAMP; } else { - if (rpcAuthenticateMsg((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { + if (rpcAuthenticateMsg((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn); code = TSDB_CODE_AUTH_FAILURE; } else { - pHeader->msgLen -= sizeof(SRpcDigest); + pHead->msgLen -= sizeof(SRpcDigest); } } } else { // if it is request or response with code 0, msg shall be discarded - if (rpcIsReq(pHeader->msgType) || (pHeader->content[0] == 0)) { + if (rpcIsReq(pHead->msgType) || (pHead->content[0] == 0)) { tTrace("%s pConn:%p, auth spi not matched, msg discarded", pRpc->label, pConn); code = TSDB_CODE_AUTH_FAILURE; } diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index a139a591c0..cce5d0e345 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -24,18 +24,23 @@ void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code } int32_t main(int32_t argc, char *argv[]) { + + taosInitLog("client.log", 100000, 10); dPrint("unit test for rpc module"); SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = "0.0.0.0"; - rpcInit.localPort = 7000; - rpcInit.label = "unittest"; + rpcInit.localPort = 0; + rpcInit.label = "APP"; rpcInit.numOfThreads = 1; - rpcInit.cfp = processMsg; + rpcInit.cfp = processMsg; rpcInit.sessions = 1000; rpcInit.connType = TAOS_CONN_UDPC; rpcInit.idleTime = 2000; + rpcInit.meterId = "jefftao"; + rpcInit.secret = "password"; + rpcInit.ckey = "key"; void *pRpc = rpcOpen(&rpcInit); if (pRpc == NULL) { @@ -46,6 +51,7 @@ int32_t main(int32_t argc, char *argv[]) { SRpcIpSet ipSet; ipSet.numOfIps = 2; ipSet.index = 0; + ipSet.port = 7000; ipSet.ip[0] = inet_addr("127.0.0.1"); ipSet.ip[1] = inet_addr("192.168.0.1"); diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index 1fc7b6e336..85994a04d0 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -27,18 +27,23 @@ void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code } int32_t main(int32_t argc, char *argv[]) { + taosInitLog("server.log", 100000, 10); + dPrint("unit test for rpc module"); SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = "0.0.0.0"; rpcInit.localPort = 7000; - rpcInit.label = "unittest"; + rpcInit.label = "APP"; rpcInit.numOfThreads = 1; - rpcInit.cfp = processMsg; + rpcInit.cfp = processMsg; rpcInit.sessions = 1000; rpcInit.connType = TAOS_CONN_UDPS; rpcInit.idleTime = 2000; + rpcInit.meterId = "jefftao"; + rpcInit.secret = "password"; + rpcInit.ckey = "key"; void *pRpc = rpcOpen(&rpcInit); if (pRpc == NULL) { diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index c07268aa1e..04978d537d 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -209,7 +209,7 @@ char tsLocale[TSDB_LOCALE_LEN] = {0}; char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string int tsNumOfLogLines = 10000000; -uint32_t rpcDebugFlag = 131; +uint32_t rpcDebugFlag = 135; uint32_t ddebugFlag = 131; uint32_t mdebugFlag = 135; uint32_t sdbDebugFlag = 135; -- GitLab