提交 5536c3c9 编写于 作者: A Alex Duan

feat(rpc): ack connection alive

上级 da484b7a
...@@ -308,7 +308,7 @@ bool sendProbeConnMsg(SSqlObj* pSql) { ...@@ -308,7 +308,7 @@ bool sendProbeConnMsg(SSqlObj* pSql) {
if (diff > killTimeout) { if (diff > killTimeout) {
// need kill query // need kill query
tscDebug("PROBE 0x%"PRIx64" need killed, noAckCnt:%d diff=%d", pSql->self, pSql->noAckCnt, diff); tscDebug("PROBE 0x%"PRIx64" need killed, noAckCnt:%d diff=%d", pSql->self, pSql->noAckCnt, diff);
return false; //return false;
} }
if (pSql->pPrevContext == NULL || pSql->pPrevConn == NULL || pSql->pPrevFdObj == NULL || pSql->prevFd <= 0) { if (pSql->pPrevContext == NULL || pSql->pPrevConn == NULL || pSql->pPrevFdObj == NULL || pSql->prevFd <= 0) {
......
...@@ -57,13 +57,6 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { ...@@ -57,13 +57,6 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_VND_INVALID_VGROUP_ID; int32_t code = TSDB_CODE_VND_INVALID_VGROUP_ID;
char * pCont = pMsg->pCont; char * pCont = pMsg->pCont;
// check probe conn msg
if(pMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = 0, .msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP};
rpcSendResponse(&rpcRsp);
return ;
}
while (leftLen > 0) { while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *)pCont; SMsgHead *pHead = (SMsgHead *)pCont;
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
......
...@@ -993,6 +993,16 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont ...@@ -993,6 +993,16 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
pConn->peerPort = pRecv->port; pConn->peerPort = pRecv->port;
if (pHead->port) pConn->peerPort = htons(pHead->port); if (pHead->port) pConn->peerPort = htons(pHead->port);
// probe msg
if(pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
pConn->inType = pHead->msgType;
rpcSendQuickRsp(pConn, TSDB_CODE_SUCCESS);
rpcUnlockConn(pConn);
rpcFreeMsg(pRecv->msg);
pRecv->msg = NULL;
return pConn;
}
terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen); terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
// code can be transformed only after authentication // code can be transformed only after authentication
...@@ -1710,6 +1720,7 @@ bool doRpcSendProbe(SRpcConn *pConn) { ...@@ -1710,6 +1720,7 @@ bool doRpcSendProbe(SRpcConn *pConn) {
memcpy(pHead->user, pConn->user, tListLen(pHead->user)); memcpy(pHead->user, pConn->user, tListLen(pHead->user));
pHead->code = htonl(code); pHead->code = htonl(code);
pConn->outType = pHead->msgType;
bool ret = rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead) + sizeof(int32_t)); bool ret = rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead) + sizeof(int32_t));
pConn->secured = 1; // connection shall be secured pConn->secured = 1; // connection shall be secured
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册