未验证 提交 c87e3406 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3745 from taosdata/patch/TD-1632

TD-1632 revert
...@@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); ...@@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void rpcSendReqHead(SRpcConn *pConn); static void rpcSendReqHead(SRpcConn *pConn);
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext);
static void rpcProcessConnError(void *param, void *id); static void rpcProcessConnError(void *param, void *id);
static void rpcProcessRetryTimer(void *, void *); static void rpcProcessRetryTimer(void *, void *);
static void rpcProcessIdleTimer(void *param, void *tmrId); static void rpcProcessIdleTimer(void *param, void *tmrId);
...@@ -881,17 +881,32 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -881,17 +881,32 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
pConn->outType = 0; pConn->outType = 0;
pConn->pReqMsg = NULL; pConn->pReqMsg = NULL;
pConn->reqMsgLen = 0; pConn->reqMsgLen = 0;
SRpcReqContext *pContext = pConn->pContext;
if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SRpcEpSet)) {
// if EpSet is not included in the msg, treat it as NOT_READY
pHead->code = TSDB_CODE_RPC_NOT_READY;
} else {
pContext->redirect++;
if (pContext->redirect > TSDB_MAX_REPLICA) {
pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
tWarn("%s, too many redirects, quit", pConn->info);
}
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) {
int32_t sid; int32_t sid;
SRpcConn *pConn = NULL; SRpcConn *pConn = NULL;
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
sid = htonl(pHead->destId); sid = htonl(pHead->destId);
*ppContext = NULL;
if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) {
tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
...@@ -945,6 +960,11 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -945,6 +960,11 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl);
} else { } else {
terrno = rpcProcessRspHead(pConn, pHead); terrno = rpcProcessRspHead(pConn, pHead);
if (terrno == 0) {
SRpcReqContext *pContext = pConn->pContext;
*ppContext = pContext;
pConn->pContext = NULL;
}
} }
} }
...@@ -1009,7 +1029,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -1009,7 +1029,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
} }
terrno = 0; terrno = 0;
pConn = rpcProcessMsgHead(pRpc, pRecv); SRpcReqContext *pContext;
pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);
if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) { if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) {
tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
...@@ -1029,7 +1050,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -1029,7 +1050,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
} }
} else { // msg is passed to app only parsing is ok } else { // msg is passed to app only parsing is ok
rpcProcessIncomingMsg(pConn, pHead); rpcProcessIncomingMsg(pConn, pHead, pContext);
} }
} }
...@@ -1060,7 +1081,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { ...@@ -1060,7 +1081,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
rpcFreeCont(pContext->pCont); rpcFreeCont(pContext->pCont);
} }
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
...@@ -1070,9 +1091,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -1070,9 +1091,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
rpcMsg.pCont = pHead->content; rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
rpcMsg.code = pHead->code; rpcMsg.code = pHead->code;
rpcMsg.ahandle = pConn->ahandle;
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcMsg.ahandle = pConn->ahandle;
if (rpcMsg.contLen > 0) { if (rpcMsg.contLen > 0) {
rpcMsg.handle = pConn; rpcMsg.handle = pConn;
rpcAddRef(pRpc); // add the refCount for requests rpcAddRef(pRpc); // add the refCount for requests
...@@ -1089,10 +1110,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -1089,10 +1110,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
} }
} else { } else {
// it's a response // it's a response
SRpcReqContext *pContext = pConn->pContext;
rpcMsg.handle = pContext; rpcMsg.handle = pContext;
pConn->pContext = NULL; rpcMsg.ahandle = pContext->ahandle;
pConn->pReqMsg = NULL;
// for UDP, port may be changed by server, the port in epSet shall be used for cache // for UDP, port may be changed by server, the port in epSet shall be used for cache
if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
...@@ -1101,19 +1120,6 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -1101,19 +1120,6 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
rpcCloseConn(pConn); rpcCloseConn(pConn);
} }
if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
if (rpcMsg.contLen < sizeof(SRpcEpSet)) {
// if EpSet is not included in the msg, treat it as NOT_READY
pHead->code = TSDB_CODE_RPC_NOT_READY;
} else {
pContext->redirect++;
if (pContext->redirect > TSDB_MAX_REPLICA) {
pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
tWarn("%s, too many redirects, quit", pConn->info);
}
}
}
if (pHead->code == TSDB_CODE_RPC_REDIRECT) { if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
pContext->numOfTry = 0; pContext->numOfTry = 0;
SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content; SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册