diff --git a/src/inc/trpc.h b/src/inc/trpc.h index afd5d3e7ef21c25f3717f983d1f0ec5165fa310d..0e4e43f76aad0e802841b10cc5a3fd745bf74de3 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -45,6 +45,12 @@ typedef struct { uint32_t ip[TSDB_MAX_MPEERS]; } SRpcIpSet; +typedef struct { + uint32_t sourceIp; + uint16_t sourcePort; + char *user; +} SRpcConnInfo; + typedef struct { char *localIp; // local IP used uint16_t localPort; // local port @@ -55,7 +61,7 @@ typedef struct { int idleTime; // milliseconds, 0 means idle timer is disabled // the following is for client security only - char *meterId; // meter ID + char *user; // user name char spi; // security parameter index char encrypt; // encrypt algorithm char *secret; // key for authentication @@ -78,7 +84,7 @@ void rpcFreeCont(void *pCont); void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, char msgType, void *pCont, int contLen, void *ahandle); void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen); void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet); - +void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); #ifdef __cplusplus } diff --git a/src/rpc/inc/rpcHead.h b/src/rpc/inc/rpcHead.h index bfdcfb0beeae011215416ec08075d832dcd4880a..3f7efa9f46f96d97bc95281a3d0176fa76dec7e1 100644 --- a/src/rpc/inc/rpcHead.h +++ b/src/rpc/inc/rpcHead.h @@ -32,7 +32,7 @@ typedef struct { uint32_t uid; // for unique ID inside a client uint32_t sourceId; // source ID, an index for connection list uint32_t destId; // destination ID, an index for connection list - char meterId[TSDB_UNI_LEN]; + char user[TSDB_UNI_LEN]; uint16_t port; // for UDP only, port may be changed char empty[1]; // reserved uint8_t msgType; // message type diff --git a/src/rpc/src/rpcClient.c b/src/rpc/src/rpcClient.c index f600004266a1d5edc10695d7b6aec6f9d54908fd..8f6f1f9c9b567216ebc6700c1048b4f4c2288073 100644 --- a/src/rpc/src/rpcClient.c +++ b/src/rpc/src/rpcClient.c @@ -72,7 +72,8 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { pTcp->numOfFds--; - if (pTcp->numOfFds < 0) tError("%s number of TCP FDs shall never be negative", pTcp->label); + if (pTcp->numOfFds < 0) + tError("%s number of TCP FDs shall never be negative, FD:%p", pTcp->label, pFdObj); // remove from the FdObject list @@ -91,7 +92,7 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { // notify the upper layer to clean the associated context if (pFdObj->thandle) (*(pTcp->processData))(NULL, 0, 0, 0, pTcp->shandle, pFdObj->thandle, NULL); - tTrace("%s TCP FD is cleaned up, numOfFds:%d", pTcp->label, pTcp->numOfFds); + tTrace("%s TCP is cleaned up, FD:%p numOfFds:%d", pTcp->label, pFdObj, pTcp->numOfFds); memset(pFdObj, 0, sizeof(STcpFd)); @@ -302,7 +303,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16 pthread_mutex_unlock(&(pTcp->mutex)); - tTrace("%s TCP connection to ip:%s port:%hu is created, numOfFds:%d", pTcp->label, ip, port, pTcp->numOfFds); + tTrace("%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d", pTcp->label, ip, port, pFdObj, pTcp->numOfFds); return pFdObj; } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index fcbf9e406831e4a17bd24f9f32cea95c6c85bd59..32e41ccfdd86e9200298c67651e5c73df7a156ff 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -49,14 +49,14 @@ typedef struct { int connType; char label[12]; - char meterId[TSDB_UNI_LEN]; // meter ID + char user[TSDB_UNI_LEN]; // meter ID char spi; // security parameter index char encrypt; // encrypt algorithm char secret[TSDB_KEY_LEN]; // secret for the link char ckey[TSDB_KEY_LEN]; // ciphering key void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); - int (*afp)(char *meterId, char *spi, char *encrypt, char *secret, char *ckey); + int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); void (*ufp)(void *ahandle, SRpcIpSet *pIpSet); void *idPool; // handle to ID pool @@ -86,7 +86,7 @@ typedef struct _RpcConn { int sid; // session ID uint32_t ownId; // own link ID uint32_t peerId; // peer link ID - char meterId[TSDB_UNI_LEN]; // user ID for the link + char user[TSDB_UNI_LEN]; // user ID for the link char spi; // security parameter index char encrypt; // encryption, 0:1 char secret[TSDB_KEY_LEN]; // secret for the link @@ -95,7 +95,7 @@ typedef struct _RpcConn { uint32_t peerUid; // peer UID uint32_t peerIp; // peer IP uint16_t peerPort; // peer port - char peerIpstr[20]; // peer IP string + char peerIpstr[TSDB_IPv4ADDR_LEN]; // peer IP string uint16_t tranId; // outgoing transcation ID, for build message uint16_t outTranId; // outgoing transcation ID uint16_t inTranId; // transcation ID for incoming msg @@ -160,8 +160,8 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) static void rpcCloseConn(void *thandle); static SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet); static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); -static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr); -static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr); +static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr); +static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr); static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); @@ -198,7 +198,7 @@ void *rpcOpen(SRpcInit *pInit) { pRpc->localPort = pInit->localPort; pRpc->afp = pInit->afp; pRpc->sessions = pInit->sessions; - if (pInit->meterId) strcpy(pRpc->meterId, pInit->meterId); + if (pInit->user) strcpy(pRpc->user, pInit->user); if (pInit->secret) strcpy(pRpc->secret, pInit->secret); if (pInit->ckey) strcpy(pRpc->ckey, pInit->ckey); pRpc->spi = pInit->spi; @@ -263,7 +263,7 @@ void rpcClose(void *param) { (*taosCleanUpConn[pRpc->connType])(pRpc->shandle); for (int i = 0; i < pRpc->sessions; ++i) { - if (pRpc->connList[i].meterId[0]) { + if (pRpc->connList[i].user[0]) { rpcCloseConn((void *)(pRpc->connList + i)); } } @@ -292,8 +292,10 @@ void *rpcMallocCont(int size) { } void rpcFreeCont(void *cont) { - char *msg = ((char *)cont) - sizeof(SRpcHead); - free(msg); + if ( cont ) { + char *msg = ((char *)cont) - sizeof(SRpcHead); + free(msg); + } } void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, int contLen, void *ahandle) { @@ -333,7 +335,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { pthread_mutex_lock(&pRpc->mutex); - if ( pConn->inType == 0 || pConn->meterId[0] == 0 ) { + if ( pConn->inType == 0 || pConn->user[0] == 0 ) { tTrace("%s %p, connection is already released, rsp wont be sent", pRpc->label, pConn); pthread_mutex_lock(&pRpc->mutex); return; @@ -350,7 +352,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { pHead->destId = pConn->peerId; pHead->uid = 0; pHead->code = htonl(code); - memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId)); + memcpy(pHead->user, pConn->user, tListLen(pHead->user)); // set pConn parameters pConn->inType = 0; @@ -383,6 +385,15 @@ void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) { return; } +void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { + SRpcConn *pConn = (SRpcConn *)thandle; + SRpcInfo *pRpc = pConn->pRpc; + + pInfo->sourceIp = pConn->peerIp; + pInfo->sourcePort = pConn->peerPort; + strcpy(pInfo->user, pConn->user); +} + static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) { SRpcConn *pConn; @@ -392,16 +403,16 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) strcpy(pConn->peerIpstr, peerIpStr); pConn->peerIp = inet_addr(peerIpStr); pConn->peerPort = peerPort; - strcpy(pConn->meterId, pRpc->meterId); + strcpy(pConn->user, pRpc->user); if (taosOpenConn[pRpc->connType]) { pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort); if (pConn->chandle) { tTrace("%s %p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label, - pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort); + pConn, pConn->sid, pRpc->user, pConn->peerIpstr, pConn->peerPort, pConn->localPort); } else { - tError("%s %p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn, - pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort); + tError("%s %p, failed to set up connection to ip:%s:%hu", pRpc->label, pConn, + pConn->peerIpstr, pConn->peerPort); terrno = TSDB_CODE_NETWORK_UNAVAIL; rpcCloseConn(pConn); pConn = NULL; @@ -418,14 +429,14 @@ static void rpcCloseConn(void *thandle) { pthread_mutex_lock(&pRpc->mutex); - if (pConn->meterId[0]) { - pConn->meterId[0] = 0; + if (pConn->user[0]) { + pConn->user[0] = 0; if (taosCloseConn[pRpc->connType]) (*taosCloseConn[pRpc->connType])(pConn->chandle); taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pIdleTimer); - if ( pRpc->connType == TAOS_CONN_UDPS || TAOS_CONN_TCPS) { + if ( pRpc->connType == TAOS_CONN_UDPS || pRpc->connType == TAOS_CONN_TCPS) { char hashstr[40] = {0}; sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId); taosDeleteStrHash(pRpc->hash, hashstr); @@ -471,7 +482,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { return pConn; } -static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr) { +static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr) { SRpcConn *pConn = NULL; // check if it is already allocated @@ -486,13 +497,13 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash } else { pConn = pRpc->connList + sid; memset(pConn, 0, sizeof(SRpcConn)); - memcpy(pConn->meterId, meterId, tListLen(pConn->meterId)); + memcpy(pConn->user, user, tListLen(pConn->user)); pConn->pRpc = pRpc; pConn->sid = sid; pConn->tranId = (uint16_t)(rand() & 0xFFFF); pConn->ownId = htonl(pConn->sid); - if (pRpc->afp && (*pRpc->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) { - tWarn("%s %p, meterId not there", pRpc->label, pConn); + if (pRpc->afp && (*pRpc->afp)(user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) { + tWarn("%s %p, user not there", pRpc->label, pConn); taosFreeId(pRpc->idPool, sid); // sid shall be released terrno = TSDB_CODE_INVALID_USER; pConn = NULL; @@ -501,24 +512,24 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash if (pConn) { taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); - tTrace("%s %p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->meterId); + tTrace("%s %p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->user); } return pConn; } -static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr) { +static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr) { SRpcConn *pConn = NULL; if (sid) { pConn = pRpc->connList + sid; } else { - pConn = rpcAllocateServerConn(pRpc, meterId, hashstr); + pConn = rpcAllocateServerConn(pRpc, user, hashstr); } if (pConn) { - if (memcmp(pConn->meterId, meterId, tListLen(pConn->meterId)) != 0) { - tTrace("%s %p, meterId:%s is not matched, received:%s", pRpc->label, pConn, pConn->meterId, meterId); + if (memcmp(pConn->user, user, tListLen(pConn->user)) != 0) { + tTrace("%s %p, user:%s is not matched, received:%s", pRpc->label, pConn, pConn->user, user); terrno = TSDB_CODE_MISMATCHED_METER_ID; pConn = NULL; } @@ -530,7 +541,7 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *has SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) { SRpcConn *pConn; - pConn = rpcGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId); + pConn = rpcGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->user); if ( pConn == NULL ) { char ipstr[20] = {0}; tinet_ntoa(ipstr, ipSet.ip[ipSet.index]); @@ -654,7 +665,7 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int } if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHead->uid, pHead->sourceId); - pConn = rpcGetConnObj(pRpc, sid, pHead->meterId, hashstr); + pConn = rpcGetConnObj(pRpc, sid, pHead->user, hashstr); if (pConn == NULL ) return terrno; *ppConn = pConn; @@ -687,6 +698,9 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int static void rpcProcessBrokenLink(SRpcConn *pConn) { SRpcInfo *pRpc = pConn->pRpc; + tTrace("%s %p, link is broken", pRpc->label, pConn); + pConn->chandle = NULL; + if (pConn->outType) { SRpcReqContext *pContext = pConn->pContext; pContext->code = TSDB_CODE_NETWORK_UNAVAIL; @@ -770,7 +784,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { int32_t code = pHead->code; SRpcReqContext *pContext = pConn->pContext; pConn->pContext = NULL; - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId); + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->user); if (code == TSDB_CODE_REDIRECT) { pContext->redirect = 1; @@ -803,7 +817,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { pHead->sourceId = pConn->ownId; pHead->destId = pConn->peerId; pHead->uid = 0; - memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId)); + memcpy(pHead->user, pConn->user, tListLen(pHead->user)); pHead->code = htonl(code); rpcSendMsgToPeer(pConn, msg, 0); @@ -827,7 +841,7 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint pReplyHead->tranId = pRecvHead->tranId; pReplyHead->sourceId = 0; pReplyHead->destId = pRecvHead->sourceId; - memcpy(pReplyHead->meterId, pRecvHead->meterId, tListLen(pReplyHead->meterId)); + memcpy(pReplyHead->user, pRecvHead->user, tListLen(pReplyHead->user)); pReplyHead->code = htonl(code); msgLen = sizeof(SRpcHead); @@ -874,7 +888,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pHead->destId = pConn->peerId; pHead->port = 0; pHead->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid()); - memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId)); + memcpy(pHead->user, pConn->user, tListLen(pHead->user)); // set the connection parameters pConn->outType = msgType; @@ -886,7 +900,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pthread_mutex_unlock(&pRpc->mutex); rpcSendMsgToPeer(pConn, msg, msgLen); - taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); + //taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); } static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { @@ -921,6 +935,8 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { static void rpcProcessConnError(void *param, void *id) { SRpcReqContext *pContext = (SRpcReqContext *)param; SRpcInfo *pRpc = pContext->pRpc; + + tTrace("%s connection error happens", pRpc->label); if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) { rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg @@ -940,7 +956,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { pthread_mutex_lock(&pRpc->mutex); - if (pConn->outType && pConn->meterId[0]) { + if (pConn->outType && pConn->user[0]) { tTrace("%s %p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]); pConn->pTimer = NULL; pConn->retry++; @@ -962,8 +978,8 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { pthread_mutex_unlock(&pRpc->mutex); - pConn->pContext->code = TSDB_CODE_NETWORK_UNAVAIL; - if (reportDisc) { + if (reportDisc && pConn->pContext) { + pConn->pContext->code = TSDB_CODE_NETWORK_UNAVAIL; rpcProcessConnError(pConn->pContext, NULL); rpcCloseConn(pConn); } @@ -973,7 +989,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; SRpcInfo *pRpc = pConn->pRpc; - if (pConn->meterId[0]) { + if (pConn->user[0]) { tTrace("%s %p, close the connection since no activity", pRpc->label, pConn); rpcCloseConn(pConn); } else { @@ -987,7 +1003,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { pthread_mutex_lock(&pRpc->mutex); - if (pConn->inType && pConn->meterId[0]) { + if (pConn->inType && pConn->user[0]) { tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer<retry, pConn, pRpc->tmrCtrl, &pConn->pTimer); @@ -1015,7 +1031,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { char *buf = malloc (contLen + overhead + 8); // 8 extra bytes if (buf == NULL) { - tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno)); + tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen); return contLen; } @@ -1033,7 +1049,6 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { pHead->comp = 1; tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); - finalLen = compLen + overhead; } else { finalLen = contLen; @@ -1055,20 +1070,20 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { int contLen = htonl(pComp->contLen); // prepare the temporary buffer to decompress message - char *buf = rpcMallocCont(contLen); + pNewHead = (SRpcHead *)malloc(contLen + RPC_MSG_OVERHEAD); - if (buf) { - pNewHead = rpcHeadFromCont(buf); + if (pNewHead) { int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; - int32_t originalLen = LZ4_decompress_safe((const char*)(pCont + overhead), buf, compLen, contLen); - assert(originalLen == contLen); + int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char *)pNewHead->content, compLen, contLen); + assert(origLen == contLen); memcpy(pNewHead, pHead, sizeof(SRpcHead)); - pNewHead->msgLen = rpcMsgLenFromCont(originalLen); + pNewHead->msgLen = rpcMsgLenFromCont(origLen); free(pHead); // free the compressed message buffer pHead = pNewHead; + tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen); } else { - tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno)); + tError("failed to allocate memory to decompress msg, contLen:%d", contLen); } } diff --git a/src/rpc/src/rpcServer.c b/src/rpc/src/rpcServer.c index 49992e5931a211f4ae075dcc2645ab9fc8f20883..c11a803f1be4b9f593d0d41b1199aa1eb2f1c4c1 100644 --- a/src/rpc/src/rpcServer.c +++ b/src/rpc/src/rpcServer.c @@ -101,8 +101,8 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) { // notify the upper layer, so it will clean the associated context if (pFdObj->thandle) (*(pThreadObj->processData))(NULL, 0, 0, 0, pThreadObj->shandle, pFdObj->thandle, NULL); - tTrace("%s TCP thread:%d, FD is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId, - pThreadObj->numOfFds); + tTrace("%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId, + pFdObj, pThreadObj->numOfFds); memset(pFdObj, 0, sizeof(SFdObj)); @@ -292,8 +292,8 @@ void taosAcceptTcpConnection(void *arg) { pthread_mutex_unlock(&(pThreadObj->threadMutex)); - tTrace("%s TCP thread:%d, a new connection, ip:%s port:%hu, numOfFds:%d", pServerObj->label, pThreadObj->threadId, - pFdObj->ipstr, pFdObj->port, pThreadObj->numOfFds); + tTrace("%s TCP thread:%d, a new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, + pThreadObj->threadId, pFdObj->ipstr, pFdObj->port, pFdObj, pThreadObj->numOfFds); // pick up next thread for next connection threadId++; diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index 8092e06d011d54e31c14e83999152c9cce722367..46d9e99c62e87858288e93c525f5fa1ebc4f7cbb 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -13,58 +13,189 @@ * along with this program. If not, see . */ -//#define _DEFAULT_SOURCE +#include +#include +#include +#include +#include +#include +#include #include "os.h" #include "tlog.h" #include "trpc.h" #include "taoserror.h" #include +#include -void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) { - dPrint("response is received, type:%d, contLen:%d code:%x:%s", type, contLen, code, tstrerror(code)); +typedef struct { + int index; + SRpcIpSet ipSet; + int num; + int numOfReqs; + int msgSize; + sem_t rspSem; + sem_t *pOverSem; + pthread_t thread; + void *pRpc; +} SInfo; + +void processResponse(char type, void *pCont, int contLen, void *ahandle, int32_t code) { + SInfo *pInfo = (SInfo *)ahandle; + tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, type, contLen, code); + + if (pCont) rpcFreeCont(pCont); + + sem_post(&pInfo->rspSem); } -void processUpdate(void *handle, SRpcIpSet *pIpSet) { - dPrint("ip set is changed, index:%d", pIpSet->index); +void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { + SInfo *pInfo = (SInfo *)handle; + + tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->index); + pInfo->ipSet = *pIpSet; } -int32_t main(int32_t argc, char *argv[]) { +int tcount = 0; - taosInitLog("client.log", 100000, 10); - dPrint("unit test for rpc module"); +void *sendRequest(void *param) { + SInfo *pInfo = (SInfo *)param; + char *cont; + + tTrace("thread:%d, start to send request", pInfo->index); + + while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { + pInfo->num++; + cont = rpcMallocCont(pInfo->msgSize); + tTrace("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); + rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, 1, cont, pInfo->msgSize, pInfo); + if ( pInfo->num % 20000 == 0 ) + tPrint("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); + sem_wait(&pInfo->rspSem); + } - SRpcInit rpcInit; + tTrace("thread:%d, it is over", pInfo->index); + tcount++; + + return NULL; +} + +int main(int argc, char *argv[]) { + SRpcInit rpcInit; + SRpcIpSet ipSet; + int msgSize = 128; + int numOfReqs = 0; + int appThreads = 1; + char socketType[20] = "udp"; + char serverIp[40] = "127.0.0.1"; + struct timeval systemTime; + int64_t startTime, endTime; + pthread_attr_t thattr; + + // server info + ipSet.numOfIps = 1; + ipSet.index = 0; + ipSet.port = 7000; + ipSet.ip[0] = inet_addr(serverIp); + ipSet.ip[1] = inet_addr("192.168.0.1"); + + // client info memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = "0.0.0.0"; rpcInit.localPort = 0; rpcInit.label = "APP"; rpcInit.numOfThreads = 1; - rpcInit.cfp = processMsg; - rpcInit.ufp = processUpdate; - rpcInit.sessions = 1000; - rpcInit.connType = TAOS_CONN_UDPC; + rpcInit.cfp = processResponse; + rpcInit.ufp = processUpdateIpSet; + rpcInit.sessions = 100; rpcInit.idleTime = 2000; - rpcInit.meterId = "jefftao"; - rpcInit.secret = "password"; + rpcInit.user = "michael"; + rpcInit.secret = "mypassword"; rpcInit.ckey = "key"; + for (int i=1; iindex = i; + pInfo->ipSet = ipSet; + pInfo->numOfReqs = numOfReqs; + pInfo->msgSize = msgSize; + sem_init(&pInfo->rspSem, 0, 0); + pInfo->pRpc = pRpc; + pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo); + pInfo++; + } + + do { + usleep(1); + } while ( tcount < appThreads); + + gettimeofday(&systemTime, NULL); + endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec; + float usedTime = (endTime - startTime)/1000.0; // mseconds - void *cont = rpcMallocCont(100); - rpcSendRequest(pRpc, &ipSet, 1, cont, 100, 1); + tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads); + tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000*numOfReqs*appThreads/usedTime, msgSize); - getchar(); + taosCloseLog(); return 0; } diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index d62550592d59a692f15afe1ec26306d2fde793af..f0eb2180750874767789785a8bb9e93549efcefb 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -19,12 +19,33 @@ #include "trpc.h" #include -void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) { - dPrint("request is received, type:%d, contLen:%d", type, contLen); - void *rsp = rpcMallocCont(128); +int msgSize = 128; +int commit = 0; +int dataFd = -1; + +void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32_t code) { + static int num = 0; + tTrace("request is received, type:%d, contLen:%d", type, contLen); + + if (dataFd >=0) + write(dataFd, pCont, contLen); + + if (commit >=2) { + ++num; + if ( fsync(dataFd) < 0 ) { + tPrint("failed to flush data to file, reason:%s", strerror(errno)); + } + + if (num % 10000 == 0) { + tPrint("%d request have been written into disk", num); + } + } + + void *rsp = rpcMallocCont(msgSize); - //rpcSendResponse(ahandle, 1, rsp, 128); + rpcSendResponse(thandle, 1, rsp, msgSize); +/* SRpcIpSet ipSet; ipSet.numOfIps = 1; ipSet.index = 0; @@ -32,46 +53,87 @@ void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code ipSet.ip[0] = inet_addr("192.168.0.2"); rpcSendRedirectRsp(ahandle, &ipSet); +*/ rpcFreeCont(pCont); } -int32_t main(int32_t argc, char *argv[]) { - taosInitLog("server.log", 100000, 10); - - dPrint("unit test for rpc module"); - +int main(int argc, char *argv[]) { SRpcInit rpcInit; + char socketType[20] = "udp"; + char dataName[20] = "server.data"; + memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = "0.0.0.0"; rpcInit.localPort = 7000; - rpcInit.label = "APP"; + rpcInit.label = "SER"; rpcInit.numOfThreads = 1; - rpcInit.cfp = processMsg; + rpcInit.cfp = processRequestMsg; rpcInit.sessions = 1000; - rpcInit.connType = TAOS_CONN_UDPS; rpcInit.idleTime = 2000; - rpcInit.meterId = "jefftao"; - rpcInit.secret = "password"; - rpcInit.ckey = "key"; + + for (int i=1; i= 0) { + close(dataFd); + remove(dataName); + } return 0; }