提交 893abf43 编写于 作者: S Shengliang Guan

fix: taosShellNetChk.py failed if set numOfRpcThreads to 1

上级 eebd3f0f
...@@ -47,6 +47,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt); ...@@ -47,6 +47,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt);
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
// dmMonitor.c // dmMonitor.c
void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo); void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo);
......
...@@ -120,6 +120,59 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -120,6 +120,59 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return TSDB_CODE_OPS_NOT_SUPPORT; return TSDB_CODE_OPS_NOT_SUPPORT;
} }
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
pStatus->details[0] = 0;
SServerStatusRsp statusRsp = {0};
SMonMloadInfo minfo = {0};
dmGetMnodeLoads(pMgmt, &minfo);
if (minfo.isMnode && minfo.load.syncState != TAOS_SYNC_STATE_LEADER &&
minfo.load.syncState != TAOS_SYNC_STATE_CANDIDATE) {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
return;
}
SMonVloadInfo vinfo = {0};
dmGetVnodeLoads(pMgmt, &vinfo);
for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
if (pLoad->syncState != TAOS_SYNC_STATE_LEADER && pLoad->syncState != TAOS_SYNC_STATE_FOLLOWER) {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
syncStr(pLoad->syncState));
break;
}
}
taosArrayDestroy(vinfo.pVloads);
}
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
dDebug("server run status req is received");
SServerStatusRsp statusRsp = {0};
dmGetServerRunStatus(pMgmt, &statusRsp);
SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .refId = pMsg->rpcMsg.refId};
int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
if (rspLen < 0) {
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
pMsg->pRsp = pRsp;
pMsg->rspLen = rspLen;
return 0;
}
SArray *dmGetMsgHandles() { SArray *dmGetMsgHandles() {
int32_t code = -1; int32_t code = -1;
SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle)); SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
...@@ -135,6 +188,7 @@ SArray *dmGetMsgHandles() { ...@@ -135,6 +188,7 @@ SArray *dmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
// Requests handled by MNODE // Requests handled by MNODE
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
......
...@@ -100,10 +100,10 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) { ...@@ -100,10 +100,10 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SDnodeMgmt *pMgmt = pInfo->ahandle; SDnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
int32_t code = -1; tmsg_t msgType = pMsg->rpcMsg.msgType;
tmsg_t msgType = pMsg->rpcMsg.msgType; bool isRequest = msgType & 1u;
dTrace("msg:%p, will be processed in dnode-mgmt queue", pMsg); dTrace("msg:%p, will be processed in dnode-mgmt queue, type:%s", pMsg, TMSG_INFO(msgType));
switch (msgType) { switch (msgType) {
case TDMT_DND_CONFIG_DNODE: case TDMT_DND_CONFIG_DNODE:
...@@ -139,17 +139,23 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -139,17 +139,23 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
case TDMT_DND_DROP_BNODE: case TDMT_DND_DROP_BNODE:
code = (*pMgmt->processDropNodeFp)(pMgmt->pDnode, BNODE, pMsg); code = (*pMgmt->processDropNodeFp)(pMgmt->pDnode, BNODE, pMsg);
break; break;
case TDMT_DND_SERVER_STATUS:
code = dmProcessServerRunStatus(pMgmt, pMsg);
break;
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
break; break;
} }
if (msgType & 1u) { if (isRequest) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rsp = { SRpcMsg rsp = {
.handle = pMsg->rpcMsg.handle, .handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle, .ahandle = pMsg->rpcMsg.ahandle,
.code = code, .code = code,
.refId = pMsg->rpcMsg.refId, .refId = pMsg->rpcMsg.refId,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen,
}; };
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
......
...@@ -104,7 +104,7 @@ void dmSetEvent(SDnode *pDnode, EDndEvent event); ...@@ -104,7 +104,7 @@ void dmSetEvent(SDnode *pDnode, EDndEvent event);
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc); void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc);
void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pMsg); void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg);
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg); void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg);
int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg); int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg); int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
......
...@@ -235,44 +235,18 @@ void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const c ...@@ -235,44 +235,18 @@ void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const c
dmReportStartup(pWrapper->pDnode, pName, pDesc); dmReportStartup(pWrapper->pDnode, pName, pDesc);
} }
static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) { static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
SDnodeMgmt *pMgmt = pDnode->wrappers[DNODE].pMgmt; SDnodeMgmt *pMgmt = pDnode->wrappers[DNODE].pMgmt;
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
pStatus->details[0] = 0; pStatus->details[0] = 0;
if (pDnode->status == DND_STAT_INIT) { if (pDnode->status == DND_STAT_INIT) {
pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK; pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK;
snprintf(pStatus->details, sizeof(pStatus->details), "%s: %s", pDnode->startup.name, pDnode->startup.desc); snprintf(pStatus->details, sizeof(pStatus->details), "%s: %s", pDnode->startup.name, pDnode->startup.desc);
return; } else if (pDnode->status == DND_STAT_STOPPED) {
}
if (pDnode->status == DND_STAT_STOPPED) {
pStatus->statusCode = TSDB_SRV_STATUS_EXTING; pStatus->statusCode = TSDB_SRV_STATUS_EXTING;
return; } else {
} pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
SMonMloadInfo minfo = {0};
dmGetMnodeLoads(pMgmt, &minfo);
if (minfo.isMnode && minfo.load.syncState != TAOS_SYNC_STATE_LEADER &&
minfo.load.syncState != TAOS_SYNC_STATE_CANDIDATE) {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
return;
}
SMonVloadInfo vinfo = {0};
dmGetVnodeLoads(pMgmt, &vinfo);
for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
if (pLoad->syncState != TAOS_SYNC_STATE_LEADER && pLoad->syncState != TAOS_SYNC_STATE_FOLLOWER) {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
syncStr(pLoad->syncState));
break;
}
} }
taosArrayDestroy(vinfo.pVloads);
} }
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) { void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) {
...@@ -288,11 +262,11 @@ void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -288,11 +262,11 @@ void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) {
rpcFreeCont(pReq->pCont); rpcFreeCont(pReq->pCont);
} }
void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) { void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("server status req is received"); dDebug("server startup status req is received");
SServerStatusRsp statusRsp = {0}; SServerStatusRsp statusRsp = {0};
dmGetServerStatus(pDnode, &statusRsp); dmGetServerStartupStatus(pDnode, &statusRsp);
SRpcMsg rspMsg = {.handle = pReq->handle, .ahandle = pReq->ahandle, .refId = pReq->refId}; SRpcMsg rspMsg = {.handle = pReq->handle, .ahandle = pReq->ahandle, .refId = pReq->refId};
int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp); int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
......
...@@ -132,9 +132,13 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -132,9 +132,13 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
switch (msgType) { switch (msgType) {
case TDMT_DND_SERVER_STATUS: case TDMT_DND_SERVER_STATUS:
dTrace("server status req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle); if (pDnode->status != DND_STAT_RUNNING) {
dmProcessServerStatusReq(pDnode, pMsg); dTrace("server status req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
return; dmProcessServerStartupStatus(pDnode, pMsg);
return;
} else {
break;
}
case TDMT_DND_NET_TEST: case TDMT_DND_NET_TEST:
dTrace("net test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle); dTrace("net test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
dmProcessNetTestReq(pDnode, pMsg); dmProcessNetTestReq(pDnode, pMsg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册