未验证 提交 ed48713a 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20615 from taosdata/fix/trigger-election-by-sync-msg

Fix/trigger election by sync msg
...@@ -1670,10 +1670,10 @@ int32_t tDeserializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceV ...@@ -1670,10 +1670,10 @@ int32_t tDeserializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceV
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
} SForceElectionReq; } SForceBecomeFollowerReq;
int32_t tSerializeSForceElectionReq(void* buf, int32_t bufLen, SForceElectionReq* pReq); int32_t tSerializeSForceBecomeFollowerReq(void* buf, int32_t bufLen, SForceBecomeFollowerReq* pReq);
int32_t tDeserializeSForceElectionReq(void* buf, int32_t bufLen, SForceElectionReq* pReq); int32_t tDeserializeSForceBecomeFollowerReq(void* buf, int32_t bufLen, SForceBecomeFollowerReq* pReq);
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
......
...@@ -83,7 +83,6 @@ enum { ...@@ -83,7 +83,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_FORCE_ELECTION, "balance-force-election", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_NEW_MSG_SEG(TDMT_MND_MSG)
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
...@@ -166,7 +165,6 @@ enum { ...@@ -166,7 +165,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP, "balance-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP, "balance-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MERGE_VGROUP, "merge-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MERGE_VGROUP, "merge-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_REDISTRIBUTE_VGROUP, "redistribute-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_REDISTRIBUTE_VGROUP, "redistribute-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL)
...@@ -177,6 +175,7 @@ enum { ...@@ -177,6 +175,7 @@ enum {
// TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) // TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL)
// TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) // TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
...@@ -288,6 +287,7 @@ enum { ...@@ -288,6 +287,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT, "sync-pre-snapshot", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT, "sync-pre-snapshot", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
......
...@@ -246,7 +246,7 @@ bool syncIsReadyForRead(int64_t rid); ...@@ -246,7 +246,7 @@ bool syncIsReadyForRead(int64_t rid);
bool syncSnapshotSending(int64_t rid); bool syncSnapshotSending(int64_t rid);
bool syncSnapshotRecving(int64_t rid); bool syncSnapshotRecving(int64_t rid);
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq); int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq);
int32_t syncLeaderForceElection(int64_t rid); int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg);
SSyncState syncGetState(int64_t rid); SSyncState syncGetState(int64_t rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
......
...@@ -4569,7 +4569,7 @@ int32_t tDeserializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq * ...@@ -4569,7 +4569,7 @@ int32_t tDeserializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq *
return 0; return 0;
} }
int32_t tSerializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq *pReq) { int32_t tSerializeSForceBecomeFollowerReq(void *buf, int32_t bufLen, SForceBecomeFollowerReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
...@@ -4582,7 +4582,7 @@ int32_t tSerializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq ...@@ -4582,7 +4582,7 @@ int32_t tSerializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq
return tlen; return tlen;
} }
int32_t tDeserializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq *pReq) { int32_t tDeserializeSForceBecomeFollowerReq(void *buf, int32_t bufLen, SForceBecomeFollowerReq *pReq) {
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
......
...@@ -92,7 +92,6 @@ SArray *mmGetMsgHandles() { ...@@ -92,7 +92,6 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_FORCE_ELECTION_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
...@@ -205,6 +204,7 @@ SArray *mmGetMsgHandles() { ...@@ -205,6 +204,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
code = 0; code = 0;
......
...@@ -86,7 +86,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemo ...@@ -86,7 +86,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemo
// vmHandle.c // vmHandle.c
SArray *vmGetMsgHandles(); SArray *vmGetMsgHandles();
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessForceElectionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
......
...@@ -304,26 +304,6 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -304,26 +304,6 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0; return 0;
} }
int32_t vmProcessForceElectionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg){
SForceElectionReq req = {0};
if (tDeserializeSForceElectionReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to alter hashrange since %s", req.vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
return -1;
}
vnodeForceElection(pVnode->pImpl);
return 0;
}
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SAlterVnodeHashRangeReq req = {0}; SAlterVnodeHashRangeReq req = {0};
if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) { if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
...@@ -568,7 +548,6 @@ SArray *vmGetMsgHandles() { ...@@ -568,7 +548,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_FORCE_ELECTION, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
...@@ -586,6 +565,7 @@ SArray *vmGetMsgHandles() { ...@@ -586,6 +565,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
code = 0; code = 0;
......
...@@ -37,9 +37,6 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -37,9 +37,6 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_DND_CREATE_VNODE: case TDMT_DND_CREATE_VNODE:
code = vmProcessCreateVnodeReq(pMgmt, pMsg); code = vmProcessCreateVnodeReq(pMgmt, pMsg);
break; break;
case TDMT_DND_FORCE_ELECTION:
code = vmProcessForceElectionReq(pMgmt, pMsg);
break;
case TDMT_DND_DROP_VNODE: case TDMT_DND_DROP_VNODE:
code = vmProcessDropVnodeReq(pMgmt, pMsg); code = vmProcessDropVnodeReq(pMgmt, pMsg);
break; break;
......
...@@ -61,7 +61,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { ...@@ -61,7 +61,7 @@ int32_t mndInitVgroup(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_FORCE_ELECTION_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_SYNC_FORCE_FOLLOWER_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
...@@ -1779,17 +1779,18 @@ _OVER: ...@@ -1779,17 +1779,18 @@ _OVER:
return code; return code;
} }
static void *mndBuildSForceElectionReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId,
int32_t *pContLen) { int32_t *pContLen) {
SForceElectionReq balanceReq = { SForceBecomeFollowerReq balanceReq = {
.vgId = pVgroup->vgId, .vgId = pVgroup->vgId,
}; };
int32_t contLen = tSerializeSForceElectionReq(NULL, 0, &balanceReq); int32_t contLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq);
if (contLen < 0) { if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
contLen += sizeof(SMsgHead);
void *pReq = taosMemoryMalloc(contLen); void *pReq = taosMemoryMalloc(contLen);
if (pReq == NULL) { if (pReq == NULL) {
...@@ -1797,9 +1798,13 @@ static void *mndBuildSForceElectionReq(SMnode *pMnode, SVgObj *pVgroup, int32_t ...@@ -1797,9 +1798,13 @@ static void *mndBuildSForceElectionReq(SMnode *pMnode, SVgObj *pVgroup, int32_t
return NULL; return NULL;
} }
tSerializeSForceElectionReq((char *)pReq, contLen, &balanceReq); SMsgHead *pHead = pReq;
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), contLen, &balanceReq);
*pContLen = contLen; *pContLen = contLen;
return pReq; return pReq;
} }
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) { int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
...@@ -1811,12 +1816,12 @@ int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj * ...@@ -1811,12 +1816,12 @@ int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildSForceElectionReq(pMnode, pVgroup, dnodeId, &contLen); void *pReq = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &contLen);
if (pReq == NULL) return -1; if (pReq == NULL) return -1;
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_DND_FORCE_ELECTION; action.msgType = TDMT_SYNC_FORCE_FOLLOWER;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
...@@ -1832,13 +1837,20 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra ...@@ -1832,13 +1837,20 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra
int32_t vgid = pVgroup->vgId; int32_t vgid = pVgroup->vgId;
int8_t replica = pVgroup->replica; int8_t replica = pVgroup->replica;
if(pVgroup->replica <= 1) { if(pVgroup->replica <= 1) {
mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica); mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
return -1; return -1;
} }
int32_t index = vgid%replica; int32_t dnodeId = pVgroup->vnodeGid[0].dnodeId;
int32_t dnodeId = pVgroup->vnodeGid[index].dnodeId;
for(int i = 0; i < replica; i++)
{
if(pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER){
dnodeId = pVgroup->vnodeGid[i].dnodeId;
break;
}
}
bool exist = false; bool exist = false;
bool online = false; bool online = false;
......
...@@ -56,7 +56,6 @@ void vnodeDestroy(const char *path, STfs *pTfs); ...@@ -56,7 +56,6 @@ void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode); void vnodePreClose(SVnode *pVnode);
void vnodePostClose(SVnode *pVnode); void vnodePostClose(SVnode *pVnode);
void vnodeForceElection(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode *pVnode); void vnodeSyncCheckTimeout(SVnode *pVnode);
void vnodeClose(SVnode *pVnode); void vnodeClose(SVnode *pVnode);
int32_t vnodeSyncCommit(SVnode *pVnode); int32_t vnodeSyncCommit(SVnode *pVnode);
......
...@@ -380,10 +380,6 @@ void vnodePreClose(SVnode *pVnode) { ...@@ -380,10 +380,6 @@ void vnodePreClose(SVnode *pVnode) {
void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); } void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
void vnodeForceElection(SVnode *pVnode) {
syncLeaderForceElection(pVnode->sync);
}
void vnodeClose(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
tsem_wait(&pVnode->canCommit); tsem_wait(&pVnode->canCommit);
......
...@@ -207,6 +207,9 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { ...@@ -207,6 +207,9 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
case TDMT_SYNC_LOCAL_CMD: case TDMT_SYNC_LOCAL_CMD:
code = syncNodeOnLocalCmd(pSyncNode, pMsg); code = syncNodeOnLocalCmd(pSyncNode, pMsg);
break; break;
case TDMT_SYNC_FORCE_FOLLOWER:
code = syncForceBecomeFollower(pSyncNode, pMsg);
break;
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1; code = -1;
...@@ -229,13 +232,18 @@ int32_t syncLeaderTransfer(int64_t rid) { ...@@ -229,13 +232,18 @@ int32_t syncLeaderTransfer(int64_t rid) {
return ret; return ret;
} }
int32_t syncLeaderForceElection(int64_t rid) { int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); syncNodeBecomeFollower(ths, "force election");
if (pSyncNode == NULL) return -1;
int32_t ret = syncNodeElect(pSyncNode); SRpcMsg rsp = {
syncNodeRelease(pSyncNode); .code = 0,
return ret; .pCont = pRpcMsg->info.rsp,
.contLen = pRpcMsg->info.rspLen,
.info = pRpcMsg->info,
};
tmsgSendRsp(&rsp);
return 0;
} }
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) { int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册