未验证 提交 ee782d05 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2003 from taosdata/hotfix/heartbeat

Hotfix/heartbeat
...@@ -434,6 +434,17 @@ static void tscProcessServStatus(SSqlObj *pSql) { ...@@ -434,6 +434,17 @@ static void tscProcessServStatus(SSqlObj *pSql) {
if (pObj->pHb->res.code == TSDB_CODE_NETWORK_UNAVAIL) { if (pObj->pHb->res.code == TSDB_CODE_NETWORK_UNAVAIL) {
pSql->res.code = TSDB_CODE_NETWORK_UNAVAIL; pSql->res.code = TSDB_CODE_NETWORK_UNAVAIL;
return; return;
} else {
int32_t* data = (int32_t*) pObj->pHb->res.data;
int32_t totalDnode = data[0];
int32_t onlineDnode = data[1];
assert(onlineDnode <= totalDnode);
if (onlineDnode < totalDnode) {
pSql->res.code = TSDB_CODE_NETWORK_UNAVAIL;
return;
}
} }
} else { } else {
if (pSql->res.code == TSDB_CODE_NETWORK_UNAVAIL) { if (pSql->res.code == TSDB_CODE_NETWORK_UNAVAIL) {
......
...@@ -106,12 +106,12 @@ static int32_t tscGetMgmtConnMaxRetryTimes() { ...@@ -106,12 +106,12 @@ static int32_t tscGetMgmtConnMaxRetryTimes() {
return tscMgmtIpList.numOfIps * factor; return tscMgmtIpList.numOfIps * factor;
} }
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { int32_t tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param; STscObj *pObj = (STscObj *)param;
if (pObj == NULL) return; if (pObj == NULL) return TSDB_CODE_APP_ERROR;
if (pObj != pObj->signature) { if (pObj != pObj->signature) {
tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature); tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
return; return TSDB_CODE_APP_ERROR;
} }
SSqlObj *pSql = pObj->pHb; SSqlObj *pSql = pObj->pHb;
...@@ -128,11 +128,19 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -128,11 +128,19 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (pRsp->queryId) tscKillQuery(pObj, pRsp->queryId); if (pRsp->queryId) tscKillQuery(pObj, pRsp->queryId);
if (pRsp->streamId) tscKillStream(pObj, pRsp->streamId); if (pRsp->streamId) tscKillStream(pObj, pRsp->streamId);
} }
if (pRes->data == NULL) {
pRes->data = calloc(2, sizeof(int32_t));
}
((int32_t*)pRes->data)[0] = htonl(pRsp->totalDnodes);
((int32_t*)pRes->data)[1] = htonl(pRsp->onlineDnodes);
} else { } else {
tscTrace("heart beat failed, code:%d", code); tscTrace("heart beat failed, code:%d", code);
} }
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
return code;
} }
void tscProcessActivityTimer(void *handle, void *tmrId) { void tscProcessActivityTimer(void *handle, void *tmrId) {
......
...@@ -821,6 +821,8 @@ typedef struct { ...@@ -821,6 +821,8 @@ typedef struct {
typedef struct { typedef struct {
uint32_t queryId; uint32_t queryId;
uint32_t streamId; uint32_t streamId;
uint32_t totalDnodes;
uint32_t onlineDnodes;
char killConnection; char killConnection;
SIpList ipList; SIpList ipList;
} SHeartBeatRsp; } SHeartBeatRsp;
......
...@@ -432,6 +432,8 @@ bool mgmtAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode); ...@@ -432,6 +432,8 @@ bool mgmtAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode);
void mgmtSetModuleInDnode(SDnodeObj *pDnode, int moduleType); void mgmtSetModuleInDnode(SDnodeObj *pDnode, int moduleType);
int mgmtUnSetModuleInDnode(SDnodeObj *pDnode, int moduleType); int mgmtUnSetModuleInDnode(SDnodeObj *pDnode, int moduleType);
void mgmtGetDnodeOnlineNum(int32_t *totalDnodes, int32_t *onlineDnodes);
extern int (*mgmtGetMetaFp[])(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); extern int (*mgmtGetMetaFp[])(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
extern int (*mgmtRetrieveFp[])(SShowObj *pShow, char *data, int rows, SConnObj *pConn); extern int (*mgmtRetrieveFp[])(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
......
...@@ -1203,6 +1203,10 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) { ...@@ -1203,6 +1203,10 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) {
pConn->streamId = 0; pConn->streamId = 0;
pHBRsp->killConnection = pConn->killConnection; pHBRsp->killConnection = pConn->killConnection;
mgmtGetDnodeOnlineNum(&pHBRsp->totalDnodes, &pHBRsp->onlineDnodes);
pHBRsp->totalDnodes = htonl(pHBRsp->totalDnodes);
pHBRsp->onlineDnodes = htonl(pHBRsp->onlineDnodes);
if (pConn->usePublicIp) { if (pConn->usePublicIp) {
if (pSdbPublicIpList != NULL) { if (pSdbPublicIpList != NULL) {
int size = pSdbPublicIpList->numOfIps * 4; int size = pSdbPublicIpList->numOfIps * 4;
......
...@@ -46,3 +46,8 @@ int mgmtProcessDropAcctMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -46,3 +46,8 @@ int mgmtProcessDropAcctMsg(char *pMsg, int msgLen, SConnObj *pConn) {
int mgmtProcessCreateAcctMsg(char *pMsg, int msgLen, SConnObj *pConn) { int mgmtProcessCreateAcctMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT); return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT);
} }
void mgmtGetDnodeOnlineNum(int32_t *totalDnodes, int32_t *onlineDnodes) {
*totalDnodes = 1;
*onlineDnodes = 1;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册