未验证 提交 59b172c9 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20589 from taosdata/feat/balance-vgroup-leader

feat: add sql command 'balance vgroup leader'
...@@ -1657,6 +1657,20 @@ typedef struct { ...@@ -1657,6 +1657,20 @@ typedef struct {
int32_t tSerializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq); int32_t tSerializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq);
int32_t tDeserializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq); int32_t tDeserializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq);
typedef struct {
int32_t useless;
} SBalanceVgroupLeaderReq;
int32_t tSerializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq);
int32_t tDeserializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq);
typedef struct {
int32_t vgId;
} SForceElectionReq;
int32_t tSerializeSForceElectionReq(void* buf, int32_t bufLen, SForceElectionReq* pReq);
int32_t tDeserializeSForceElectionReq(void* buf, int32_t bufLen, SForceElectionReq* pReq);
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
} SSplitVgroupReq; } SSplitVgroupReq;
......
...@@ -83,6 +83,7 @@ enum { ...@@ -83,6 +83,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_FORCE_ELECTION, "balance-force-election", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_NEW_MSG_SEG(TDMT_MND_MSG)
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
...@@ -165,6 +166,7 @@ enum { ...@@ -165,6 +166,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP, "balance-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP, "balance-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MERGE_VGROUP, "merge-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MERGE_VGROUP, "merge-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_REDISTRIBUTE_VGROUP, "redistribute-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_REDISTRIBUTE_VGROUP, "redistribute-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL)
......
...@@ -223,127 +223,128 @@ ...@@ -223,127 +223,128 @@
#define TK_TRANSACTION 205 #define TK_TRANSACTION 205
#define TK_BALANCE 206 #define TK_BALANCE 206
#define TK_VGROUP 207 #define TK_VGROUP 207
#define TK_MERGE 208 #define TK_LEADER 208
#define TK_REDISTRIBUTE 209 #define TK_MERGE 209
#define TK_SPLIT 210 #define TK_REDISTRIBUTE 210
#define TK_DELETE 211 #define TK_SPLIT 211
#define TK_INSERT 212 #define TK_DELETE 212
#define TK_NULL 213 #define TK_INSERT 213
#define TK_NK_QUESTION 214 #define TK_NULL 214
#define TK_NK_ARROW 215 #define TK_NK_QUESTION 215
#define TK_ROWTS 216 #define TK_NK_ARROW 216
#define TK_QSTART 217 #define TK_ROWTS 217
#define TK_QEND 218 #define TK_QSTART 218
#define TK_QDURATION 219 #define TK_QEND 219
#define TK_WSTART 220 #define TK_QDURATION 220
#define TK_WEND 221 #define TK_WSTART 221
#define TK_WDURATION 222 #define TK_WEND 222
#define TK_IROWTS 223 #define TK_WDURATION 223
#define TK_ISFILLED 224 #define TK_IROWTS 224
#define TK_CAST 225 #define TK_ISFILLED 225
#define TK_NOW 226 #define TK_CAST 226
#define TK_TODAY 227 #define TK_NOW 227
#define TK_TIMEZONE 228 #define TK_TODAY 228
#define TK_CLIENT_VERSION 229 #define TK_TIMEZONE 229
#define TK_SERVER_VERSION 230 #define TK_CLIENT_VERSION 230
#define TK_SERVER_STATUS 231 #define TK_SERVER_VERSION 231
#define TK_CURRENT_USER 232 #define TK_SERVER_STATUS 232
#define TK_CASE 233 #define TK_CURRENT_USER 233
#define TK_WHEN 234 #define TK_CASE 234
#define TK_THEN 235 #define TK_WHEN 235
#define TK_ELSE 236 #define TK_THEN 236
#define TK_BETWEEN 237 #define TK_ELSE 237
#define TK_IS 238 #define TK_BETWEEN 238
#define TK_NK_LT 239 #define TK_IS 239
#define TK_NK_GT 240 #define TK_NK_LT 240
#define TK_NK_LE 241 #define TK_NK_GT 241
#define TK_NK_GE 242 #define TK_NK_LE 242
#define TK_NK_NE 243 #define TK_NK_GE 243
#define TK_MATCH 244 #define TK_NK_NE 244
#define TK_NMATCH 245 #define TK_MATCH 245
#define TK_CONTAINS 246 #define TK_NMATCH 246
#define TK_IN 247 #define TK_CONTAINS 247
#define TK_JOIN 248 #define TK_IN 248
#define TK_INNER 249 #define TK_JOIN 249
#define TK_SELECT 250 #define TK_INNER 250
#define TK_DISTINCT 251 #define TK_SELECT 251
#define TK_WHERE 252 #define TK_DISTINCT 252
#define TK_PARTITION 253 #define TK_WHERE 253
#define TK_BY 254 #define TK_PARTITION 254
#define TK_SESSION 255 #define TK_BY 255
#define TK_STATE_WINDOW 256 #define TK_SESSION 256
#define TK_EVENT_WINDOW 257 #define TK_STATE_WINDOW 257
#define TK_SLIDING 258 #define TK_EVENT_WINDOW 258
#define TK_FILL 259 #define TK_SLIDING 259
#define TK_VALUE 260 #define TK_FILL 260
#define TK_VALUE_F 261 #define TK_VALUE 261
#define TK_NONE 262 #define TK_VALUE_F 262
#define TK_PREV 263 #define TK_NONE 263
#define TK_NULL_F 264 #define TK_PREV 264
#define TK_LINEAR 265 #define TK_NULL_F 265
#define TK_NEXT 266 #define TK_LINEAR 266
#define TK_HAVING 267 #define TK_NEXT 267
#define TK_RANGE 268 #define TK_HAVING 268
#define TK_EVERY 269 #define TK_RANGE 269
#define TK_ORDER 270 #define TK_EVERY 270
#define TK_SLIMIT 271 #define TK_ORDER 271
#define TK_SOFFSET 272 #define TK_SLIMIT 272
#define TK_LIMIT 273 #define TK_SOFFSET 273
#define TK_OFFSET 274 #define TK_LIMIT 274
#define TK_ASC 275 #define TK_OFFSET 275
#define TK_NULLS 276 #define TK_ASC 276
#define TK_ABORT 277 #define TK_NULLS 277
#define TK_AFTER 278 #define TK_ABORT 278
#define TK_ATTACH 279 #define TK_AFTER 279
#define TK_BEFORE 280 #define TK_ATTACH 280
#define TK_BEGIN 281 #define TK_BEFORE 281
#define TK_BITAND 282 #define TK_BEGIN 282
#define TK_BITNOT 283 #define TK_BITAND 283
#define TK_BITOR 284 #define TK_BITNOT 284
#define TK_BLOCKS 285 #define TK_BITOR 285
#define TK_CHANGE 286 #define TK_BLOCKS 286
#define TK_COMMA 287 #define TK_CHANGE 287
#define TK_CONCAT 288 #define TK_COMMA 288
#define TK_CONFLICT 289 #define TK_CONCAT 289
#define TK_COPY 290 #define TK_CONFLICT 290
#define TK_DEFERRED 291 #define TK_COPY 291
#define TK_DELIMITERS 292 #define TK_DEFERRED 292
#define TK_DETACH 293 #define TK_DELIMITERS 293
#define TK_DIVIDE 294 #define TK_DETACH 294
#define TK_DOT 295 #define TK_DIVIDE 295
#define TK_EACH 296 #define TK_DOT 296
#define TK_FAIL 297 #define TK_EACH 297
#define TK_FILE 298 #define TK_FAIL 298
#define TK_FOR 299 #define TK_FILE 299
#define TK_GLOB 300 #define TK_FOR 300
#define TK_ID 301 #define TK_GLOB 301
#define TK_IMMEDIATE 302 #define TK_ID 302
#define TK_IMPORT 303 #define TK_IMMEDIATE 303
#define TK_INITIALLY 304 #define TK_IMPORT 304
#define TK_INSTEAD 305 #define TK_INITIALLY 305
#define TK_ISNULL 306 #define TK_INSTEAD 306
#define TK_KEY 307 #define TK_ISNULL 307
#define TK_MODULES 308 #define TK_KEY 308
#define TK_NK_BITNOT 309 #define TK_MODULES 309
#define TK_NK_SEMI 310 #define TK_NK_BITNOT 310
#define TK_NOTNULL 311 #define TK_NK_SEMI 311
#define TK_OF 312 #define TK_NOTNULL 312
#define TK_PLUS 313 #define TK_OF 313
#define TK_PRIVILEGE 314 #define TK_PLUS 314
#define TK_RAISE 315 #define TK_PRIVILEGE 315
#define TK_REPLACE 316 #define TK_RAISE 316
#define TK_RESTRICT 317 #define TK_REPLACE 317
#define TK_ROW 318 #define TK_RESTRICT 318
#define TK_SEMI 319 #define TK_ROW 319
#define TK_STAR 320 #define TK_SEMI 320
#define TK_STATEMENT 321 #define TK_STAR 321
#define TK_STRICT 322 #define TK_STATEMENT 322
#define TK_STRING 323 #define TK_STRICT 323
#define TK_TIMES 324 #define TK_STRING 324
#define TK_VALUES 325 #define TK_TIMES 325
#define TK_VARIABLE 326 #define TK_VALUES 326
#define TK_VIEW 327 #define TK_VARIABLE 327
#define TK_WAL 328 #define TK_VIEW 328
#define TK_WAL 329
#define TK_NK_SPACE 600 #define TK_NK_SPACE 600
#define TK_NK_COMMENT 601 #define TK_NK_COMMENT 601
......
...@@ -466,6 +466,10 @@ typedef struct SBalanceVgroupStmt { ...@@ -466,6 +466,10 @@ typedef struct SBalanceVgroupStmt {
ENodeType type; ENodeType type;
} SBalanceVgroupStmt; } SBalanceVgroupStmt;
typedef struct SBalanceVgroupLeaderStmt {
ENodeType type;
} SBalanceVgroupLeaderStmt;
typedef struct SMergeVgroupStmt { typedef struct SMergeVgroupStmt {
ENodeType type; ENodeType type;
int32_t vgId1; int32_t vgId1;
......
...@@ -210,6 +210,7 @@ typedef enum ENodeType { ...@@ -210,6 +210,7 @@ typedef enum ENodeType {
QUERY_NODE_QUERY, QUERY_NODE_QUERY,
QUERY_NODE_SHOW_DB_ALIVE_STMT, QUERY_NODE_SHOW_DB_ALIVE_STMT,
QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT, QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT,
QUERY_NODE_BALANCE_VGROUP_LEADER_STMT,
// logic plan node // logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN = 1000, QUERY_NODE_LOGIC_PLAN_SCAN = 1000,
......
...@@ -245,6 +245,7 @@ bool syncIsReadyForRead(int64_t rid); ...@@ -245,6 +245,7 @@ bool syncIsReadyForRead(int64_t rid);
bool syncSnapshotSending(int64_t rid); bool syncSnapshotSending(int64_t rid);
bool syncSnapshotRecving(int64_t rid); bool syncSnapshotRecving(int64_t rid);
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq); int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq);
int32_t syncLeaderForceElection(int64_t rid);
SSyncState syncGetState(int64_t rid); SSyncState syncGetState(int64_t rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
......
...@@ -4447,6 +4447,31 @@ int32_t tDeserializeSBalanceVgroupReq(void *buf, int32_t bufLen, SBalanceVgroupR ...@@ -4447,6 +4447,31 @@ int32_t tDeserializeSBalanceVgroupReq(void *buf, int32_t bufLen, SBalanceVgroupR
return 0; return 0;
} }
int32_t tSerializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceVgroupLeaderReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->useless) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceVgroupLeaderReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->useless) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSMergeVgroupReq(void *buf, int32_t bufLen, SMergeVgroupReq *pReq) { int32_t tSerializeSMergeVgroupReq(void *buf, int32_t bufLen, SMergeVgroupReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
...@@ -4530,6 +4555,31 @@ int32_t tDeserializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq * ...@@ -4530,6 +4555,31 @@ int32_t tDeserializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq *
return 0; return 0;
} }
int32_t tSerializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *pReq) { int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
......
...@@ -92,6 +92,7 @@ SArray *mmGetMsgHandles() { ...@@ -92,6 +92,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_FORCE_ELECTION_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
...@@ -126,6 +127,7 @@ SArray *mmGetMsgHandles() { ...@@ -126,6 +127,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_MERGE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_MERGE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SPLIT_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SPLIT_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP_LEADER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -86,6 +86,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemo ...@@ -86,6 +86,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemo
// vmHandle.c // vmHandle.c
SArray *vmGetMsgHandles(); SArray *vmGetMsgHandles();
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessForceElectionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
......
...@@ -304,6 +304,26 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -304,6 +304,26 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0; return 0;
} }
int32_t vmProcessForceElectionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg){
SForceElectionReq req = {0};
if (tDeserializeSForceElectionReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to alter hashrange since %s", req.vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
return -1;
}
vnodeForceElection(pVnode->pImpl);
return 0;
}
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SAlterVnodeHashRangeReq req = {0}; SAlterVnodeHashRangeReq req = {0};
if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) { if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
...@@ -548,6 +568,7 @@ SArray *vmGetMsgHandles() { ...@@ -548,6 +568,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_FORCE_ELECTION, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
......
...@@ -37,6 +37,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -37,6 +37,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_DND_CREATE_VNODE: case TDMT_DND_CREATE_VNODE:
code = vmProcessCreateVnodeReq(pMgmt, pMsg); code = vmProcessCreateVnodeReq(pMgmt, pMsg);
break; break;
case TDMT_DND_FORCE_ELECTION:
code = vmProcessForceElectionReq(pMgmt, pMsg);
break;
case TDMT_DND_DROP_VNODE: case TDMT_DND_DROP_VNODE:
code = vmProcessDropVnodeReq(pMgmt, pMsg); code = vmProcessDropVnodeReq(pMgmt, pMsg);
break; break;
......
...@@ -77,7 +77,7 @@ void mndCleanupConsumer(SMnode *pMnode) {} ...@@ -77,7 +77,7 @@ void mndCleanupConsumer(SMnode *pMnode) {}
bool mndRebTryStart() { bool mndRebTryStart() {
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1); int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
mInfo("tq timer, rebalance counter old val:%d", old); mDebug("tq timer, rebalance counter old val:%d", old);
return old == 0; return old == 0;
} }
...@@ -101,7 +101,7 @@ void mndRebCntDec() { ...@@ -101,7 +101,7 @@ void mndRebCntDec() {
int32_t newVal = val - 1; int32_t newVal = val - 1;
int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal); int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
if (oldVal == val) { if (oldVal == val) {
mInfo("rebalance trans end, rebalance counter:%d", newVal); mDebug("rebalance trans end, rebalance counter:%d", newVal);
break; break;
} }
} }
...@@ -356,7 +356,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -356,7 +356,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
} else { } else {
taosHashCleanup(pRebMsg->rebSubHash); taosHashCleanup(pRebMsg->rebSubHash);
rpcFreeCont(pRebMsg); rpcFreeCont(pRebMsg);
mInfo("mq rebalance finished, no modification"); mDebug("mq rebalance finished, no modification");
mndRebEnd(); mndRebEnd();
} }
return 0; return 0;
......
...@@ -40,6 +40,7 @@ static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter); ...@@ -40,6 +40,7 @@ static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq);
static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq);
int32_t mndInitVgroup(SMnode *pMnode) { int32_t mndInitVgroup(SMnode *pMnode) {
SSdbTable table = { SSdbTable table = {
...@@ -60,10 +61,13 @@ int32_t mndInitVgroup(SMnode *pMnode) { ...@@ -60,10 +61,13 @@ int32_t mndInitVgroup(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_FORCE_ELECTION_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
//mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessVgroupBalanceLeaderMsg);
mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg);
mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
...@@ -1770,6 +1774,157 @@ _OVER: ...@@ -1770,6 +1774,157 @@ _OVER:
return code; return code;
} }
static void *mndBuildSForceElectionReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId,
int32_t *pContLen) {
SForceElectionReq balanceReq = {
.vgId = pVgroup->vgId,
};
int32_t contLen = tSerializeSForceElectionReq(NULL, 0, &balanceReq);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
void *pReq = taosMemoryMalloc(contLen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSForceElectionReq((char *)pReq, contLen, &balanceReq);
*pContLen = contLen;
return pReq;
}
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
if (pDnode == NULL) return -1;
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildSForceElectionReq(pMnode, pVgroup, dnodeId, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_FORCE_ELECTION;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans){
SSdb *pSdb = pMnode->pSdb;
int32_t vgid = pVgroup->vgId;
int8_t replica = pVgroup->replica;
if(pVgroup->replica <= 1) {
mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
return -1;
}
int32_t index = vgid%replica;
int32_t dnodeId = pVgroup->vnodeGid[index].dnodeId;
bool exist = false;
bool online = false;
int64_t curMs = taosGetTimestampMs();
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
if (pDnode != NULL) {
exist = true;
online = mndIsDnodeOnline(pDnode, curMs);
mndReleaseDnode(pMnode, pDnode);
}
if(exist && online)
{
mInfo("trans:%d, vgid:%d leader to dnode:%d", pTrans->id, vgid, dnodeId);
if (mndAddBalanceVgroupLeaderAction(pMnode, pTrans, pVgroup, dnodeId) != 0) {
mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId);
return -1;
}
SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup);
if (pRaw == NULL) {
mError("trans:%d, vgid:%d failed to encode action to dnode:%d", pTrans->id, vgid, dnodeId);
return -1;
}
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
mError("trans:%d, vgid:%d failed to append commit log dnode:%d", pTrans->id, vgid, dnodeId);
return -1;
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
}
else
{
mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist, online);
}
return 0;
}
int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq) {
int32_t code = -1;
SBalanceVgroupLeaderReq req = {0};
if (tDeserializeSBalanceVgroupLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return code;
}
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t total = sdbGetSize(pSdb, SDB_VGROUP);
if(total <= 0) {
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
return code;
}
STrans *pTrans = NULL;
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "bal-vg-leader");
if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to balance vgroup leader", pTrans->id);
void *pIter = NULL;
int32_t count = 0;
while (1) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if(mndAddVgroupBalanceToTrans(pMnode, pVgroup, pTrans) == 0){
count++;
}
sdbRelease(pSdb, pVgroup);
}
if(count == 0) {
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
goto _OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER:
mndTransDrop(pTrans);
return code;
}
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup, static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
SVgObj *pNewVgroup, SArray *pArray) { SVgObj *pNewVgroup, SArray *pArray) {
for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) { for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
......
...@@ -56,6 +56,7 @@ void vnodeDestroy(const char *path, STfs *pTfs); ...@@ -56,6 +56,7 @@ void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode); void vnodePreClose(SVnode *pVnode);
void vnodePostClose(SVnode *pVnode); void vnodePostClose(SVnode *pVnode);
void vnodeForceElection(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode *pVnode); void vnodeSyncCheckTimeout(SVnode *pVnode);
void vnodeClose(SVnode *pVnode); void vnodeClose(SVnode *pVnode);
int32_t vnodeSyncCommit(SVnode *pVnode); int32_t vnodeSyncCommit(SVnode *pVnode);
......
...@@ -380,6 +380,10 @@ void vnodePreClose(SVnode *pVnode) { ...@@ -380,6 +380,10 @@ void vnodePreClose(SVnode *pVnode) {
void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); } void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
void vnodeForceElection(SVnode *pVnode) {
syncLeaderForceElection(pVnode->sync);
}
void vnodeClose(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
tsem_wait(&pVnode->canCommit); tsem_wait(&pVnode->canCommit);
......
...@@ -173,6 +173,8 @@ const char* nodesNodeName(ENodeType type) { ...@@ -173,6 +173,8 @@ const char* nodesNodeName(ENodeType type) {
return "DropStreamStmt"; return "DropStreamStmt";
case QUERY_NODE_BALANCE_VGROUP_STMT: case QUERY_NODE_BALANCE_VGROUP_STMT:
return "BalanceVgroupStmt"; return "BalanceVgroupStmt";
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
return "BalanceVgroupLeaderStmt";
case QUERY_NODE_MERGE_VGROUP_STMT: case QUERY_NODE_MERGE_VGROUP_STMT:
return "MergeVgroupStmt"; return "MergeVgroupStmt";
case QUERY_NODE_SHOW_DB_ALIVE_STMT: case QUERY_NODE_SHOW_DB_ALIVE_STMT:
...@@ -6433,6 +6435,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { ...@@ -6433,6 +6435,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return dropStreamStmtToJson(pObj, pJson); return dropStreamStmtToJson(pObj, pJson);
case QUERY_NODE_BALANCE_VGROUP_STMT: case QUERY_NODE_BALANCE_VGROUP_STMT:
return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to serialize. return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to serialize.
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to serialize.
case QUERY_NODE_MERGE_VGROUP_STMT: case QUERY_NODE_MERGE_VGROUP_STMT:
return mergeVgroupStmtToJson(pObj, pJson); return mergeVgroupStmtToJson(pObj, pJson);
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
...@@ -6741,6 +6745,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { ...@@ -6741,6 +6745,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToDropStreamStmt(pJson, pObj); return jsonToDropStreamStmt(pJson, pObj);
case QUERY_NODE_BALANCE_VGROUP_STMT: case QUERY_NODE_BALANCE_VGROUP_STMT:
return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to deserialize. return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to deserialize.
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to deserialize.
case QUERY_NODE_MERGE_VGROUP_STMT: case QUERY_NODE_MERGE_VGROUP_STMT:
return jsonToMergeVgroupStmt(pJson, pObj); return jsonToMergeVgroupStmt(pJson, pObj);
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
......
...@@ -386,6 +386,8 @@ SNode* nodesMakeNode(ENodeType type) { ...@@ -386,6 +386,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SDropStreamStmt)); return makeNode(type, sizeof(SDropStreamStmt));
case QUERY_NODE_BALANCE_VGROUP_STMT: case QUERY_NODE_BALANCE_VGROUP_STMT:
return makeNode(type, sizeof(SBalanceVgroupStmt)); return makeNode(type, sizeof(SBalanceVgroupStmt));
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
return makeNode(type, sizeof(SBalanceVgroupLeaderStmt));
case QUERY_NODE_MERGE_VGROUP_STMT: case QUERY_NODE_MERGE_VGROUP_STMT:
return makeNode(type, sizeof(SMergeVgroupStmt)); return makeNode(type, sizeof(SMergeVgroupStmt));
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
...@@ -942,6 +944,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -942,6 +944,7 @@ void nodesDestroyNode(SNode* pNode) {
} }
case QUERY_NODE_DROP_STREAM_STMT: // no pointer field case QUERY_NODE_DROP_STREAM_STMT: // no pointer field
case QUERY_NODE_BALANCE_VGROUP_STMT: // no pointer field case QUERY_NODE_BALANCE_VGROUP_STMT: // no pointer field
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: // no pointer field
case QUERY_NODE_MERGE_VGROUP_STMT: // no pointer field case QUERY_NODE_MERGE_VGROUP_STMT: // no pointer field
break; break;
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
......
...@@ -223,6 +223,7 @@ SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToke ...@@ -223,6 +223,7 @@ SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToke
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId); SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId);
SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId); SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId);
SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt); SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt);
SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt);
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2); SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2);
SNode* createRedistributeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId, SNodeList* pDnodes); SNode* createRedistributeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId, SNodeList* pDnodes);
SNode* createSplitVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId); SNode* createSplitVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId);
......
...@@ -609,6 +609,7 @@ cmd ::= KILL TRANSACTION NK_INTEGER(A). ...@@ -609,6 +609,7 @@ cmd ::= KILL TRANSACTION NK_INTEGER(A).
/************************************************ merge/redistribute/ vgroup ******************************************/ /************************************************ merge/redistribute/ vgroup ******************************************/
cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); } cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); }
cmd ::= BALANCE VGROUP LEADER. { pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt); }
cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); } cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); }
cmd ::= REDISTRIBUTE VGROUP NK_INTEGER(A) dnode_list(B). { pCxt->pRootNode = createRedistributeVgroupStmt(pCxt, &A, B); } cmd ::= REDISTRIBUTE VGROUP NK_INTEGER(A) dnode_list(B). { pCxt->pRootNode = createRedistributeVgroupStmt(pCxt, &A, B); }
cmd ::= SPLIT VGROUP NK_INTEGER(A). { pCxt->pRootNode = createSplitVgroupStmt(pCxt, &A); } cmd ::= SPLIT VGROUP NK_INTEGER(A). { pCxt->pRootNode = createSplitVgroupStmt(pCxt, &A); }
......
...@@ -1952,6 +1952,13 @@ SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt) { ...@@ -1952,6 +1952,13 @@ SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt) {
return (SNode*)pStmt; return (SNode*)pStmt;
} }
SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt) {
CHECK_PARSER_STATUS(pCxt);
SBalanceVgroupLeaderStmt* pStmt = (SBalanceVgroupLeaderStmt*)nodesMakeNode(QUERY_NODE_BALANCE_VGROUP_LEADER_STMT);
CHECK_OUT_OF_MEM(pStmt);
return (SNode*)pStmt;
}
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2) { SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
SMergeVgroupStmt* pStmt = (SMergeVgroupStmt*)nodesMakeNode(QUERY_NODE_MERGE_VGROUP_STMT); SMergeVgroupStmt* pStmt = (SMergeVgroupStmt*)nodesMakeNode(QUERY_NODE_MERGE_VGROUP_STMT);
......
...@@ -127,6 +127,7 @@ static SKeyword keywordTable[] = { ...@@ -127,6 +127,7 @@ static SKeyword keywordTable[] = {
{"LANGUAGE", TK_LANGUAGE}, {"LANGUAGE", TK_LANGUAGE},
{"LAST", TK_LAST}, {"LAST", TK_LAST},
{"LAST_ROW", TK_LAST_ROW}, {"LAST_ROW", TK_LAST_ROW},
{"LEADER", TK_LEADER},
{"LICENCES", TK_LICENCES}, {"LICENCES", TK_LICENCES},
{"LIKE", TK_LIKE}, {"LIKE", TK_LIKE},
{"LIMIT", TK_LIMIT}, {"LIMIT", TK_LIMIT},
......
...@@ -6469,6 +6469,11 @@ static int32_t translateBalanceVgroup(STranslateContext* pCxt, SBalanceVgroupStm ...@@ -6469,6 +6469,11 @@ static int32_t translateBalanceVgroup(STranslateContext* pCxt, SBalanceVgroupStm
return buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP, (FSerializeFunc)tSerializeSBalanceVgroupReq, &req); return buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP, (FSerializeFunc)tSerializeSBalanceVgroupReq, &req);
} }
static int32_t translateBalanceVgroupLeader(STranslateContext* pCxt, SBalanceVgroupLeaderStmt* pStmt) {
SBalanceVgroupLeaderReq req = {0};
return buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP_LEADER, (FSerializeFunc)tSerializeSBalanceVgroupLeaderReq, &req);
}
static int32_t translateMergeVgroup(STranslateContext* pCxt, SMergeVgroupStmt* pStmt) { static int32_t translateMergeVgroup(STranslateContext* pCxt, SMergeVgroupStmt* pStmt) {
SMergeVgroupReq req = {.vgId1 = pStmt->vgId1, .vgId2 = pStmt->vgId2}; SMergeVgroupReq req = {.vgId1 = pStmt->vgId1, .vgId2 = pStmt->vgId2};
return buildCmdMsg(pCxt, TDMT_MND_MERGE_VGROUP, (FSerializeFunc)tSerializeSMergeVgroupReq, &req); return buildCmdMsg(pCxt, TDMT_MND_MERGE_VGROUP, (FSerializeFunc)tSerializeSMergeVgroupReq, &req);
...@@ -6680,6 +6685,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { ...@@ -6680,6 +6685,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_BALANCE_VGROUP_STMT: case QUERY_NODE_BALANCE_VGROUP_STMT:
code = translateBalanceVgroup(pCxt, (SBalanceVgroupStmt*)pNode); code = translateBalanceVgroup(pCxt, (SBalanceVgroupStmt*)pNode);
break; break;
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
code = translateBalanceVgroupLeader(pCxt, (SBalanceVgroupLeaderStmt*)pNode);
break;
case QUERY_NODE_MERGE_VGROUP_STMT: case QUERY_NODE_MERGE_VGROUP_STMT:
code = translateMergeVgroup(pCxt, (SMergeVgroupStmt*)pNode); code = translateMergeVgroup(pCxt, (SMergeVgroupStmt*)pNode);
break; break;
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
...@@ -858,4 +858,21 @@ TEST_F(ParserInitialATest, balanceVgroup) { ...@@ -858,4 +858,21 @@ TEST_F(ParserInitialATest, balanceVgroup) {
run("BALANCE VGROUP"); run("BALANCE VGROUP");
} }
/*
* BALANCE VGROUP LEADER
*/
TEST_F(ParserInitialATest, balanceVgroupLeader) {
useDb("root", "test");
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_BALANCE_VGROUP_LEADER_STMT);
ASSERT_EQ(pQuery->pCmdMsg->msgType, TDMT_MND_BALANCE_VGROUP_LEADER);
SBalanceVgroupLeaderReq req = {0};
ASSERT_EQ(tDeserializeSBalanceVgroupLeaderReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req),
TSDB_CODE_SUCCESS);
});
run("BALANCE VGROUP LEADER");
}
} // namespace ParserTest } // namespace ParserTest
\ No newline at end of file
...@@ -228,6 +228,15 @@ int32_t syncLeaderTransfer(int64_t rid) { ...@@ -228,6 +228,15 @@ int32_t syncLeaderTransfer(int64_t rid) {
return ret; return ret;
} }
int32_t syncLeaderForceElection(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) return -1;
int32_t ret = syncNodeElect(pSyncNode);
syncNodeRelease(pSyncNode);
return ret;
}
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) { int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
SSyncNode* pNode = syncNodeAcquire(rid); SSyncNode* pNode = syncNodeAcquire(rid);
if (pNode == NULL) return -1; if (pNode == NULL) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册