From f74ad86c5474df9a79b1f9123ff17420bc3249e9 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 22 Feb 2020 19:52:38 +0800 Subject: [PATCH] support multiple UDP threads change the mutex in connection cache to home made lock --- src/inc/trpc.h | 17 +-- src/rpc/inc/rpcCache.h | 4 +- src/rpc/src/rpcCache.c | 234 +++++++++++++++++++++++------------------ src/rpc/src/rpcMain.c | 101 ++++++++++-------- src/rpc/test/rclient.c | 6 +- src/rpc/test/rserver.c | 5 +- 6 files changed, 204 insertions(+), 163 deletions(-) diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 121bb18382..710e9bc5e6 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -29,8 +29,8 @@ extern "C" { extern int tsRpcHeadSize; typedef struct { - int16_t index; - int16_t numOfIps; + int8_t inUse; + int8_t numOfIps; uint16_t port; uint32_t ip[TSDB_MAX_MPEERS]; } SRpcIpSet; @@ -43,13 +43,13 @@ typedef struct { } SRpcConnInfo; typedef struct { - char *localIp; // local IP used + char *localIp; // local IP used uint16_t localPort; // local port - char *label; // for debug purpose - int numOfThreads; // number of threads to handle connections - int sessions; // number of sessions allowed - int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS - int idleTime; // milliseconds, 0 means idle timer is disabled + char *label; // for debug purpose + int numOfThreads; // number of threads to handle connections + int sessions; // number of sessions allowed + int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS + int idleTime; // milliseconds, 0 means idle timer is disabled // the following is for client app ecurity only char *user; // user name @@ -72,6 +72,7 @@ void *rpcOpen(SRpcInit *pRpc); void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); +void *rpcReallocCont(void *ptr, int contLen); void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, char msgType, void *pCont, int contLen, void *ahandle); void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen); void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet); diff --git a/src/rpc/inc/rpcCache.h b/src/rpc/inc/rpcCache.h index 5fc7992e43..2a386c066e 100644 --- a/src/rpc/inc/rpcCache.h +++ b/src/rpc/inc/rpcCache.h @@ -22,8 +22,8 @@ extern "C" { void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer); void rpcCloseConnCache(void *handle); -void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user); -void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user); +void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, int8_t connType); +void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connType); #ifdef __cplusplus } diff --git a/src/rpc/src/rpcCache.c b/src/rpc/src/rpcCache.c index 6f5a8e9d53..a397f6f845 100644 --- a/src/rpc/src/rpcCache.c +++ b/src/rpc/src/rpcCache.c @@ -14,7 +14,6 @@ */ #include "os.h" - #include "tglobalcfg.h" #include "tlog.h" #include "tmempool.h" @@ -26,6 +25,7 @@ typedef struct _c_hash_t { uint32_t ip; uint16_t port; + char connType; struct _c_hash_t *prev; struct _c_hash_t *next; void * data; @@ -43,49 +43,77 @@ typedef struct { void (*cleanFp)(void *); void *tmrCtrl; void *pTimer; + int64_t *lockedBy; } SConnCache; -int rpcHashConn(void *handle, uint32_t ip, uint16_t port, char *user) { - SConnCache *pCache = (SConnCache *)handle; - int hash = 0; - // size_t user_len = strlen(user); +static int rpcHashConn(void *handle, uint32_t ip, uint16_t port, int8_t connType); +static void rpcLockCache(int64_t *lockedBy); +static void rpcUnlockCache(int64_t *lockedBy); +static void rpcCleanConnCache(void *handle, void *tmrId); +static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time); - hash = ip >> 16; - hash += (unsigned short)(ip & 0xFFFF); - hash += port; - while (*user != '\0') { - hash += *user; - user++; +void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) { + SConnHash **connHashList; + mpool_h connHashMemPool; + SConnCache *pCache; + + connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash)); + if (connHashMemPool == 0) return NULL; + + connHashList = calloc(sizeof(SConnHash *), maxSessions); + if (connHashList == 0) { + taosMemPoolCleanUp(connHashMemPool); + return NULL; } - hash = hash % pCache->maxSessions; + pCache = malloc(sizeof(SConnCache)); + if (pCache == NULL) { + taosMemPoolCleanUp(connHashMemPool); + free(connHashList); + return NULL; + } + memset(pCache, 0, sizeof(SConnCache)); - return hash; + pCache->count = calloc(sizeof(int), maxSessions); + pCache->total = 0; + pCache->keepTimer = keepTimer; + pCache->maxSessions = maxSessions; + pCache->connHashMemPool = connHashMemPool; + pCache->connHashList = connHashList; + pCache->cleanFp = cleanFp; + pCache->tmrCtrl = tmrCtrl; + pCache->lockedBy = calloc(sizeof(int64_t), maxSessions); + taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer); + + pthread_mutex_init(&pCache->mutex, NULL); + + return pCache; } -void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) { - if (pNode == NULL || (time < pCache->keepTimer + pNode->time) ) return; +void rpcCloseConnCache(void *handle) { + SConnCache *pCache; - SConnHash *pPrev = pNode->prev, *pNext; + pCache = (SConnCache *)handle; + if (pCache == NULL || pCache->maxSessions == 0) return; - while (pNode) { - (*pCache->cleanFp)(pNode->data); - pNext = pNode->next; - pCache->total--; - pCache->count[hash]--; - tTrace("%p ip:0x%x:%hu:%d:%p removed from cache, connections:%d", pNode->data, pNode->ip, pNode->port, hash, pNode, - pCache->count[hash]); - taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); - pNode = pNext; - } + pthread_mutex_lock(&pCache->mutex); - if (pPrev) - pPrev->next = NULL; - else - pCache->connHashList[hash] = NULL; + taosTmrStopA(&(pCache->pTimer)); + + if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool); + + tfree(pCache->connHashList); + tfree(pCache->count) + + pthread_mutex_unlock(&pCache->mutex); + + pthread_mutex_destroy(&pCache->mutex); + + memset(pCache, 0, sizeof(SConnCache)); + free(pCache); } -void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) { +void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, int8_t connType) { int hash; SConnHash * pNode; SConnCache *pCache; @@ -96,54 +124,34 @@ void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, c assert(pCache); assert(data); - hash = rpcHashConn(pCache, ip, port, user); + hash = rpcHashConn(pCache, ip, port, connType); pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool); pNode->ip = ip; pNode->port = port; + pNode->connType = connType; pNode->data = data; pNode->prev = NULL; pNode->time = time; - pthread_mutex_lock(&pCache->mutex); + rpcLockCache(pCache->lockedBy+hash); pNode->next = pCache->connHashList[hash]; if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode; pCache->connHashList[hash] = pNode; - pCache->total++; pCache->count[hash]++; rpcRemoveExpiredNodes(pCache, pNode->next, hash, time); - pthread_mutex_unlock(&pCache->mutex); - - tTrace("%p ip:0x%x:%hu:%d:%p added into cache, connections:%d", data, ip, port, hash, pNode, pCache->count[hash]); - - return; -} - -void rpcCleanConnCache(void *handle, void *tmrId) { - int hash; - SConnHash * pNode; - SConnCache *pCache; + rpcUnlockCache(pCache->lockedBy+hash); - pCache = (SConnCache *)handle; - if (pCache == NULL || pCache->maxSessions == 0) return; - if (pCache->pTimer != tmrId) return; + pCache->total++; - uint64_t time = taosGetTimestampMs(); + tTrace("%p ip:0x%x:%hu:%d:%d:%p added into cache, connections:%d", data, ip, port, connType, hash, pNode, pCache->count[hash]); - for (hash = 0; hash < pCache->maxSessions; ++hash) { - pthread_mutex_lock(&pCache->mutex); - pNode = pCache->connHashList[hash]; - rpcRemoveExpiredNodes(pCache, pNode, hash, time); - pthread_mutex_unlock(&pCache->mutex); - } - - // tTrace("timer, total connections in cache:%d", pCache->total); - taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer); + return; } -void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) { +void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connType) { int hash; SConnHash * pNode; SConnCache *pCache; @@ -154,8 +162,8 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) uint64_t time = taosGetTimestampMs(); - hash = rpcHashConn(pCache, ip, port, user); - pthread_mutex_lock(&pCache->mutex); + hash = rpcHashConn(pCache, ip, port, connType); + rpcLockCache(pCache->lockedBy+hash); pNode = pCache->connHashList[hash]; while (pNode) { @@ -165,7 +173,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) break; } - if (pNode->ip == ip && pNode->port == port) break; + if (pNode->ip == ip && pNode->port == port && pNode->connType == connType) break; pNode = pNode->next; } @@ -189,71 +197,87 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) pCache->count[hash]--; } - pthread_mutex_unlock(&pCache->mutex); + rpcUnlockCache(pCache->lockedBy+hash); if (pData) { - tTrace("%p ip:0x%x:%hu:%d:%p retrieved from cache, connections:%d", pData, ip, port, hash, pNode, pCache->count[hash]); + tTrace("%p ip:0x%x:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, ip, port, connType, hash, pNode, pCache->count[hash]); } return pData; } -void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) { - SConnHash **connHashList; - mpool_h connHashMemPool; +static void rpcCleanConnCache(void *handle, void *tmrId) { + int hash; + SConnHash * pNode; SConnCache *pCache; - connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash)); - if (connHashMemPool == 0) return NULL; + pCache = (SConnCache *)handle; + if (pCache == NULL || pCache->maxSessions == 0) return; + if (pCache->pTimer != tmrId) return; - connHashList = calloc(sizeof(SConnHash *), maxSessions); - if (connHashList == 0) { - taosMemPoolCleanUp(connHashMemPool); - return NULL; - } + uint64_t time = taosGetTimestampMs(); - pCache = malloc(sizeof(SConnCache)); - if (pCache == NULL) { - taosMemPoolCleanUp(connHashMemPool); - free(connHashList); - return NULL; + for (hash = 0; hash < pCache->maxSessions; ++hash) { + rpcLockCache(pCache->lockedBy+hash); + pNode = pCache->connHashList[hash]; + rpcRemoveExpiredNodes(pCache, pNode, hash, time); + rpcUnlockCache(pCache->lockedBy+hash); } - memset(pCache, 0, sizeof(SConnCache)); - pCache->count = calloc(sizeof(int), maxSessions); - pCache->total = 0; - pCache->keepTimer = keepTimer; - pCache->maxSessions = maxSessions; - pCache->connHashMemPool = connHashMemPool; - pCache->connHashList = connHashList; - pCache->cleanFp = cleanFp; - pCache->tmrCtrl = tmrCtrl; + // tTrace("timer, total connections in cache:%d", pCache->total); taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer); - - pthread_mutex_init(&pCache->mutex, NULL); - - return pCache; } -void rpcCloseConnCache(void *handle) { - SConnCache *pCache; +static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) { + if (pNode == NULL || (time < pCache->keepTimer + pNode->time) ) return; - pCache = (SConnCache *)handle; - if (pCache == NULL || pCache->maxSessions == 0) return; + SConnHash *pPrev = pNode->prev, *pNext; - pthread_mutex_lock(&pCache->mutex); + while (pNode) { + (*pCache->cleanFp)(pNode->data); + pNext = pNode->next; + pCache->total--; + pCache->count[hash]--; + tTrace("%p ip:0x%x:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->ip, pNode->port, pNode->connType, hash, pNode, + pCache->count[hash]); + taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); + pNode = pNext; + } - taosTmrStopA(&(pCache->pTimer)); + if (pPrev) + pPrev->next = NULL; + else + pCache->connHashList[hash] = NULL; +} - if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool); +static int rpcHashConn(void *handle, uint32_t ip, uint16_t port, int8_t connType) { + SConnCache *pCache = (SConnCache *)handle; + int hash = 0; - tfree(pCache->connHashList); - tfree(pCache->count) + hash = ip >> 16; + hash += (unsigned short)(ip & 0xFFFF); + hash += port; + hash += connType; + + hash = hash % pCache->maxSessions; - pthread_mutex_unlock(&pCache->mutex); + return hash; +} - pthread_mutex_destroy(&pCache->mutex); +static void rpcLockCache(int64_t *lockedBy) { + int64_t tid = taosGetPthreadId(); + int i = 0; + while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) { + if (++i % 100 == 0) { + sched_yield(); + } + } +} - memset(pCache, 0, sizeof(SConnCache)); - free(pCache); +static void rpcUnlockCache(int64_t *lockedBy) { + int64_t tid = taosGetPthreadId(); + if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) { + assert(false); + } } + diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 8ce625f6fd..e286af1598 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -41,12 +41,13 @@ #define rpcIsReq(type) (type & 1U) typedef struct { - int sessions; - int numOfThreads; - int idleTime; // milliseconds; + int sessions; // number of sessions allowed + int numOfThreads; // number of threads to process incoming messages + int idleTime; // milliseconds; char localIp[TSDB_IPv4ADDR_LEN]; uint16_t localPort; - int connType; + int8_t connType; + int index; // for UDP server only, round robin for multiple threads char label[12]; char user[TSDB_UNI_LEN]; // meter ID @@ -78,7 +79,7 @@ typedef struct { int32_t contLen; // content length int32_t code; // error code int16_t numOfTry; // number of try for different servers - int8_t oldIndex; // server IP index passed by app + int8_t oldInUse; // server IP inUse passed by app int8_t redirect; // flag to indicate redirect int8_t connType; // connection type char msg[0]; // RpcHead starts from here @@ -115,7 +116,7 @@ typedef struct _RpcConn { char *pReqMsg; // request message including header int reqMsgLen; // request message length SRpcInfo *pRpc; // the associated SRpcInfo - int connType; // connection type + int8_t connType; // connection type int64_t lockedBy; // lock for connection SRpcReqContext *pContext; // request context } SRpcConn; @@ -172,8 +173,8 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, static void rpcCloseConn(void *thandle); static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext); static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); -static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr); -static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr); +static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv); +static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv); static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); @@ -207,8 +208,7 @@ void *rpcOpen(SRpcInit *pInit) { if(pInit->label) strcpy(pRpc->label, pInit->label); pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; - // pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads; - pRpc->numOfThreads = 1; + pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads; if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp); pRpc->localPort = pInit->localPort; pRpc->afp = pInit->afp; @@ -331,7 +331,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in pContext->contLen = contLen; pContext->pCont = pCont; pContext->msgType = type; - pContext->oldIndex = pIpSet->index; + pContext->oldInUse = pIpSet->inUse; pContext->connType = RPC_CONN_UDPC; if (contLen > 16000) pContext->connType = RPC_CONN_TCPC; @@ -381,6 +381,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { pHead->sourceId = pConn->ownId; pHead->destId = pConn->peerId; pHead->uid = 0; + pHead->port = htons(pConn->localPort); pHead->code = htonl(code); memcpy(pHead->user, pConn->user, tListLen(pHead->user)); @@ -514,8 +515,12 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { return pConn; } -static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr) { +static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { SRpcConn *pConn = NULL; + char hashstr[40]; + SRpcHead *pHead = (SRpcHead *)pRecv->msg; + + sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->uid, pHead->sourceId, pRecv->connType); // check if it is already allocated SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); @@ -529,12 +534,12 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr } else { pConn = pRpc->connList + sid; memset(pConn, 0, sizeof(SRpcConn)); - memcpy(pConn->user, user, tListLen(pConn->user)); + memcpy(pConn->user, pHead->user, tListLen(pConn->user)); pConn->pRpc = pRpc; pConn->sid = sid; pConn->tranId = (uint16_t)(rand() & 0xFFFF); pConn->ownId = htonl(pConn->sid); - if (pRpc->afp && (*pRpc->afp)(user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) { + if (pRpc->afp && (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) { tWarn("%s %p, user not there", pRpc->label, pConn); taosFreeId(pRpc->idPool, sid); // sid shall be released terrno = TSDB_CODE_INVALID_USER; @@ -543,25 +548,33 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr } if (pConn) { + if (pRecv->connType == RPC_CONN_UDPS && pRpc->numOfThreads > 1) { + // UDP server, assign to new connection + pRpc->index = (pRpc->index+1) % pRpc->numOfThreads; + pConn->localPort = (pRpc->localPort + pRpc->index); + } + taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); - tTrace("%s %p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->user); + tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u", + pRpc->label, pConn, sid, pConn->user, pConn->localPort); } return pConn; } -static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr) { +static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { SRpcConn *pConn = NULL; + SRpcHead *pHead = (SRpcHead *)pRecv->msg; if (sid) { pConn = pRpc->connList + sid; } else { - pConn = rpcAllocateServerConn(pRpc, user, hashstr); + pConn = rpcAllocateServerConn(pRpc, pRecv); } if (pConn) { - if (memcmp(pConn->user, user, tListLen(pConn->user)) != 0) { - tTrace("%s %p, user:%s is not matched, received:%s", pRpc->label, pConn, pConn->user, user); + if (memcmp(pConn->user, pHead->user, tListLen(pConn->user)) != 0) { + tTrace("%s %p, user:%s is not matched, received:%s", pRpc->label, pConn, pConn->user, pHead->user); terrno = TSDB_CODE_MISMATCHED_METER_ID; pConn = NULL; } @@ -575,13 +588,15 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { SRpcInfo *pRpc = pContext->pRpc; SRpcIpSet *pIpSet = &pContext->ipSet; - pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->ip[pIpSet->index], pIpSet->port, pRpc->user); + pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->ip[pIpSet->inUse], pIpSet->port, pContext->connType); if ( pConn == NULL ) { char ipstr[20] = {0}; - tinet_ntoa(ipstr, pIpSet->ip[pIpSet->index]); + tinet_ntoa(ipstr, pIpSet->ip[pIpSet->inUse]); pConn = rpcOpenConn(pRpc, ipstr, pIpSet->port, pContext->connType); - if (pConn) pConn->destIp = pIpSet->ip[pIpSet->index]; - } + if (pConn) pConn->destIp = pIpSet->ip[pIpSet->inUse]; + } else { + tTrace("%s %p, connection is retrieved from cache", pRpc->label, pConn); + } return pConn; } @@ -670,16 +685,16 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_SUCCESS; } -static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { +static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { int32_t sid; SRpcConn *pConn = NULL; - char hashstr[40] = {0}; SRpcHead *pHead = (SRpcHead *)pRecv->msg; sid = htonl(pHead->destId); pHead->code = htonl(pHead->code); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); + pHead->port = htons(pHead->port); if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); @@ -698,8 +713,7 @@ static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { terrno = TSDB_CODE_INVALID_SESSION_ID; return NULL; } - if (sid == 0) sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->uid, pHead->sourceId, pRecv->connType); - pConn = rpcGetConnObj(pRpc, sid, pHead->user, hashstr); + pConn = rpcGetConnObj(pRpc, sid, pRecv); if (pConn == NULL) return NULL; rpcLockConn(pConn); @@ -714,7 +728,7 @@ static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { } if (pRecv->port) pConn->peerPort = pRecv->port; - if (pHead->port) pConn->peerPort = pHead->port; + if (pHead->port) pConn->peerPort = pHead->port; if (pHead->uid) pConn->peerUid = pHead->uid; terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen); @@ -755,12 +769,11 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle; SRpcConn *pConn = (SRpcConn *)pRecv->thandle; - int32_t code = 0; tDump(pRecv->msg, pRecv->msgLen); // underlying UDP layer does not know it is server or client - pRecv->connType = pRecv->connType | pRpc->connType; + pRecv->connType = pRecv->connType | pRpc->connType; if (pRecv->ip==0 && pConn) { rpcProcessBrokenLink(pConn); @@ -768,30 +781,31 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { return NULL; } - pConn = rpcProcessHead(pRpc, pRecv); + terrno = 0; + pConn = rpcProcessMsgHead(pRpc, pRecv); if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { - tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d", - pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, code, - pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId); + tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d port:%hu", + pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, + pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port); } if (pConn && pRpc->idleTime) { taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); } - if (code != TSDB_CODE_ALREADY_PROCESSED) { - if (code != 0) { // parsing error + if (terrno != TSDB_CODE_ALREADY_PROCESSED) { + if (terrno != 0) { // parsing error if ( rpcIsReq(pHead->msgType) ) { - rpcSendErrorMsgToPeer(pRecv, code); - tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); + rpcSendErrorMsgToPeer(pRecv, terrno); + tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], terrno); } } else { // parsing OK rpcProcessIncomingMsg(pConn, pHead); } } - if ( code != 0 ) free (pRecv->msg); + if ( terrno ) free (pRecv->msg); return pConn; } @@ -811,7 +825,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { int32_t code = pHead->code; SRpcReqContext *pContext = pConn->pContext; pConn->pContext = NULL; - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->user); + // for UDP, port may be changed by server, the port in ipSet shall be used for cache + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pContext->ipSet.port, pConn->connType); if (code == TSDB_CODE_REDIRECT) { pContext->redirect = 1; @@ -820,7 +835,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps); rpcSendReqToServer(pRpc, pContext); } else { - if ( pRpc->ufp && (pContext->ipSet.index != pContext->oldIndex || pContext->redirect) ) + if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) ) (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet (*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code); rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg @@ -968,8 +983,8 @@ static void rpcProcessConnError(void *param, void *id) { (*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code); } else { // move to next IP - pContext->ipSet.index++; - pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps; + pContext->ipSet.inUse++; + pContext->ipSet.inUse = pContext->ipSet.inUse % pContext->ipSet.numOfIps; rpcSendReqToServer(pRpc, pContext); } } diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index 66655b34a4..181f8a8475 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -51,7 +51,7 @@ void processResponse(char type, void *pCont, int contLen, void *ahandle, int32_t void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { SInfo *pInfo = (SInfo *)handle; - tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->index); + tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse); pInfo->ipSet = *pIpSet; } @@ -92,7 +92,7 @@ int main(int argc, char *argv[]) { // server info ipSet.numOfIps = 1; - ipSet.index = 0; + ipSet.inUse = 0; ipSet.port = 7000; ipSet.ip[0] = inet_addr(serverIp); ipSet.ip[1] = inet_addr("192.168.0.1"); @@ -189,7 +189,7 @@ int main(int argc, char *argv[]) { float usedTime = (endTime - startTime)/1000.0; // mseconds tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads); - tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000*numOfReqs*appThreads/usedTime, msgSize); + tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); taosCloseLog(); diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index 60cc3dfc4c..1b5e1b6ee7 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -76,7 +76,7 @@ int main(int argc, char *argv[]) { rpcInit.localPort = atoi(argv[++i]); } else if (strcmp(argv[i], "-i")==0 && i < argc-1) { strcpy(rpcInit.localIp, argv[++i]); - } else if (strcmp(argv[i], "-n")==0 && i < argc-1) { + } else if (strcmp(argv[i], "-t")==0 && i < argc-1) { rpcInit.numOfThreads = atoi(argv[++i]); } else if (strcmp(argv[i], "-m")==0 && i < argc-1) { msgSize = atoi(argv[++i]); @@ -92,7 +92,7 @@ int main(int argc, char *argv[]) { printf("\nusage: %s [options] \n", argv[0]); printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp); printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort); - printf(" [-t threads]: number of threads, default is:%d\n", rpcInit.numOfThreads); + printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads); printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions); printf(" [-m msgSize]: message body size, default is:%d\n", msgSize); printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize); @@ -103,6 +103,7 @@ int main(int argc, char *argv[]) { } } + tsAsyncLog = 0; rpcInit.connType = TAOS_CONN_SERVER; taosInitLog("server.log", 100000, 10); -- GitLab