提交 12687355 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

TD-1632

上级 97a50f17
Subproject commit 567b7b12f3fd2775c718d284beffc8c38dd6c219 Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
...@@ -881,6 +881,20 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -881,6 +881,20 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
pConn->outType = 0; pConn->outType = 0;
pConn->pReqMsg = NULL; pConn->pReqMsg = NULL;
pConn->reqMsgLen = 0; pConn->reqMsgLen = 0;
SRpcReqContext *pContext = pConn->pContext;
if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SRpcEpSet)) {
// if EpSet is not included in the msg, treat it as NOT_READY
pHead->code = TSDB_CODE_RPC_NOT_READY;
} else {
pContext->redirect++;
if (pContext->redirect > TSDB_MAX_REPLICA) {
pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
tWarn("%s, too many redirects, quit", pConn->info);
}
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -950,12 +964,6 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont ...@@ -950,12 +964,6 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
*ppContext = pContext; *ppContext = pContext;
pConn->pContext = NULL; pConn->pContext = NULL;
pConn->pReqMsg = NULL;
// for UDP, port may be changed by server, the port in epSet shall be used for cache
if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType);
}
} }
} }
} }
...@@ -1083,9 +1091,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1083,9 +1091,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
rpcMsg.pCont = pHead->content; rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
rpcMsg.code = pHead->code; rpcMsg.code = pHead->code;
rpcMsg.ahandle = pConn->ahandle;
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcMsg.ahandle = pConn->ahandle;
if (rpcMsg.contLen > 0) { if (rpcMsg.contLen > 0) {
rpcMsg.handle = pConn; rpcMsg.handle = pConn;
rpcAddRef(pRpc); // add the refCount for requests rpcAddRef(pRpc); // add the refCount for requests
...@@ -1103,25 +1111,15 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1103,25 +1111,15 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
} else { } else {
// it's a response // it's a response
rpcMsg.handle = pContext; rpcMsg.handle = pContext;
rpcMsg.ahandle = pContext->ahandle;
// for UDP, port may be changed by server, the port in epSet shall be used for cache // for UDP, port may be changed by server, the port in epSet shall be used for cache
if (pHead->code == TSDB_CODE_RPC_TOO_SLOW) { if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType);
} else {
rpcCloseConn(pConn); rpcCloseConn(pConn);
} }
if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
if (rpcMsg.contLen < sizeof(SRpcEpSet)) {
// if EpSet is not included in the msg, treat it as NOT_READY
pHead->code = TSDB_CODE_RPC_NOT_READY;
} else {
pContext->redirect++;
if (pContext->redirect > TSDB_MAX_REPLICA) {
pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
tWarn("%s, too many redirects, quit", pConn->info);
}
}
}
if (pHead->code == TSDB_CODE_RPC_REDIRECT) { if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
pContext->numOfTry = 0; pContext->numOfTry = 0;
SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content; SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册