diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 48932154478df3f5ffb2db4398785a8a1a51108d..afd5d3e7ef21c25f3717f983d1f0ec5165fa310d 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -65,7 +65,7 @@ typedef struct { void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); // call back to process notify the ipSet changes - void (*ufp)(void *ahandle, SRpcIpSet ipSet); + void (*ufp)(void *ahandle, SRpcIpSet *pIpSet); // call back to retrieve the client auth info int (*afp)(char *meterId, char *spi, char *encrypt, char *secret, char *ckey); @@ -75,9 +75,9 @@ void *rpcOpen(SRpcInit *pRpc); void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); -void rpcSendRequest(void *thandle, SRpcIpSet ipSet, char msgType, void *pCont, int contLen, void *ahandle); +void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, char msgType, void *pCont, int contLen, void *ahandle); void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen); -void rpcSendRedirectRsp(void *pConn, SRpcIpSet ipSet); +void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet); #ifdef __cplusplus diff --git a/src/rpc/src/rpcCache.c b/src/rpc/src/rpcCache.c index 2f8979a15d0ce4cc217845bd154c3b531353788f..6f5a8e9d539a0822a1adf5546a77f3dcd469e9f6 100644 --- a/src/rpc/src/rpcCache.c +++ b/src/rpc/src/rpcCache.c @@ -73,7 +73,7 @@ void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint6 pNext = pNode->next; pCache->total--; pCache->count[hash]--; - tTrace("%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d", pNode->data, pNode->ip, pNode->port, hash, pNode, + tTrace("%p ip:0x%x:%hu:%d:%p removed from cache, connections:%d", pNode->data, pNode->ip, pNode->port, hash, pNode, pCache->count[hash]); taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); pNode = pNext; @@ -116,7 +116,7 @@ void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, c pthread_mutex_unlock(&pCache->mutex); - tTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pCache->count[hash]); + tTrace("%p ip:0x%x:%hu:%d:%p added into cache, connections:%d", data, ip, port, hash, pNode, pCache->count[hash]); return; } @@ -192,7 +192,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) pthread_mutex_unlock(&pCache->mutex); if (pData) { - tTrace("%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d", pData, ip, port, hash, pNode, pCache->count[hash]); + tTrace("%p ip:0x%x:%hu:%d:%p retrieved from cache, connections:%d", pData, ip, port, hash, pNode, pCache->count[hash]); } return pData; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 6a1ec786ff65e5440889d822fb07bb3e436cb3ef..fcbf9e406831e4a17bd24f9f32cea95c6c85bd59 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -57,7 +57,7 @@ typedef struct { void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); int (*afp)(char *meterId, char *spi, char *encrypt, char *secret, char *ckey); - void (*ufp)(void *ahandle, SRpcIpSet ipSet); + void (*ufp)(void *ahandle, SRpcIpSet *pIpSet); void *idPool; // handle to ID pool void *tmrCtrl; // handle to timer @@ -193,11 +193,7 @@ void *rpcOpen(SRpcInit *pInit) { if(pInit->label) strcpy(pRpc->label, pInit->label); pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; - pRpc->numOfThreads = pInit->numOfThreads; - if (pRpc->numOfThreads > TSDB_MAX_RPC_THREADS) { - pRpc->numOfThreads = TSDB_MAX_RPC_THREADS; - } - + pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads; if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp); pRpc->localPort = pInit->localPort; pRpc->afp = pInit->afp; @@ -300,7 +296,7 @@ void rpcFreeCont(void *cont) { free(msg); } -void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int contLen, void *ahandle) { +void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, int contLen, void *ahandle) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; @@ -308,11 +304,11 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); pContext->ahandle = ahandle; pContext->pRpc = (SRpcInfo *)shandle; - pContext->ipSet = ipSet; + pContext->ipSet = *pIpSet; pContext->contLen = contLen; pContext->pCont = pCont; pContext->msgType = type; - pContext->oldIndex = ipSet.index; + pContext->oldIndex = pIpSet->index; rpcSendReqToServer(pRpc, pContext); @@ -338,7 +334,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { pthread_mutex_lock(&pRpc->mutex); if ( pConn->inType == 0 || pConn->meterId[0] == 0 ) { - tTrace("%s pConn:%p, connection is already released, rsp wont be sent", pRpc->label, pConn); + tTrace("%s %p, connection is already released, rsp wont be sent", pRpc->label, pConn); pthread_mutex_lock(&pRpc->mutex); return; } @@ -373,14 +369,14 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { return; } -void rpcSendRedirectRsp(void *thandle, SRpcIpSet ipSet) { +void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) { char *pMsg; int msgLen = sizeof(SRpcIpSet); pMsg = rpcMallocCont(msgLen); if (pMsg == NULL) return; - memcpy(pMsg, &ipSet, sizeof(ipSet)); + memcpy(pMsg, pIpSet, sizeof(SRpcIpSet)); rpcSendResponse(thandle, TSDB_CODE_REDIRECT, pMsg, msgLen); @@ -401,10 +397,10 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) if (taosOpenConn[pRpc->connType]) { pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort); if (pConn->chandle) { - tTrace("%s pConn:%p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label, + tTrace("%s %p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label, pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort); } else { - tError("%s pConn:%p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn, + tError("%s %p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort); terrno = TSDB_CODE_NETWORK_UNAVAIL; rpcCloseConn(pConn); @@ -446,7 +442,7 @@ static void rpcCloseConn(void *thandle) { taosFreeId(pRpc->idPool, pConn->sid); pConn->pContext = NULL; - tTrace("%s pConn:%p, rpc connection is closed", pRpc->label, pConn); + tTrace("%s %p, rpc connection is closed", pRpc->label, pConn); } pthread_mutex_unlock(&pRpc->mutex); @@ -460,8 +456,6 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions); terrno = TSDB_CODE_MAX_SESSIONS; } else { - tTrace("%s sid:%d, ID allocated, used:%d", pRpc->label, sid, taosIdPoolNumOfUsed(pRpc->idPool)); - pConn = pRpc->connList + sid; memset(pConn, 0, sizeof(SRpcConn)); @@ -498,7 +492,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash pConn->tranId = (uint16_t)(rand() & 0xFFFF); pConn->ownId = htonl(pConn->sid); if (pRpc->afp && (*pRpc->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) { - tWarn("%s pConn:%p, meterId not there", pRpc->label, pConn); + tWarn("%s %p, meterId not there", pRpc->label, pConn); taosFreeId(pRpc->idPool, sid); // sid shall be released terrno = TSDB_CODE_INVALID_USER; pConn = NULL; @@ -507,7 +501,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash if (pConn) { taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); - tTrace("%s pConn:%p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->meterId); + tTrace("%s %p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->meterId); } return pConn; @@ -524,7 +518,7 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *has if (pConn) { if (memcmp(pConn->meterId, meterId, tListLen(pConn->meterId)) != 0) { - tTrace("%s pConn:%p, meterId:%s is not matched, received:%s", pRpc->label, pConn, pConn->meterId, meterId); + tTrace("%s %p, meterId:%s is not matched, received:%s", pRpc->label, pConn, pConn->meterId, meterId); terrno = TSDB_CODE_MISMATCHED_METER_ID; pConn = NULL; } @@ -553,7 +547,7 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { pConn->peerId = pHead->sourceId; } else { if (pConn->peerId != pHead->sourceId) { - tTrace("%s pConn:%p, source Id is changed, old:0x%08x new:0x%08x", pRpc->label, pConn, + tTrace("%s %p, source Id is changed, old:0x%08x new:0x%08x", pRpc->label, pConn, pConn->peerId, pHead->sourceId); return TSDB_CODE_INVALID_VALUE; } @@ -561,14 +555,14 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { if (pConn->inTranId == pHead->tranId) { if (pConn->inType == pHead->msgType) { - tTrace("%s pConn:%p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); + tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); } else if (pConn->inType == 0) { - tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn, + tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pConn->inTranId); rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response } else { - tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHead->msgType]); + tTrace("%s %p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHead->msgType]); } // do not reply any message @@ -576,7 +570,7 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { } if (pConn->inType != 0) { - tTrace("%s pConn:%p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn, + tTrace("%s %p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn, pConn->inTranId, pHead->tranId); return TSDB_CODE_LAST_SESSION_NOT_FINISHED; } @@ -613,7 +607,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS || pHead->tcp) { if (pConn->tretry <= tsRpcMaxRetry) { pConn->tretry++; - tTrace("%s pConn:%p, peer is still processing the transaction", pRpc->label, pConn); + tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn); taosTmrReset(rpcProcessRetryTimer, tsRpcProgressTime, pConn, pRpc->tmrCtrl, &pConn->pTimer); return TSDB_CODE_ALREADY_PROCESSED; } else { @@ -669,7 +663,7 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int if (pHead->uid) pConn->peerUid = pHead->uid; if (pHead->tcp) { - tTrace("%s pConn:%p, content will be transfered via TCP", pRpc->label, pConn); + tTrace("%s %p, content will be transfered via TCP", pRpc->label, pConn); if (pConn->outType) taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); return TSDB_CODE_ALREADY_PROCESSED; } @@ -690,14 +684,32 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int return code; } +static void rpcProcessBrokenLink(SRpcConn *pConn) { + SRpcInfo *pRpc = pConn->pRpc; + + if (pConn->outType) { + SRpcReqContext *pContext = pConn->pContext; + pContext->code = TSDB_CODE_NETWORK_UNAVAIL; + taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); + } + + rpcCloseConn(pConn); +} + static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle) { SRpcHead *pHead = (SRpcHead *)msg; SRpcInfo *pRpc = (SRpcInfo *)shandle; - SRpcConn *pConn = NULL; + SRpcConn *pConn = (SRpcConn *)thandle; int32_t code = 0; tDump(msg, msgLen); + if (ip==0 && pConn) { + rpcProcessBrokenLink(pConn); + tfree(msg); + return NULL; + } + pthread_mutex_lock(&pRpc->mutex); code = rpcProcessHead(pRpc, &pConn, msg, msgLen, ip); @@ -720,7 +732,7 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t pthread_mutex_unlock(&pRpc->mutex); if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { - tTrace("%s pConn:%p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d", + tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d", pRpc->label, pConn, taosMsg[pHead->msgType], ip, port, code, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } @@ -733,7 +745,7 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t if (code != 0) { // parsing error if ( rpcIsReq(pHead->msgType) ) { rpcSendErrorMsgToPeer(pRpc, msg, code, ip, port, chandle); - tTrace("%s pConn:%p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); + tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); } } else { // parsing OK rpcProcessIncomingMsg(pConn, pHead); @@ -764,12 +776,13 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { pContext->redirect = 1; pContext->numOfTry = 0; memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); + tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps); rpcSendReqToServer(pRpc, pContext); } else { - rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg if ( pRpc->ufp && (pContext->ipSet.index != pContext->oldIndex || pContext->redirect) ) - (*pRpc->ufp)(pContext->ahandle, pContext->ipSet); // notify the update of ipSet + (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet (*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code); + rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg } } } @@ -885,12 +898,12 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { if ( rpcIsReq(pHead->msgType)) { if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d", + tTrace("%s %p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } else { if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", + tTrace( "%s %p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, (uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } @@ -898,7 +911,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHead, msgLen, pConn->chandle); if (writtenLen != msgLen) { - tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, + tError("%s %p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, msgLen, writtenLen, strerror(errno)); } @@ -928,23 +941,23 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { pthread_mutex_lock(&pRpc->mutex); if (pConn->outType && pConn->meterId[0]) { - tTrace("%s pConn:%p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]); + tTrace("%s %p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]); pConn->pTimer = NULL; pConn->retry++; if (pConn->retry < 4) { - tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label, pConn, + tTrace("%s %p, re-send msg:%s to %s:%hu", pRpc->label, pConn, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer<retry, pConn, pRpc->tmrCtrl, &pConn->pTimer); } else { // close the connection - tTrace("%s pConn:%p, failed to send msg:%s to %s:%hu", pRpc->label, pConn, + tTrace("%s %p, failed to send msg:%s to %s:%hu", pRpc->label, pConn, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); reportDisc = 1; } } else { - tTrace("%s pConn:%p, retry timer not processed", pRpc->label, pConn); + tTrace("%s %p, retry timer not processed", pRpc->label, pConn); } pthread_mutex_unlock(&pRpc->mutex); @@ -961,10 +974,10 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { SRpcInfo *pRpc = pConn->pRpc; if (pConn->meterId[0]) { - tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn); + tTrace("%s %p, close the connection since no activity", pRpc->label, pConn); rpcCloseConn(pConn); } else { - tTrace("%s pConn:%p, idle timer:%p not processed", pRpc->label, pConn, tmrId); + tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId); } } @@ -975,11 +988,11 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { pthread_mutex_lock(&pRpc->mutex); if (pConn->inType && pConn->meterId[0]) { - tTrace("%s pConn:%p, progress timer expired, send progress", pRpc->label, pConn); + tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer<retry, pConn, pRpc->tmrCtrl, &pConn->pTimer); } else { - tTrace("%s pConn:%p, progress timer:%p not processed", pRpc->label, pConn, tmrId); + tTrace("%s %p, progress timer:%p not processed", pRpc->label, pConn, tmrId); } pthread_mutex_unlock(&pRpc->mutex); @@ -1124,12 +1137,12 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { delta = (int32_t)htonl(pDigest->timeStamp); delta -= (int32_t)taosGetTimestampSec(); if (abs(delta) > 900) { - tWarn("%s pConn:%p, time diff:%d is too big, msg discarded, timestamp:%d", pRpc->label, pConn, + tWarn("%s %p, time diff:%d is too big, msg discarded, timestamp:%d", pRpc->label, pConn, delta, htonl(pDigest->timeStamp)); code = TSDB_CODE_INVALID_TIME_STAMP; } else { if (rpcAuthenticateMsg((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, (uint8_t *)pConn->secret) < 0) { - tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn); + tError("%s %p, authentication failed, msg discarded", pRpc->label, pConn); code = TSDB_CODE_AUTH_FAILURE; } else { pHead->msgLen -= sizeof(SRpcDigest); @@ -1138,7 +1151,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { } else { // if it is request or response with code 0, msg shall be discarded if (rpcIsReq(pHead->msgType) || (pHead->content[0] == 0)) { - tTrace("%s pConn:%p, auth spi not matched, msg discarded", pRpc->label, pConn); + tTrace("%s %p, auth spi not matched, msg discarded", pRpc->label, pConn); code = TSDB_CODE_AUTH_FAILURE; } } diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index 166cf10a91ba9bd5fef39e98a9bbe855939217a5..8092e06d011d54e31c14e83999152c9cce722367 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -17,10 +17,15 @@ #include "os.h" #include "tlog.h" #include "trpc.h" +#include "taoserror.h" #include void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) { - dPrint("response is received, type:%d, contLen:%d code:%x, ahandle:%p", type, contLen, code, ahandle); + dPrint("response is received, type:%d, contLen:%d code:%x:%s", type, contLen, code, tstrerror(code)); +} + +void processUpdate(void *handle, SRpcIpSet *pIpSet) { + dPrint("ip set is changed, index:%d", pIpSet->index); } int32_t main(int32_t argc, char *argv[]) { @@ -35,6 +40,7 @@ int32_t main(int32_t argc, char *argv[]) { rpcInit.label = "APP"; rpcInit.numOfThreads = 1; rpcInit.cfp = processMsg; + rpcInit.ufp = processUpdate; rpcInit.sessions = 1000; rpcInit.connType = TAOS_CONN_UDPC; rpcInit.idleTime = 2000; @@ -52,11 +58,11 @@ int32_t main(int32_t argc, char *argv[]) { ipSet.numOfIps = 2; ipSet.index = 0; ipSet.port = 7000; - ipSet.ip[0] = inet_addr("127.0.0.1"); - ipSet.ip[1] = inet_addr("192.168.0.1"); + ipSet.ip[0] = inet_addr("192.168.0.1"); + ipSet.ip[1] = inet_addr("127.0.0.1"); void *cont = rpcMallocCont(100); - rpcSendRequest(pRpc, ipSet, 1, cont, 100, 1); + rpcSendRequest(pRpc, &ipSet, 1, cont, 100, 1); getchar(); diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index 85994a04d0dd471490ba7797e2a1b3ec3e49eacf..d62550592d59a692f15afe1ec26306d2fde793af 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -22,7 +22,17 @@ void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) { dPrint("request is received, type:%d, contLen:%d", type, contLen); void *rsp = rpcMallocCont(128); - rpcSendResponse(ahandle, 1, rsp, 128); + + //rpcSendResponse(ahandle, 1, rsp, 128); + + SRpcIpSet ipSet; + ipSet.numOfIps = 1; + ipSet.index = 0; + ipSet.port = 7000; + ipSet.ip[0] = inet_addr("192.168.0.2"); + + rpcSendRedirectRsp(ahandle, &ipSet); + rpcFreeCont(pCont); }