提交 7d2d19d6 编写于 作者: A Alex Duan

feat(rpc): proge msg is ok

上级 726850c3
......@@ -530,6 +530,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
assert(pSql->self == handle);
// check msgtype
if(rpcMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
pSql->noAckCnt = 0;
pSql->lastAlive = taosGetTimestampMs();
tscDebug("PROBE 0x%" PRIx64 " recv probe msg. sql=%s", pSql->self, pSql->sqlstr);
rpcFreeCont(rpcMsg->pCont);
return ;
}
STscObj *pObj = pSql->pTscObj;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
......@@ -546,14 +556,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
return;
}
// check msgtype
if(rpcMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
pSql->noAckCnt = 0;
pSql->lastAlive = taosGetTimestampMs();
tscDebug("PROBE 0x%" PRIx64 " recv probe msg. sql=%s", pSql->self, pSql->sqlstr);
return ;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
......
......@@ -124,7 +124,9 @@ typedef struct SRpcConn {
int8_t connType; // connection type
int64_t lockedBy; // lock for connection
SRpcReqContext *pContext; // request context
int64_t rid; // probe msg use rid get pContext
int64_t lastLiveTime; // last alive time with ms
} SRpcConn;
int tsRpcMaxUdpSize = 15000; // bytes
......@@ -994,18 +996,6 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
pConn->peerPort = pRecv->port;
if (pHead->port) pConn->peerPort = htons(pHead->port);
/*
// probe msg
if(pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
pConn->inType = pHead->msgType;
rpcSendQuickRsp(pConn, TSDB_CODE_SUCCESS);
rpcUnlockConn(pConn);
rpcFreeMsg(pRecv->msg);
pRecv->msg = NULL;
return pConn;
}
*/
terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
// code can be transformed only after authentication
......@@ -1090,7 +1080,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
}
// process probe msg , return true is probe msg, false is not probe msg
static bool rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) {
static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) {
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
// response to
......@@ -1104,24 +1094,35 @@ static bool rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) {
rspHead.code = 0;
rspHead.spi = pHead->spi;
rspHead.linkUid = pHead->linkUid;
rpcLockConn(pConn);
rspHead.sourceId = pConn->ownId;
rspHead.destId = pConn->peerId;
memcpy(rspHead.user, pHead->user, tListLen(pHead->user));
bool ret = rpcSendMsgToPeer(pConn, &rspHead, sizeof(SRpcHead));
tDebug("PROBE 0x%" PRIx64 " recv probe msg and response. ret=%d", pHead->ahandle, ret);
return true;
rpcFreeCont(pRecv->msg);
rpcUnlockConn(pConn);
} else if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
if(pConn) {
rpcLockConn(pConn);
pConn->lastLiveTime = taosGetTimestampMs();
rpcProcessIncomingMsg(pConn, pHead, pConn->pContext);
// get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, pConn->rid);
if (pContext) {
rpcProcessIncomingMsg(pConn, pHead, pContext);
taosReleaseRef(tsRpcRefId, pConn->rid);
} else {
tError("PROBE recv probe msg get context is NULL. rid=%" PRId64 " NULL.", pConn->rid);
}
rpcUnlockConn(pConn);
}
tDebug("PROBE 0x%" PRIx64 " recv response probe msg and update lastLiveTime. pConn=%p", pHead->ahandle, pConn);
return false;
} else {
return false;
}
}
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
......@@ -1145,10 +1146,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
// deal probe msg
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN || pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
if (rpcProcessProbeMsg(pRecv, pConn)) {
rpcFreeMsg(pRecv->msg);
return pConn;
}
rpcProcessProbeMsg(pRecv, pConn);
return pConn;
}
if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) {
......@@ -1203,7 +1202,10 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
}
// free the request message
taosRemoveRef(tsRpcRefId, pContext->rid);
if(pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN && pMsg->msgType != TSDB_MSG_TYPE_PROBE_CONN_RSP) {
taosRemoveRef(tsRpcRefId, pContext->rid);
}
}
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
......@@ -1241,6 +1243,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
// it's a response
rpcMsg.handle = pContext;
rpcMsg.ahandle = pContext->ahandle;
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
// probe msg
rpcNotifyClient(pContext, &rpcMsg);
return ;
}
// reset pConn NULL
pContext->pConn = NULL;
// for UDP, port may be changed by server, the port in epSet shall be used for cache
......@@ -1397,6 +1407,8 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pConn->pReqMsg = msg;
pConn->reqMsgLen = msgLen;
pConn->pContext = pContext;
if(pContext)
pConn->rid = pContext->rid;
bool ret = rpcSendMsgToPeer(pConn, msg, msgLen);
if (pConn->connType != RPC_CONN_TCPC)
......@@ -1825,13 +1837,13 @@ _END:
// after sql request send , save conn info
bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd) {
if(rpcRid < 0) {
tError("ACK saveSendInfo rpcRid=%" PRId64 " less than zero, invalid.", rpcRid);
tError("PROBE saveSendInfo rpcRid=%" PRId64 " less than zero, invalid.", rpcRid);
return false;
}
// get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid);
if (pContext == NULL) {
tError("ACK rpcRid=%" PRId64 " get context NULL.", rpcRid);
tError("PROBE rpcRid=%" PRId64 " get context NULL.", rpcRid);
return false;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册