From 6ee9d124e6a2896c18e97609193e2a81680044f4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Oct 2020 23:32:16 +0800 Subject: [PATCH] [td-1830] --- src/client/src/tscServer.c | 16 +++++++++++----- src/inc/taosmsg.h | 1 + src/mnode/src/mnodeShow.c | 28 ++++++++++++++++------------ src/util/src/tutil.c | 1 + 4 files changed, 29 insertions(+), 17 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0f89c537ec..316e3a323b 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -150,7 +150,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pObj == NULL) return; if (pObj != pObj->signature) { - tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature); + tscError("heartbeat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature); return; } @@ -175,12 +175,12 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); } } else { - tscDebug("heartbeat failed, code:%s", tstrerror(code)); + tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code)); } if (pObj->pHb != NULL) { int32_t waitingDuring = tsShellActivityTimer * 500; - tscDebug("%p start heartbeat in %dms", pSql, waitingDuring); + tscDebug("%p send heartbeat in %dms", pSql, waitingDuring); taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); } else { @@ -1639,11 +1639,14 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { pthread_mutex_unlock(&pObj->mutex); - tscError("%p failed to malloc for heartbeat msg", pSql); + tscError("%p failed to create heartbeat msg", pSql); return TSDB_CODE_TSC_OUT_OF_MEMORY; } + // TODO the expired hb and client can not be identified by server till now. SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload; + tstrncpy(pHeartbeat->clientVer, version, tListLen(pHeartbeat->clientVer)); + pHeartbeat->numOfQueries = numOfQueries; pHeartbeat->numOfStreams = numOfStreams; @@ -1996,10 +1999,11 @@ static void createHBObj(STscObj* pObj) { } int tscProcessConnectRsp(SSqlObj *pSql) { - char temp[TSDB_TABLE_FNAME_LEN * 2]; STscObj *pObj = pSql->pTscObj; SSqlRes *pRes = &pSql->res; + char temp[TSDB_TABLE_FNAME_LEN * 2] = {0}; + SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp; tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId)); // copy acctId from response int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db); @@ -2018,6 +2022,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) { pObj->connId = htonl(pConnect->connId); createHBObj(pObj); + + //launch a timer to send heartbeat to maintain the connection and send status to mnode taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); return 0; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 600347c44f..de692a558c 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -781,6 +781,7 @@ typedef struct { } SStreamDesc; typedef struct { + char clientVer[TSDB_VERSION_LEN]; uint32_t connId; int32_t pid; int32_t numOfQueries; diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 80909e99ae..655fe71259 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -232,12 +232,16 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { } static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { - SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp)); - if (pHBRsp == NULL) { + SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp)); + if (pRsp == NULL) { return TSDB_CODE_MND_OUT_OF_MEMORY; } SCMHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont; + if (taosCheckVersion(pHBMsg->clientVer, version, 3) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_VERSION; // todo change the error code + } + SRpcConnInfo connInfo = {0}; rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo); @@ -251,33 +255,33 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { if (pConn == NULL) { // do not close existing links, otherwise // mError("failed to create connId, close connect"); - // pHBRsp->killConnection = 1; + // pRsp->killConnection = 1; } else { - pHBRsp->connId = htonl(pConn->connId); + pRsp->connId = htonl(pConn->connId); mnodeSaveQueryStreamList(pConn, pHBMsg); if (pConn->killed != 0) { - pHBRsp->killConnection = 1; + pRsp->killConnection = 1; } if (pConn->streamId != 0) { - pHBRsp->streamId = htonl(pConn->streamId); + pRsp->streamId = htonl(pConn->streamId); pConn->streamId = 0; } if (pConn->queryId != 0) { - pHBRsp->queryId = htonl(pConn->queryId); + pRsp->queryId = htonl(pConn->queryId); pConn->queryId = 0; } } - pHBRsp->onlineDnodes = htonl(mnodeGetOnlineDnodesNum()); - pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum()); - mnodeGetMnodeEpSetForShell(&pHBRsp->epSet); + pRsp->onlineDnodes = htonl(mnodeGetOnlineDnodesNum()); + pRsp->totalDnodes = htonl(mnodeGetDnodesNum()); + mnodeGetMnodeEpSetForShell(&pRsp->epSet); - pMsg->rpcRsp.rsp = pHBRsp; + pMsg->rpcRsp.rsp = pRsp; pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp); - + mnodeReleaseConn(pConn); return TSDB_CODE_SUCCESS; } diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 6c4af437b2..099b9d9530 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -326,6 +326,7 @@ int32_t taosHexStrToByteArray(char hexstr[], char bytes[]) { return 0; } +// TODO move to comm module bool taosGetVersionNumber(char *versionStr, int *versionNubmer) { if (versionStr == NULL || versionNubmer == NULL) { return false; -- GitLab