diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 87790c5683aa269bfc73f8d1d0721873cb20e036..405f796a5e2e281ee5a63bf3fed7c14426ac30b4 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -124,7 +124,7 @@ typedef struct SRpcConn { } SRpcConn; int tsRpcMaxUdpSize = 15000; // bytes - +int tsProgressTimer = 100; // not configurable int tsRpcMaxRetry; int tsRpcHeadSize; @@ -203,7 +203,8 @@ static void rpcUnlockConn(SRpcConn *pConn); void *rpcOpen(const SRpcInit *pInit) { SRpcInfo *pRpc; - tsRpcMaxRetry = tsRpcMaxTime * 1000 * 2 / tsRpcTimer; + tsProgressTimer = tsRpcTimer/2; + tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer; tsRpcHeadSize = RPC_MSG_OVERHEAD; tsRpcOverhead = sizeof(SRpcReqContext); @@ -419,8 +420,11 @@ void rpcSendResponse(const SRpcMsg *pRsp) { pConn->rspMsgLen = msgLen; if (pMsg->code == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; + SRpcInfo *pRpc = pConn->pRpc; taosTmrStopA(&pConn->pTimer); - // taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); + + // set the idle timer to monitor the activity + taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); rpcSendMsgToPeer(pConn, msg, msgLen); pConn->secured = 1; // connection shall be secured @@ -682,6 +686,7 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno)); } + pConn->tretry = 0; return pConn; } @@ -747,20 +752,28 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { taosTmrStopA(&pConn->pTimer); pConn->retry = 0; + if (pHead->code == TSDB_CODE_AUTH_REQUIRED && pRpc->spi) { + tTrace("%s, authentication shall be restarted", pConn->info); + pConn->secured = 0; + rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); + pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); + return TSDB_CODE_ALREADY_PROCESSED; + } + if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) { if (pConn->tretry <= tsRpcMaxRetry) { - tTrace("%s, peer is still processing the transaction", pConn->info); + tTrace("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry); pConn->tretry++; rpcSendReqHead(pConn); - taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); + pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); return TSDB_CODE_ALREADY_PROCESSED; } else { // peer still in processing, give up - return TSDB_CODE_TOO_SLOW; + tTrace("%s, server processing takes too long time, give up", pConn->info); + pHead->code = TSDB_CODE_TOO_SLOW; } } - pConn->tretry = 0; pConn->outType = 0; pConn->pReqMsg = NULL; pConn->reqMsgLen = 0; @@ -819,7 +832,9 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { if ( rpcIsReq(pHead->msgType) ) { terrno = rpcProcessReqHead(pConn, pHead); pConn->connType = pRecv->connType; - taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); + + // client shall send the request within tsRpcTime again, put 20 mseconds tolerance + taosTmrReset(rpcProcessIdleTimer, tsRpcTimer+20, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); } else { terrno = rpcProcessRspHead(pConn, pHead); } @@ -934,7 +949,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { if ( rpcIsReq(pHead->msgType) ) { rpcMsg.handle = pConn; - taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); + pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl); (*(pRpc->cfp))(&rpcMsg, NULL); } else { // it's a response @@ -942,14 +957,12 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { rpcMsg.handle = pContext->ahandle; pConn->pContext = NULL; - if (pHead->code == TSDB_CODE_AUTH_REQUIRED) { - pConn->secured = 0; - rpcSendReqToServer(pRpc, pContext); - return; - } - // for UDP, port may be changed by server, the port in ipSet shall be used for cache - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->ipSet.port[pContext->ipSet.inUse], pConn->connType); + if (pHead->code != TSDB_CODE_TOO_SLOW) { + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->ipSet.port[pContext->ipSet.inUse], pConn->connType); + } else { + rpcCloseConn(pConn); + } if (pHead->code == TSDB_CODE_REDIRECT) { pContext->redirect++; @@ -1038,6 +1051,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { pReplyHead->sourceId = pRecvHead->destId; pReplyHead->destId = pRecvHead->sourceId; pReplyHead->linkUid = pRecvHead->linkUid; + pReplyHead->ahandle = pRecvHead->ahandle; pReplyHead->code = htonl(code); msgLen = sizeof(SRpcHead); @@ -1094,8 +1108,8 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pConn->reqMsgLen = msgLen; pConn->pContext = pContext; - taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); rpcSendMsgToPeer(pConn, msg, msgLen); + taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); rpcUnlockConn(pConn); } @@ -1171,7 +1185,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { if (pConn->retry < 4) { tTrace("%s, re-send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); - taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); + pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); } else { // close the connection tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); @@ -1224,7 +1238,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { if (pConn->inType && pConn->user[0]) { tTrace("%s, progress timer expired, send progress", pConn->info); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); - taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); + pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl); } else { tTrace("%s, progress timer:%p not processed", pConn->info, tmrId); } @@ -1356,15 +1370,17 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){ // secured link, or no authentication pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); + // tTrace("%s, secured link, no auth is required", pConn->info); return 0; } if ( !rpcIsReq(pHead->msgType) ) { // for response, if code is auth failure, it shall bypass the auth process code = htonl(pHead->code); - if (code==TSDB_CODE_INVALID_TIME_STAMP || code==TSDB_CODE_AUTH_FAILURE || + if (code==TSDB_CODE_INVALID_TIME_STAMP || code==TSDB_CODE_AUTH_FAILURE || code == TSDB_CODE_AUTH_REQUIRED || code==TSDB_CODE_INVALID_USER || code == TSDB_CODE_NOT_READY) { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); + // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); return 0; } } @@ -1387,12 +1403,12 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { } else { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client - //tTrace("%s, message is authenticated", pConn->info); + // tTrace("%s, message is authenticated", pConn->info); } } } else { tError("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi); - code = TSDB_CODE_AUTH_FAILURE; + code = pHead->spi ? TSDB_CODE_AUTH_FAILURE : TSDB_CODE_AUTH_REQUIRED; } return code;