diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 7208dc4d72269069a1e3aa5bcc6e8122954f09c1..b7e3c7199dade648fb3a4c45c8712e6ca8fb439a 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -434,6 +434,17 @@ static void tscProcessServStatus(SSqlObj *pSql) { if (pObj->pHb->res.code == TSDB_CODE_NETWORK_UNAVAIL) { pSql->res.code = TSDB_CODE_NETWORK_UNAVAIL; return; + } else { + int32_t* data = (int32_t*) pObj->pHb->res.data; + + int32_t totalDnode = data[0]; + int32_t onlineDnode = data[1]; + assert(onlineDnode <= totalDnode); + + if (onlineDnode < totalDnode) { + pSql->res.code = TSDB_CODE_NETWORK_UNAVAIL; + return; + } } } else { if (pSql->res.code == TSDB_CODE_NETWORK_UNAVAIL) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 08ea7f77ba7f31f53feb5c3ccf6dd950819d7b5b..ea42f32cf65c53ccecf748459b3ea0aa8f776bdf 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -106,12 +106,12 @@ static int32_t tscGetMgmtConnMaxRetryTimes() { return tscMgmtIpList.numOfIps * factor; } -void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { +int32_t tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { STscObj *pObj = (STscObj *)param; - if (pObj == NULL) return; + if (pObj == NULL) return TSDB_CODE_APP_ERROR; if (pObj != pObj->signature) { tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature); - return; + return TSDB_CODE_APP_ERROR; } SSqlObj *pSql = pObj->pHb; @@ -128,11 +128,19 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pRsp->queryId) tscKillQuery(pObj, pRsp->queryId); if (pRsp->streamId) tscKillStream(pObj, pRsp->streamId); } + + if (pRes->data == NULL) { + pRes->data = calloc(2, sizeof(int32_t)); + } + + ((int32_t*)pRes->data)[0] = htonl(pRsp->totalDnodes); + ((int32_t*)pRes->data)[1] = htonl(pRsp->onlineDnodes); } else { tscTrace("heart beat failed, code:%d", code); } taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); + return code; } void tscProcessActivityTimer(void *handle, void *tmrId) { diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 3a843d111b4d496380a460f5bef156e571e45bf4..48ee48eae345a7a576b7aaa6eef1b2438a9ed39d 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -821,6 +821,8 @@ typedef struct { typedef struct { uint32_t queryId; uint32_t streamId; + uint32_t totalDnodes; + uint32_t onlineDnodes; char killConnection; SIpList ipList; } SHeartBeatRsp; diff --git a/src/system/detail/inc/mgmt.h b/src/system/detail/inc/mgmt.h index 3fb3522c859c9d2d4817b36e58a1e39d7ef5a2ef..424b65a0dd3463909105dcabedfa137d3c9838e1 100644 --- a/src/system/detail/inc/mgmt.h +++ b/src/system/detail/inc/mgmt.h @@ -432,6 +432,8 @@ bool mgmtAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode); void mgmtSetModuleInDnode(SDnodeObj *pDnode, int moduleType); int mgmtUnSetModuleInDnode(SDnodeObj *pDnode, int moduleType); +void mgmtGetDnodeOnlineNum(int32_t *totalDnodes, int32_t *onlineDnodes); + extern int (*mgmtGetMetaFp[])(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); extern int (*mgmtRetrieveFp[])(SShowObj *pShow, char *data, int rows, SConnObj *pConn); diff --git a/src/system/detail/src/mgmtShell.c b/src/system/detail/src/mgmtShell.c index af54ce9bf38c079a35a3e9566491a60654db6399..d99b729aa60d643510ec82bbe63d3e7053d485fc 100644 --- a/src/system/detail/src/mgmtShell.c +++ b/src/system/detail/src/mgmtShell.c @@ -1185,7 +1185,7 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) { char * pStart, *pMsg; int msgLen; STaosRsp *pRsp; - + mgmtSaveQueryStreamList(cont, contLen, pConn); pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_HEARTBEAT_RSP, 128); @@ -1203,6 +1203,10 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) { pConn->streamId = 0; pHBRsp->killConnection = pConn->killConnection; + mgmtGetDnodeOnlineNum(&pHBRsp->totalDnodes, &pHBRsp->onlineDnodes); + pHBRsp->totalDnodes = htonl(pHBRsp->totalDnodes); + pHBRsp->onlineDnodes = htonl(pHBRsp->onlineDnodes); + if (pConn->usePublicIp) { if (pSdbPublicIpList != NULL) { int size = pSdbPublicIpList->numOfIps * 4; diff --git a/src/system/lite/src/mgmtShell.spec.c b/src/system/lite/src/mgmtShell.spec.c index a1d8e6a34a4e2cc2d7df7c1acc6cdf75a796fc1a..a95ecd2ee2dc6b1816251a2d602432aa7df40b59 100644 --- a/src/system/lite/src/mgmtShell.spec.c +++ b/src/system/lite/src/mgmtShell.spec.c @@ -45,4 +45,9 @@ int mgmtProcessDropAcctMsg(char *pMsg, int msgLen, SConnObj *pConn) { int mgmtProcessCreateAcctMsg(char *pMsg, int msgLen, SConnObj *pConn) { return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT); +} + +void mgmtGetDnodeOnlineNum(int32_t *totalDnodes, int32_t *onlineDnodes) { + *totalDnodes = 1; + *onlineDnodes = 1; } \ No newline at end of file