From ea35f0ae11e80f59f4b7f3f47db5b9ee4e4daa76 Mon Sep 17 00:00:00 2001 From: cadem Date: Wed, 22 Mar 2023 09:36:59 +0800 Subject: [PATCH] feat/balance vgroup leader --- include/common/tmsg.h | 14 ++ include/common/tmsgdef.h | 2 + include/common/ttokendef.h | 1 + include/libs/nodes/cmdnodes.h | 4 + include/libs/nodes/nodes.h | 1 + include/libs/sync/sync.h | 1 + source/common/src/tmsg.c | 50 +++++++ source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 2 + source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 21 +++ source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 3 + source/dnode/mnode/impl/src/mndConsumer.c | 6 +- source/dnode/mnode/impl/src/mndVgroup.c | 155 ++++++++++++++++++++ source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/vnd/vnodeOpen.c | 4 + source/libs/nodes/src/nodesCodeFuncs.c | 6 + source/libs/nodes/src/nodesUtilFuncs.c | 3 + source/libs/parser/inc/parAst.h | 1 + source/libs/parser/src/parAstCreater.c | 7 + source/libs/parser/src/parTokenizer.c | 1 + source/libs/parser/src/parTranslater.c | 8 + source/libs/parser/src/sql.c | 9 +- source/libs/sync/src/syncMain.c | 9 ++ 23 files changed, 306 insertions(+), 4 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d9d3c7e297..ad19b20622 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1655,6 +1655,20 @@ typedef struct { int32_t tSerializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq); int32_t tDeserializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq); +typedef struct { + int32_t useless; +} SBalanceVgroupLeaderReq; + +int32_t tSerializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq); +int32_t tDeserializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq); + +typedef struct { + int32_t vgId; +} SForceElectionReq; + +int32_t tSerializeSForceElectionReq(void* buf, int32_t bufLen, SForceElectionReq* pReq); +int32_t tDeserializeSForceElectionReq(void* buf, int32_t bufLen, SForceElectionReq* pReq); + typedef struct { int32_t vgId; } SSplitVgroupReq; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 46ca814e50..67fd832f13 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -83,6 +83,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_DND_FORCE_ELECTION, "balance-force-election", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) @@ -165,6 +166,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP, "balance-vgroup", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MERGE_VGROUP, "merge-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_REDISTRIBUTE_VGROUP, "redistribute-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL) diff --git a/include/common/ttokendef.h b/include/common/ttokendef.h index 1235dd3447..9c21de1f8d 100644 --- a/include/common/ttokendef.h +++ b/include/common/ttokendef.h @@ -344,6 +344,7 @@ #define TK_VARIABLE 326 #define TK_VIEW 327 #define TK_WAL 328 +#define TK_VGROUP_LEADER 329 #define TK_NK_SPACE 600 #define TK_NK_COMMENT 601 diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index fefcc28bc2..c716d77b32 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -466,6 +466,10 @@ typedef struct SBalanceVgroupStmt { ENodeType type; } SBalanceVgroupStmt; +typedef struct SBalanceVgroupLeaderStmt { + ENodeType type; +} SBalanceVgroupLeaderStmt; + typedef struct SMergeVgroupStmt { ENodeType type; int32_t vgId1; diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index a2e4c76adc..0a92aa8c85 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -161,6 +161,7 @@ typedef enum ENodeType { QUERY_NODE_CREATE_STREAM_STMT, QUERY_NODE_DROP_STREAM_STMT, QUERY_NODE_BALANCE_VGROUP_STMT, + QUERY_NODE_BALANCE_VGROUP_LEADER_STMT, QUERY_NODE_MERGE_VGROUP_STMT, QUERY_NODE_REDISTRIBUTE_VGROUP_STMT, QUERY_NODE_SPLIT_VGROUP_STMT, diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 189484d1a6..8f3c686ffd 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -245,6 +245,7 @@ bool syncIsReadyForRead(int64_t rid); bool syncSnapshotSending(int64_t rid); bool syncSnapshotRecving(int64_t rid); int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq); +int32_t syncLeaderForceElection(int64_t rid); SSyncState syncGetState(int64_t rid); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9ad7c72bc0..4225e632df 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4438,6 +4438,31 @@ int32_t tDeserializeSBalanceVgroupReq(void *buf, int32_t bufLen, SBalanceVgroupR return 0; } +int32_t tSerializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceVgroupLeaderReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->useless) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceVgroupLeaderReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->useless) < 0) return -1; + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + int32_t tSerializeSMergeVgroupReq(void *buf, int32_t bufLen, SMergeVgroupReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -4521,6 +4546,31 @@ int32_t tDeserializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq * return 0; } +int32_t tSerializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index fe65c3dde9..0fa889169e 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -92,6 +92,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_FORCE_ELECTION_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -126,6 +127,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_MERGE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SPLIT_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP_LEADER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index e3fa2964b7..d10565975b 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -86,6 +86,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemo // vmHandle.c SArray *vmGetMsgHandles(); int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessForceElectionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d7f91b74a8..92429a4157 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -304,6 +304,26 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } +int32_t vmProcessForceElectionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg){ + SForceElectionReq req = {0}; + + if (tDeserializeSForceElectionReq(pMsg->pCont, pMsg->contLen, &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); + if (pVnode == NULL) { + dError("vgId:%d, failed to alter hashrange since %s", req.vgId, terrstr()); + terrno = TSDB_CODE_VND_NOT_EXIST; + return -1; + } + + vnodeForceElection(pVnode->pImpl); + + return 0; +} + int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SAlterVnodeHashRangeReq req = {0}; if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) { @@ -548,6 +568,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_FORCE_ELECTION, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 7aa1c9f56a..e0f141639e 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -37,6 +37,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { case TDMT_DND_CREATE_VNODE: code = vmProcessCreateVnodeReq(pMgmt, pMsg); break; + case TDMT_DND_FORCE_ELECTION: + code = vmProcessForceElectionReq(pMgmt, pMsg); + break; case TDMT_DND_DROP_VNODE: code = vmProcessDropVnodeReq(pMgmt, pMsg); break; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index f1ef83aca5..49676ace11 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -77,7 +77,7 @@ void mndCleanupConsumer(SMnode *pMnode) {} bool mndRebTryStart() { int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1); - mInfo("tq timer, rebalance counter old val:%d", old); + mDebug("tq timer, rebalance counter old val:%d", old); return old == 0; } @@ -101,7 +101,7 @@ void mndRebCntDec() { int32_t newVal = val - 1; int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal); if (oldVal == val) { - mInfo("rebalance trans end, rebalance counter:%d", newVal); + mDebug("rebalance trans end, rebalance counter:%d", newVal); break; } } @@ -356,7 +356,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { } else { taosHashCleanup(pRebMsg->rebSubHash); rpcFreeCont(pRebMsg); - mInfo("mq rebalance finished, no modification"); + mDebug("mq rebalance finished, no modification"); mndRebEnd(); } return 0; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 84e8a9ec43..2315bf89c6 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -40,6 +40,7 @@ static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter); static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq); +static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq); int32_t mndInitVgroup(SMnode *pMnode) { SSdbTable table = { @@ -60,10 +61,13 @@ int32_t mndInitVgroup(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_DND_FORCE_ELECTION_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg); + //mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessVgroupBalanceLeaderMsg); mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg); + mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup); @@ -1770,6 +1774,157 @@ _OVER: return code; } +static void *mndBuildSForceElectionReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, + int32_t *pContLen) { + SForceElectionReq balanceReq = { + .vgId = pVgroup->vgId, + }; + + int32_t contLen = tSerializeSForceElectionReq(NULL, 0, &balanceReq); + if (contLen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + void *pReq = taosMemoryMalloc(contLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + tSerializeSForceElectionReq((char *)pReq, contLen, &balanceReq); + *pContLen = contLen; + return pReq; +} + +int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) { + SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId); + if (pDnode == NULL) return -1; + + STransAction action = {0}; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + int32_t contLen = 0; + void *pReq = mndBuildSForceElectionReq(pMnode, pVgroup, dnodeId, &contLen); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_DND_FORCE_ELECTION; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + +int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans){ + SSdb *pSdb = pMnode->pSdb; + + int32_t vgid = pVgroup->vgId; + int8_t replica = pVgroup->replica; + + if(pVgroup->replica <= 1) { + mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica); + return -1; + } + + int32_t index = vgid%replica; + int32_t dnodeId = pVgroup->vnodeGid[index].dnodeId; + + bool exist = false; + bool online = false; + int64_t curMs = taosGetTimestampMs(); + SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId); + if (pDnode != NULL) { + exist = true; + online = mndIsDnodeOnline(pDnode, curMs); + mndReleaseDnode(pMnode, pDnode); + } + + if(exist && online) + { + mInfo("trans:%d, vgid:%d leader to dnode:%d", pTrans->id, vgid, dnodeId); + + if (mndAddBalanceVgroupLeaderAction(pMnode, pTrans, pVgroup, dnodeId) != 0) { + mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId); + return -1; + } + + SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup); + if (pRaw == NULL) { + mError("trans:%d, vgid:%d failed to encode action to dnode:%d", pTrans->id, vgid, dnodeId); + return -1; + } + if (mndTransAppendCommitlog(pTrans, pRaw) != 0) { + sdbFreeRaw(pRaw); + mError("trans:%d, vgid:%d failed to append commit log dnode:%d", pTrans->id, vgid, dnodeId); + return -1; + } + (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); + } + else + { + mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist, online); + } + + return 0; +} + +int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq) { + int32_t code = -1; + + SBalanceVgroupLeaderReq req = {0}; + if (tDeserializeSBalanceVgroupLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return code; + } + + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + + int32_t total = sdbGetSize(pSdb, SDB_VGROUP); + if(total <= 0) { + terrno = TSDB_CODE_TSC_INVALID_OPERATION; + return code; + } + + STrans *pTrans = NULL; + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "bal-vg-leader"); + if (pTrans == NULL) goto _OVER; + mndTransSetSerial(pTrans); + mInfo("trans:%d, used to balance vgroup leader", pTrans->id); + + void *pIter = NULL; + int32_t count = 0; + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if(mndAddVgroupBalanceToTrans(pMnode, pVgroup, pTrans) == 0){ + count++; + } + + sdbRelease(pSdb, pVgroup); + } + + if(count == 0) { + terrno = TSDB_CODE_TSC_INVALID_OPERATION; + goto _OVER; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + code = 0; + +_OVER: + mndTransDrop(pTrans); + return code; +} + static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup, SVgObj *pNewVgroup, SArray *pArray) { for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 1d14829891..91e1bcde4e 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -56,6 +56,7 @@ void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodePreClose(SVnode *pVnode); void vnodePostClose(SVnode *pVnode); +void vnodeForceElection(SVnode *pVnode); void vnodeSyncCheckTimeout(SVnode *pVnode); void vnodeClose(SVnode *pVnode); int32_t vnodeSyncCommit(SVnode *pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index c7d155be0d..c5f9412461 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -380,6 +380,10 @@ void vnodePreClose(SVnode *pVnode) { void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); } +void vnodeForceElection(SVnode *pVnode) { + syncLeaderForceElection(pVnode->sync); +} + void vnodeClose(SVnode *pVnode) { if (pVnode) { tsem_wait(&pVnode->canCommit); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index e18de1c1d2..0aeb83ce08 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -173,6 +173,8 @@ const char* nodesNodeName(ENodeType type) { return "DropStreamStmt"; case QUERY_NODE_BALANCE_VGROUP_STMT: return "BalanceVgroupStmt"; + case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: + return "BalanceVgroupLeaderStmt"; case QUERY_NODE_MERGE_VGROUP_STMT: return "MergeVgroupStmt"; case QUERY_NODE_SHOW_DB_ALIVE_STMT: @@ -6433,6 +6435,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return dropStreamStmtToJson(pObj, pJson); case QUERY_NODE_BALANCE_VGROUP_STMT: return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to serialize. + case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: + return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to serialize. case QUERY_NODE_MERGE_VGROUP_STMT: return mergeVgroupStmtToJson(pObj, pJson); case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: @@ -6741,6 +6745,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToDropStreamStmt(pJson, pObj); case QUERY_NODE_BALANCE_VGROUP_STMT: return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to deserialize. + case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: + return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to deserialize. case QUERY_NODE_MERGE_VGROUP_STMT: return jsonToMergeVgroupStmt(pJson, pObj); case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 8d6c3288b9..f6bdd19caa 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -386,6 +386,8 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SDropStreamStmt)); case QUERY_NODE_BALANCE_VGROUP_STMT: return makeNode(type, sizeof(SBalanceVgroupStmt)); + case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: + return makeNode(type, sizeof(SBalanceVgroupLeaderStmt)); case QUERY_NODE_MERGE_VGROUP_STMT: return makeNode(type, sizeof(SMergeVgroupStmt)); case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: @@ -937,6 +939,7 @@ void nodesDestroyNode(SNode* pNode) { } case QUERY_NODE_DROP_STREAM_STMT: // no pointer field case QUERY_NODE_BALANCE_VGROUP_STMT: // no pointer field + case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: // no pointer field case QUERY_NODE_MERGE_VGROUP_STMT: // no pointer field break; case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index 10d4adf17d..fb5ac12970 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -223,6 +223,7 @@ SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToke SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId); SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId); SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt); +SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt); SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2); SNode* createRedistributeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId, SNodeList* pDnodes); SNode* createSplitVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId); diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index a761b7a7b0..1c6f3f1b6c 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -1945,6 +1945,13 @@ SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt) { return (SNode*)pStmt; } +SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt) { + CHECK_PARSER_STATUS(pCxt); + SBalanceVgroupLeaderStmt* pStmt = (SBalanceVgroupLeaderStmt*)nodesMakeNode(QUERY_NODE_BALANCE_VGROUP_LEADER_STMT); + CHECK_OUT_OF_MEM(pStmt); + return (SNode*)pStmt; +} + SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2) { CHECK_PARSER_STATUS(pCxt); SMergeVgroupStmt* pStmt = (SMergeVgroupStmt*)nodesMakeNode(QUERY_NODE_MERGE_VGROUP_STMT); diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index 366831ee1a..d618c3fd6b 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -250,6 +250,7 @@ static SKeyword keywordTable[] = { {"VERBOSE", TK_VERBOSE}, {"VGROUP", TK_VGROUP}, {"VGROUPS", TK_VGROUPS}, + {"VGROUP_LEADER", TK_VGROUP_LEADER}, {"VNODES", TK_VNODES}, {"WAL_FSYNC_PERIOD", TK_WAL_FSYNC_PERIOD}, {"WAL_LEVEL", TK_WAL_LEVEL}, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 74b32c638d..fd631a6d49 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6458,6 +6458,11 @@ static int32_t translateBalanceVgroup(STranslateContext* pCxt, SBalanceVgroupStm return buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP, (FSerializeFunc)tSerializeSBalanceVgroupReq, &req); } +static int32_t translateBalanceVgroupLeader(STranslateContext* pCxt, SBalanceVgroupLeaderStmt* pStmt) { + SBalanceVgroupLeaderReq req = {0}; + return buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP_LEADER, (FSerializeFunc)tSerializeSBalanceVgroupLeaderReq, &req); +} + static int32_t translateMergeVgroup(STranslateContext* pCxt, SMergeVgroupStmt* pStmt) { SMergeVgroupReq req = {.vgId1 = pStmt->vgId1, .vgId2 = pStmt->vgId2}; return buildCmdMsg(pCxt, TDMT_MND_MERGE_VGROUP, (FSerializeFunc)tSerializeSMergeVgroupReq, &req); @@ -6669,6 +6674,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { case QUERY_NODE_BALANCE_VGROUP_STMT: code = translateBalanceVgroup(pCxt, (SBalanceVgroupStmt*)pNode); break; + case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: + code = translateBalanceVgroupLeader(pCxt, (SBalanceVgroupLeaderStmt*)pNode); + break; case QUERY_NODE_MERGE_VGROUP_STMT: code = translateMergeVgroup(pCxt, (SMergeVgroupStmt*)pNode); break; diff --git a/source/libs/parser/src/sql.c b/source/libs/parser/src/sql.c index 457083358d..8319bd38ee 100644 --- a/source/libs/parser/src/sql.c +++ b/source/libs/parser/src/sql.c @@ -1210,6 +1210,7 @@ static const YYCODETYPE yyFallback[] = { 0, /* TRANSACTION => nothing */ 0, /* BALANCE => nothing */ 0, /* VGROUP => nothing */ + 0, /* VGROUP_LEADER => nothing */ 0, /* MERGE => nothing */ 0, /* REDISTRIBUTE => nothing */ 0, /* SPLIT => nothing */ @@ -1891,6 +1892,7 @@ static const char *const yyTokenName[] = { /* 470 */ "sort_specification", /* 471 */ "ordering_specification_opt", /* 472 */ "null_ordering_opt", + /* 473 */ "VGROUP_LEADER", }; #endif /* defined(YYCOVERAGE) || !defined(NDEBUG) */ @@ -2472,6 +2474,7 @@ static const char *const yyRuleName[] = { /* 571 */ "null_ordering_opt ::=", /* 572 */ "null_ordering_opt ::= NULLS FIRST", /* 573 */ "null_ordering_opt ::= NULLS LAST", + /* 574 */ "cmd ::= BALANCE VGROUP_LEADER", }; #endif /* NDEBUG */ @@ -3666,6 +3669,7 @@ static const struct { { 472, 0 }, /* (571) null_ordering_opt ::= */ { 472, -2 }, /* (572) null_ordering_opt ::= NULLS FIRST */ { 472, -2 }, /* (573) null_ordering_opt ::= NULLS LAST */ + { 329, -2 }, /* (574) cmd ::= BALANCE VGROUP_LEADER */ }; static void yy_accept(yyParser*); /* Forward Declaration */ @@ -4791,7 +4795,7 @@ static YYACTIONTYPE yy_reduce( { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_TRANSACTION_STMT, &yymsp[0].minor.yy0); } break; case 337: /* cmd ::= BALANCE VGROUP */ -{ pCxt->pRootNode = createBalanceVgroupStmt(pCxt); } +{ pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt); } break; case 338: /* cmd ::= MERGE VGROUP NK_INTEGER NK_INTEGER */ { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &yymsp[-1].minor.yy0, &yymsp[0].minor.yy0); } @@ -5364,6 +5368,9 @@ static YYACTIONTYPE yy_reduce( case 573: /* null_ordering_opt ::= NULLS LAST */ { yymsp[-1].minor.yy675 = NULL_ORDER_LAST; } break; + case 574: /* cmd ::= BALANCE VGROUP_LEADER */ +{ pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt); } + break; default: break; /********** End reduce actions ************************************************/ diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 90f79fd93c..c3b1046bdf 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -228,6 +228,15 @@ int32_t syncLeaderTransfer(int64_t rid) { return ret; } +int32_t syncLeaderForceElection(int64_t rid) { + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode == NULL) return -1; + + int32_t ret = syncNodeElect(pSyncNode); + syncNodeRelease(pSyncNode); + return ret; +} + int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) { SSyncNode* pNode = syncNodeAcquire(rid); if (pNode == NULL) return -1; -- GitLab