diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index fa46c3a0f35aaa8076815bf6f7de6d8f802f590b..405f796a5e2e281ee5a63bf3fed7c14426ac30b4 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -124,8 +124,7 @@ typedef struct SRpcConn { } SRpcConn; int tsRpcMaxUdpSize = 15000; // bytes -int tsRpcProgressTime = 10; // milliseocnds - +int tsProgressTimer = 100; // not configurable int tsRpcMaxRetry; int tsRpcHeadSize; @@ -204,7 +203,8 @@ static void rpcUnlockConn(SRpcConn *pConn); void *rpcOpen(const SRpcInit *pInit) { SRpcInfo *pRpc; - tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime; + tsProgressTimer = tsRpcTimer/2; + tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer; tsRpcHeadSize = RPC_MSG_OVERHEAD; tsRpcOverhead = sizeof(SRpcReqContext); @@ -420,8 +420,11 @@ void rpcSendResponse(const SRpcMsg *pRsp) { pConn->rspMsgLen = msgLen; if (pMsg->code == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; + SRpcInfo *pRpc = pConn->pRpc; taosTmrStopA(&pConn->pTimer); - // taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); + + // set the idle timer to monitor the activity + taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); rpcSendMsgToPeer(pConn, msg, msgLen); pConn->secured = 1; // connection shall be secured @@ -683,6 +686,7 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno)); } + pConn->tretry = 0; return pConn; } @@ -748,20 +752,28 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { taosTmrStopA(&pConn->pTimer); pConn->retry = 0; + if (pHead->code == TSDB_CODE_AUTH_REQUIRED && pRpc->spi) { + tTrace("%s, authentication shall be restarted", pConn->info); + pConn->secured = 0; + rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); + pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); + return TSDB_CODE_ALREADY_PROCESSED; + } + if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) { if (pConn->tretry <= tsRpcMaxRetry) { - tTrace("%s, peer is still processing the transaction", pConn->info); + tTrace("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry); pConn->tretry++; rpcSendReqHead(pConn); - taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); + pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); return TSDB_CODE_ALREADY_PROCESSED; } else { // peer still in processing, give up - return TSDB_CODE_TOO_SLOW; + tTrace("%s, server processing takes too long time, give up", pConn->info); + pHead->code = TSDB_CODE_TOO_SLOW; } } - pConn->tretry = 0; pConn->outType = 0; pConn->pReqMsg = NULL; pConn->reqMsgLen = 0; @@ -820,7 +832,9 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { if ( rpcIsReq(pHead->msgType) ) { terrno = rpcProcessReqHead(pConn, pHead); pConn->connType = pRecv->connType; - taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); + + // client shall send the request within tsRpcTime again, put 20 mseconds tolerance + taosTmrReset(rpcProcessIdleTimer, tsRpcTimer+20, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); } else { terrno = rpcProcessRspHead(pConn, pHead); } @@ -935,7 +949,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { if ( rpcIsReq(pHead->msgType) ) { rpcMsg.handle = pConn; - taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); + pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl); (*(pRpc->cfp))(&rpcMsg, NULL); } else { // it's a response @@ -943,14 +957,12 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { rpcMsg.handle = pContext->ahandle; pConn->pContext = NULL; - if (pHead->code == TSDB_CODE_AUTH_REQUIRED) { - pConn->secured = 0; - rpcSendReqToServer(pRpc, pContext); - return; - } - // for UDP, port may be changed by server, the port in ipSet shall be used for cache - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->ipSet.port[pContext->ipSet.inUse], pConn->connType); + if (pHead->code != TSDB_CODE_TOO_SLOW) { + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->ipSet.port[pContext->ipSet.inUse], pConn->connType); + } else { + rpcCloseConn(pConn); + } if (pHead->code == TSDB_CODE_REDIRECT) { pContext->redirect++; @@ -1039,6 +1051,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { pReplyHead->sourceId = pRecvHead->destId; pReplyHead->destId = pRecvHead->sourceId; pReplyHead->linkUid = pRecvHead->linkUid; + pReplyHead->ahandle = pRecvHead->ahandle; pReplyHead->code = htonl(code); msgLen = sizeof(SRpcHead); @@ -1095,8 +1108,8 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pConn->reqMsgLen = msgLen; pConn->pContext = pContext; - taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); rpcSendMsgToPeer(pConn, msg, msgLen); + taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); rpcUnlockConn(pConn); } @@ -1172,7 +1185,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { if (pConn->retry < 4) { tTrace("%s, re-send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); - taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); + pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); } else { // close the connection tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); @@ -1225,7 +1238,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { if (pConn->inType && pConn->user[0]) { tTrace("%s, progress timer expired, send progress", pConn->info); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); - taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); + pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl); } else { tTrace("%s, progress timer:%p not processed", pConn->info, tmrId); } @@ -1357,15 +1370,17 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){ // secured link, or no authentication pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); + // tTrace("%s, secured link, no auth is required", pConn->info); return 0; } if ( !rpcIsReq(pHead->msgType) ) { // for response, if code is auth failure, it shall bypass the auth process code = htonl(pHead->code); - if (code==TSDB_CODE_INVALID_TIME_STAMP || code==TSDB_CODE_AUTH_FAILURE || + if (code==TSDB_CODE_INVALID_TIME_STAMP || code==TSDB_CODE_AUTH_FAILURE || code == TSDB_CODE_AUTH_REQUIRED || code==TSDB_CODE_INVALID_USER || code == TSDB_CODE_NOT_READY) { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); + // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); return 0; } } @@ -1388,12 +1403,12 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { } else { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client - //tTrace("%s, message is authenticated", pConn->info); + // tTrace("%s, message is authenticated", pConn->info); } } } else { tError("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi); - code = TSDB_CODE_AUTH_FAILURE; + code = pHead->spi ? TSDB_CODE_AUTH_FAILURE : TSDB_CODE_AUTH_REQUIRED; } return code; diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index 68db574d82f3e668ced4b8895c3427640641079e..55e7da3258c403934b7609fc71b05010070b41f4 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -342,6 +342,7 @@ static void taosTimerLoopFunc(int signo) { int64_t now = taosGetTimestampMs(); for (int i = 0; i < tListLen(wheels); i++) { + tmrTrace("begin processing wheel %d", i); // `expried` is a temporary expire list. // expired timers are first add to this list, then move // to expired queue as a batch to improve performance. @@ -389,6 +390,7 @@ static void taosTimerLoopFunc(int signo) { } addToExpired(expired); + tmrTrace("end processing wheel %d", i); } }