提交 f47df5ba 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #12670 from taosdata/fix/dnode

fix: only send message to one vnode in the vgroup
...@@ -38,9 +38,9 @@ static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) { ...@@ -38,9 +38,9 @@ static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) {
static inline void bmSendRsp(SRpcMsg *pMsg, int32_t code) { static inline void bmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = { SRpcMsg rsp = {
.code = code, .code = code,
.info = pMsg->info,
.pCont = pMsg->info.rsp, .pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen, .contLen = pMsg->info.rspLen,
.info = pMsg->info,
}; };
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
......
...@@ -116,8 +116,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) { ...@@ -116,8 +116,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
SServerStatusRsp statusRsp = {0}; SServerStatusRsp statusRsp = {0};
SMonMloadInfo minfo = {0}; SMonMloadInfo minfo = {0};
dmGetMnodeLoads(pMgmt, &minfo); dmGetMnodeLoads(pMgmt, &minfo);
if (minfo.isMnode && minfo.load.syncState != TAOS_SYNC_STATE_LEADER && if (minfo.isMnode && minfo.load.syncState == TAOS_SYNC_STATE_ERROR) {
minfo.load.syncState != TAOS_SYNC_STATE_CANDIDATE) {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED; pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState)); snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
return; return;
...@@ -127,7 +126,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) { ...@@ -127,7 +126,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
dmGetVnodeLoads(pMgmt, &vinfo); dmGetVnodeLoads(pMgmt, &vinfo);
for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) { for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i); SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
if (pLoad->syncState != TAOS_SYNC_STATE_LEADER && pLoad->syncState != TAOS_SYNC_STATE_FOLLOWER) { if (pLoad->syncState == TAOS_SYNC_STATE_ERROR) {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED; pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId, snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
syncStr(pLoad->syncState)); syncStr(pLoad->syncState));
......
...@@ -153,9 +153,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -153,9 +153,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rsp = { SRpcMsg rsp = {
.code = code, .code = code,
.info = pMsg->info,
.pCont = pMsg->info.rsp, .pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen, .contLen = pMsg->info.rspLen,
.info = pMsg->info,
}; };
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
......
...@@ -19,9 +19,9 @@ ...@@ -19,9 +19,9 @@
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) { static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = { SRpcMsg rsp = {
.code = code, .code = code,
.info = pMsg->info,
.pCont = pMsg->info.rsp, .pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen, .contLen = pMsg->info.rspLen,
.info = pMsg->info,
}; };
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
......
...@@ -19,9 +19,9 @@ ...@@ -19,9 +19,9 @@
static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) { static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = { SRpcMsg rsp = {
.code = code, .code = code,
.info = pMsg->info,
.pCont = pMsg->info.rsp, .pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen, .contLen = pMsg->info.rspLen,
.info = pMsg->info,
}; };
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
......
...@@ -433,7 +433,7 @@ void dmCloseProcRpcHandles(SProc *proc) { ...@@ -433,7 +433,7 @@ void dmCloseProcRpcHandles(SProc *proc) {
SRpcHandleInfo *pInfo = taosHashIterate(proc->hash, NULL); SRpcHandleInfo *pInfo = taosHashIterate(proc->hash, NULL);
while (pInfo != NULL) { while (pInfo != NULL) {
dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, pInfo->handle); dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, pInfo->handle);
SRpcMsg rpcMsg = {.info = *pInfo, .code = TSDB_CODE_NODE_OFFLINE}; SRpcMsg rpcMsg = {.code = TSDB_CODE_NODE_OFFLINE, .info = *pInfo};
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
pInfo = taosHashIterate(proc->hash, pInfo); pInfo = taosHashIterate(proc->hash, pInfo);
} }
......
...@@ -37,7 +37,7 @@ int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray ...@@ -37,7 +37,7 @@ int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -261,8 +261,7 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) { ...@@ -261,8 +261,7 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
sdbRelease(pSdb, pDb); sdbRelease(pSdb, pDb);
} }
static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
bool isRedo) {
STransAction action = {0}; STransAction action = {0};
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
...@@ -279,48 +278,29 @@ static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *p ...@@ -279,48 +278,29 @@ static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *p
action.msgType = TDMT_DND_CREATE_VNODE; action.msgType = TDMT_DND_CREATE_VNODE;
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED; action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
if (isRedo) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq);
taosMemoryFree(pReq); return -1;
return -1;
}
} else {
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
} }
return 0; return 0;
} }
static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
bool isRedo) {
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildAlterVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); void *pReq = mndBuildAlterVnodeReq(pMnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1; if (pReq == NULL) return -1;
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_VND_ALTER_VNODE; action.msgType = TDMT_VND_ALTER_VNODE;
if (isRedo) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq);
taosMemoryFree(pReq); return -1;
return -1;
}
} else {
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
} }
return 0; return 0;
...@@ -487,7 +467,7 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -487,7 +467,7 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
SVnodeGid *pVgid = pVgroup->vnodeGid + vn; SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, true) != 0) { if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid) != 0) {
return -1; return -1;
} }
} }
...@@ -726,11 +706,8 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p ...@@ -726,11 +706,8 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) { static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
if (pVgroup->replica <= 0 || pVgroup->replica == pDb->cfg.replications) { if (pVgroup->replica <= 0 || pVgroup->replica == pDb->cfg.replications) {
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup) != 0) {
SVnodeGid *pVgid = pVgroup->vnodeGid + vn; return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, true) != 0) {
return -1;
}
} }
} else { } else {
SVgObj newVgroup = {0}; SVgObj newVgroup = {0};
...@@ -744,9 +721,9 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -744,9 +721,9 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
return -1; return -1;
} }
newVgroup.replica = pDb->cfg.replications; newVgroup.replica = pDb->cfg.replications;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[0], true) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1; if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1]) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1; if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2]) != 0) return -1;
} else { } else {
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId); mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
...@@ -757,7 +734,7 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -757,7 +734,7 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
return -1; return -1;
} }
newVgroup.replica = pDb->cfg.replications; newVgroup.replica = pDb->cfg.replications;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[0], true) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1;
} }
......
...@@ -842,13 +842,12 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { ...@@ -842,13 +842,12 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
} }
taosMemoryFree(pTrans->rpcRsp); taosMemoryFree(pTrans->rpcRsp);
mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, code & 0xFFFF, pTrans->stage, mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, code, pTrans->stage, pTrans->rpcInfo.ahandle);
pTrans->rpcInfo.ahandle);
SRpcMsg rspMsg = { SRpcMsg rspMsg = {
.info = pTrans->rpcInfo,
.code = code, .code = code,
.pCont = rpcCont, .pCont = rpcCont,
.contLen = pTrans->rpcRspLen, .contLen = pTrans->rpcRspLen,
.info = pTrans->rpcInfo,
}; };
tmsgSendRsp(&rspMsg); tmsgSendRsp(&rspMsg);
pTrans->rpcInfo.handle = NULL; pTrans->rpcInfo.handle = NULL;
...@@ -986,7 +985,8 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr ...@@ -986,7 +985,8 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
if (tmsgSendReq(&pAction->epSet, &rpcMsg) == 0) { if (tmsgSendReq(&pAction->epSet, &rpcMsg) == 0) {
mDebug("trans:%d, action:%d is sent", pTrans->id, action); mDebug("trans:%d, action:%d is sent to %s:%u", pTrans->id, action, pAction->epSet.eps[pAction->epSet.inUse].fqdn,
pAction->epSet.eps[pAction->epSet.inUse].port);
pAction->msgSent = 1; pAction->msgSent = 1;
pAction->msgReceived = 0; pAction->msgReceived = 0;
pAction->errCode = 0; pAction->errCode = 0;
......
...@@ -256,7 +256,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg ...@@ -256,7 +256,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
return pReq; return pReq;
} }
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
SAlterVnodeReq alterReq = {0}; SAlterVnodeReq alterReq = {0};
alterReq.vgVersion = pVgroup->version; alterReq.vgVersion = pVgroup->version;
alterReq.buffer = pDb->cfg.buffer; alterReq.buffer = pDb->cfg.buffer;
...@@ -285,16 +285,14 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgO ...@@ -285,16 +285,14 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgO
pReplica->port = pVgidDnode->port; pReplica->port = pVgidDnode->port;
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
mndReleaseDnode(pMnode, pVgidDnode); mndReleaseDnode(pMnode, pVgidDnode);
if (pDnode->id == pVgid->dnodeId) {
alterReq.selfIndex = v;
}
} }
#if 0
if (alterReq.selfIndex == -1) { if (alterReq.selfIndex == -1) {
terrno = TSDB_CODE_MND_APP_ERROR; terrno = TSDB_CODE_MND_APP_ERROR;
return NULL; return NULL;
} }
#endif
int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq); int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq);
if (contLen < 0) { if (contLen < 0) {
......
...@@ -489,7 +489,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr ...@@ -489,7 +489,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
tstrncpy(desc.status, "ready", sizeof(desc.status)); tstrncpy(desc.status, "ready", sizeof(desc.status));
pClusterInfo->vgroups_alive++; pClusterInfo->vgroups_alive++;
} }
if (pVgid->role == TAOS_SYNC_STATE_LEADER || pVgid->role == TAOS_SYNC_STATE_CANDIDATE) { if (pVgid->role != TAOS_SYNC_STATE_ERROR) {
pClusterInfo->vnodes_alive++; pClusterInfo->vnodes_alive++;
} }
pClusterInfo->vnodes_total++; pClusterInfo->vnodes_total++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册