未验证 提交 f590a362 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #13687 from taosdata/fix/dnode

feat: redistribute vgroup to dnodes
...@@ -85,7 +85,6 @@ int32_t* taosGetErrno(); ...@@ -85,7 +85,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_NETWORK_UNAVAIL TAOS_DEF_ERROR_CODE(0, 0x0102) #define TSDB_CODE_RPC_NETWORK_UNAVAIL TAOS_DEF_ERROR_CODE(0, 0x0102)
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0103) #define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0103)
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0104) #define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0104)
#define TSDB_CODE_RPC_INDIRECT_NETWORK_UNAVAIL TAOS_DEF_ERROR_CODE(0, 0x0105)
//client //client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
...@@ -220,6 +219,7 @@ int32_t* taosGetErrno(); ...@@ -220,6 +219,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0392) #define TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0392)
#define TSDB_CODE_MND_VGROUP_UN_CHANGED TAOS_DEF_ERROR_CODE(0, 0x0393) #define TSDB_CODE_MND_VGROUP_UN_CHANGED TAOS_DEF_ERROR_CODE(0, 0x0393)
#define TSDB_CODE_MND_HAS_OFFLINE_DNODE TAOS_DEF_ERROR_CODE(0, 0x0394) #define TSDB_CODE_MND_HAS_OFFLINE_DNODE TAOS_DEF_ERROR_CODE(0, 0x0394)
#define TSDB_CODE_MND_INVALID_REPLICA TAOS_DEF_ERROR_CODE(0, 0x0395)
// mnode-stable // mnode-stable
#define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0) #define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0)
...@@ -260,6 +260,7 @@ int32_t* taosGetErrno(); ...@@ -260,6 +260,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TRANS_CONFLICT TAOS_DEF_ERROR_CODE(0, 0x03D3) #define TSDB_CODE_MND_TRANS_CONFLICT TAOS_DEF_ERROR_CODE(0, 0x03D3)
#define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03D4) #define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03D4)
#define TSDB_CODE_MND_TRANS_CLOG_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x03D5) #define TSDB_CODE_MND_TRANS_CLOG_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x03D5)
#define TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL TAOS_DEF_ERROR_CODE(0, 0x03D6)
// mnode-mq // mnode-mq
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0) #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)
......
...@@ -221,7 +221,7 @@ static const SSysDbTableSchema transSchema[] = { ...@@ -221,7 +221,7 @@ static const SSysDbTableSchema transSchema[] = {
{.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "last_error", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "last_action_info", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
}; };
static const SSysDbTableSchema configSchema[] = { static const SSysDbTableSchema configSchema[] = {
......
...@@ -170,6 +170,9 @@ SArray *mmGetMsgHandles() { ...@@ -170,6 +170,9 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_REDISTRIBUTE_VGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MERGE_VGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -120,10 +120,10 @@ typedef struct { ...@@ -120,10 +120,10 @@ typedef struct {
SArray* commitActions; SArray* commitActions;
int64_t createdTime; int64_t createdTime;
int64_t lastExecTime; int64_t lastExecTime;
int32_t lastErrorAction; int32_t lastAction;
int32_t lastErrorNo; int32_t lastErrorNo;
tmsg_t lastErrorMsgType; tmsg_t lastMsgType;
SEpSet lastErrorEpset; SEpSet lastEpset;
char dbname[TSDB_DB_FNAME_LEN]; char dbname[TSDB_DB_FNAME_LEN];
int32_t startFunc; int32_t startFunc;
int32_t stopFunc; int32_t stopFunc;
......
...@@ -217,8 +217,8 @@ static int32_t mndInitSteps(SMnode *pMnode) { ...@@ -217,8 +217,8 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1; if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1; if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1; if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-qnode", mndInitSnode, mndCleanupSnode) != 0) return -1; if (mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-qnode", mndInitBnode, mndCleanupBnode) != 0) return -1; if (mndAllocStep(pMnode, "mnode-bnode", mndInitBnode, mndCleanupBnode) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1; if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1; if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant) != 0) return -1; if (mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant) != 0) return -1;
......
...@@ -781,7 +781,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { ...@@ -781,7 +781,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
sendRsp = true; sendRsp = true;
} }
} else { } else {
if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 3) { if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 2) {
if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
sendRsp = true; sendRsp = true;
} }
...@@ -791,7 +791,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { ...@@ -791,7 +791,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage), mDebug("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage),
pTrans->rpcInfo.ahandle); pTrans->rpcInfo.ahandle);
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
code = TSDB_CODE_RPC_INDIRECT_NETWORK_UNAVAIL; code = TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL;
} }
SRpcMsg rspMsg = {.code = code, .info = pTrans->rpcInfo}; SRpcMsg rspMsg = {.code = code, .info = pTrans->rpcInfo};
...@@ -894,10 +894,19 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi ...@@ -894,10 +894,19 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
code = 0; code = 0;
mDebug("trans:%d, %s:%d write to sdb, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage), pAction->id, mDebug("trans:%d, %s:%d write to sdb, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage), pAction->id,
sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status)); sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
pTrans->lastAction = pAction->id;
pTrans->lastMsgType = pAction->msgType;
pTrans->lastEpset = pAction->epSet;
pTrans->lastErrorNo = 0;
} else { } else {
pAction->errCode = (terrno != 0) ? terrno : code; pAction->errCode = (terrno != 0) ? terrno : code;
mError("trans:%d, %s:%d failed to write sdb since %s, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage), mError("trans:%d, %s:%d failed to write sdb since %s, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage),
pAction->id, terrstr(), sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status)); pAction->id, terrstr(), sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
pTrans->lastAction = pAction->id;
pTrans->lastMsgType = pAction->msgType;
pTrans->lastEpset = pAction->epSet;
pTrans->lastErrorNo = pAction->errCode;
} }
return code; return code;
...@@ -933,27 +942,48 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio ...@@ -933,27 +942,48 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
pAction->msgReceived = 0; pAction->msgReceived = 0;
pAction->errCode = 0; pAction->errCode = 0;
mDebug("trans:%d, %s:%d is sent, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, detail); mDebug("trans:%d, %s:%d is sent, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, detail);
pTrans->lastAction = pAction->id;
pTrans->lastMsgType = pAction->msgType;
pTrans->lastEpset = pAction->epSet;
if (pTrans->lastErrorNo == 0) {
pTrans->lastErrorNo = TSDB_CODE_ACTION_IN_PROGRESS;
}
} else { } else {
pAction->msgSent = 0; pAction->msgSent = 0;
pAction->msgReceived = 0; pAction->msgReceived = 0;
pAction->errCode = (terrno != 0) ? terrno : code; pAction->errCode = (terrno != 0) ? terrno : code;
mError("trans:%d, %s:%d not send since %s, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, terrstr(), mError("trans:%d, %s:%d not send since %s, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, terrstr(),
detail); detail);
pTrans->lastAction = pAction->id;
pTrans->lastMsgType = pAction->msgType;
pTrans->lastEpset = pAction->epSet;
pTrans->lastErrorNo = pAction->errCode;
} }
return code; return code;
} }
static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
pAction->rawWritten = 0;
pAction->errCode = 0;
mDebug("trans:%d, %s:%d null action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id);
pTrans->lastAction = pAction->id;
pTrans->lastMsgType = pAction->msgType;
pTrans->lastEpset = pAction->epSet;
pTrans->lastErrorNo == 0;
return 0;
}
static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
if (pAction->actionType == TRANS_ACTION_RAW) { if (pAction->actionType == TRANS_ACTION_RAW) {
return mndTransWriteSingleLog(pMnode, pTrans, pAction); return mndTransWriteSingleLog(pMnode, pTrans, pAction);
} else if (pAction->actionType == TRANS_ACTION_MSG) { } else if (pAction->actionType == TRANS_ACTION_MSG) {
return mndTransSendSingleMsg(pMnode, pTrans, pAction); return mndTransSendSingleMsg(pMnode, pTrans, pAction);
} else { } else {
pAction->rawWritten = 0; return mndTransExecNullMsg(pMnode, pTrans, pAction);
pAction->errCode = 0;
mDebug("trans:%d, %s:%d null action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id);
return 0;
} }
} }
...@@ -994,19 +1024,19 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA ...@@ -994,19 +1024,19 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
if (numOfExecuted == numOfActions) { if (numOfExecuted == numOfActions) {
if (errCode == 0) { if (errCode == 0) {
pTrans->lastErrorAction = 0; pTrans->lastAction = 0;
pTrans->lastErrorNo = 0; pTrans->lastErrorNo = 0;
pTrans->lastErrorMsgType = 0; pTrans->lastMsgType = 0;
memset(&pTrans->lastErrorEpset, 0, sizeof(pTrans->lastErrorEpset)); memset(&pTrans->lastEpset, 0, sizeof(pTrans->lastEpset));
mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions); mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
return 0; return 0;
} else { } else {
mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode & 0XFFFF); mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode & 0XFFFF);
if (pErrAction != NULL) { if (pErrAction != NULL) {
pTrans->lastErrorMsgType = pErrAction->msgType; pTrans->lastMsgType = pErrAction->msgType;
pTrans->lastErrorAction = pErrAction->id; pTrans->lastAction = pErrAction->id;
pTrans->lastErrorNo = pErrAction->errCode; pTrans->lastErrorNo = pErrAction->errCode;
pTrans->lastErrorEpset = pErrAction->epSet; pTrans->lastEpset = pErrAction->epSet;
} }
mndTransResetActions(pMnode, pTrans, pArray); mndTransResetActions(pMnode, pTrans, pArray);
terrno = errCode; terrno = errCode;
...@@ -1073,15 +1103,15 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) ...@@ -1073,15 +1103,15 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
} }
if (code == 0) { if (code == 0) {
pTrans->lastErrorAction = 0; pTrans->lastAction = 0;
pTrans->lastErrorNo = 0; pTrans->lastErrorNo = 0;
pTrans->lastErrorMsgType = 0; pTrans->lastMsgType = 0;
memset(&pTrans->lastErrorEpset, 0, sizeof(pTrans->lastErrorEpset)); memset(&pTrans->lastEpset, 0, sizeof(pTrans->lastEpset));
} else { } else {
pTrans->lastErrorMsgType = pAction->msgType; pTrans->lastMsgType = pAction->msgType;
pTrans->lastErrorAction = action; pTrans->lastAction = action;
pTrans->lastErrorNo = pAction->errCode; pTrans->lastErrorNo = code;
pTrans->lastErrorEpset = pAction->epSet; pTrans->lastEpset = pAction->epSet;
} }
if (code == 0) { if (code == 0) {
...@@ -1432,23 +1462,21 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl ...@@ -1432,23 +1462,21 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false); colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false);
char lastError[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; char lastInfo[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
char detail[TSDB_TRANS_ERROR_LEN] = {0}; char detail[TSDB_TRANS_ERROR_LEN] = {0};
if (pTrans->lastErrorNo != 0) { int32_t len = snprintf(detail, sizeof(detail), "action:%d code:0x%x(%s) ", pTrans->lastAction,
int32_t len = snprintf(detail, sizeof(detail), "action:%d errno:0x%x(%s) ", pTrans->lastErrorAction, pTrans->lastErrorNo & 0xFFFF, tstrerror(pTrans->lastErrorNo));
pTrans->lastErrorNo & 0xFFFF, tstrerror(pTrans->lastErrorNo)); SEpSet epset = pTrans->lastEpset;
SEpSet epset = pTrans->lastErrorEpset; if (epset.numOfEps > 0) {
if (epset.numOfEps > 0) { len += snprintf(detail + len, sizeof(detail) - len, "msgType:%s numOfEps:%d inUse:%d ",
len += snprintf(detail + len, sizeof(detail) - len, "msgType:%s numOfEps:%d inUse:%d ", TMSG_INFO(pTrans->lastMsgType), epset.numOfEps, epset.inUse);
TMSG_INFO(pTrans->lastErrorMsgType), epset.numOfEps, epset.inUse); for (int32_t i = 0; i < pTrans->lastEpset.numOfEps; ++i) {
for (int32_t i = 0; i < pTrans->lastErrorEpset.numOfEps; ++i) { len += snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port);
len += snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port);
}
} }
} }
STR_WITH_MAXSIZE_TO_VARSTR(lastError, detail, pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(lastInfo, detail, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)lastError, false); colDataAppend(pColInfo, numOfRows, (const char *)lastInfo, false);
numOfRows++; numOfRows++;
sdbRelease(pSdb, pTrans); sdbRelease(pSdb, pTrans);
......
...@@ -59,6 +59,10 @@ int32_t mndInitVgroup(SMnode *pMnode) { ...@@ -59,6 +59,10 @@ int32_t mndInitVgroup(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
mndSetMsgHandle(pMnode, TDMT_MND_MERGE_VGROUP, mndProcessSplitVgroupMsg);
mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
...@@ -1009,10 +1013,10 @@ static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, S ...@@ -1009,10 +1013,10 @@ static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, S
if (pGid == NULL) return 0; if (pGid == NULL) return 0;
pVgroup->replica--;
memcpy(&delGid, pGid, sizeof(SVnodeGid)); memcpy(&delGid, pGid, sizeof(SVnodeGid));
memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid)); memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid));
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid)); memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
pVgroup->replica--;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
...@@ -1040,11 +1044,36 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, ...@@ -1040,11 +1044,36 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId); mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
} }
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id) != 0) goto _OVER; if (pNew1 != pOld1) {
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id) != 0) goto _OVER; int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew1->id);
if (pNew2 != NULL) { if (numOfVnodes >= pNew1->numOfSupportVnodes) {
mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew1->id, numOfVnodes,
pNew1->numOfSupportVnodes);
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
goto _OVER;
}
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id) != 0) goto _OVER;
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id) != 0) goto _OVER;
}
if (pNew2 != pOld2) {
int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew2->id);
if (numOfVnodes >= pNew2->numOfSupportVnodes) {
mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew2->id, numOfVnodes,
pNew2->numOfSupportVnodes);
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
goto _OVER;
}
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id) != 0) goto _OVER; if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id) != 0) goto _OVER;
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id) != 0) goto _OVER; if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id) != 0) goto _OVER;
}
if (pNew3 != pOld3) {
int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew3->id);
if (numOfVnodes >= pNew3->numOfSupportVnodes) {
mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew3->id, numOfVnodes,
pNew3->numOfSupportVnodes);
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
goto _OVER;
}
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id) != 0) goto _OVER; if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id) != 0) goto _OVER;
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id) != 0) goto _OVER; if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id) != 0) goto _OVER;
} }
...@@ -1070,88 +1099,105 @@ _OVER: ...@@ -1070,88 +1099,105 @@ _OVER:
} }
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) { static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SUserObj *pUser = NULL; SUserObj *pUser = NULL;
SDnodeObj *pNew1 = NULL; SDnodeObj *pNew1 = NULL;
SDnodeObj *pNew2 = NULL; SDnodeObj *pNew2 = NULL;
SDnodeObj *pNew3 = NULL; SDnodeObj *pNew3 = NULL;
SDnodeObj *pOld1 = NULL; SDnodeObj *pOld1 = NULL;
SDnodeObj *pOld2 = NULL; SDnodeObj *pOld2 = NULL;
SDnodeObj *pOld3 = NULL; SDnodeObj *pOld3 = NULL;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
int32_t code = -1; int32_t code = -1;
int64_t curMs = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();
SMDropMnodeReq redReq = {0};
SRedistributeVgroupReq redReq = {0};
#if 0 if (tDeserializeSRedistributeVgroupReq(pReq->pCont, pReq->contLen, &redReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto _OVER; goto _OVER;
} }
#endif
mDebug("vgId:%d, start to redistribute", 2); mInfo("vgId:%d, start to redistribute to dnode %d:%d:%d", redReq.vgId, redReq.dnodeId1, redReq.dnodeId2,
redReq.dnodeId3);
pUser = mndAcquireUser(pMnode, pReq->conn.user); pUser = mndAcquireUser(pMnode, pReq->conn.user);
if (pUser == NULL) { if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto _OVER; goto _OVER;
} }
if (mndCheckNodeAuth(pUser) != 0) { if (mndCheckNodeAuth(pUser) != 0) goto _OVER;
goto _OVER;
}
pVgroup = mndAcquireVgroup(pMnode, 2); pVgroup = mndAcquireVgroup(pMnode, redReq.vgId);
if (pVgroup == NULL) goto _OVER; if (pVgroup == NULL) goto _OVER;
pDb = mndAcquireDb(pMnode, pVgroup->dbName); pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb == NULL) goto _OVER; if (pDb == NULL) goto _OVER;
if (pVgroup->replica == 1) { if (pVgroup->replica == 1) {
pNew1 = mndAcquireDnode(pMnode, 1); if (redReq.dnodeId2 != -1 || redReq.dnodeId3 != -1) {
terrno = TSDB_CODE_MND_INVALID_REPLICA;
goto _OVER;
}
pNew1 = mndAcquireDnode(pMnode, redReq.dnodeId1);
pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId); pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
if (pNew1 == NULL || pOld1 == NULL) goto _OVER; if (pNew1 == NULL || pOld1 == NULL) {
if (!mndIsDnodeOnline(pNew1, curMs) || !mndIsDnodeOnline(pOld1, curMs)) { terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
terrno = TSDB_CODE_NODE_OFFLINE;
goto _OVER; goto _OVER;
} }
if (pNew1 == pOld1) { if (pNew1 == pOld1) {
terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED; terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
goto _OVER; goto _OVER;
} }
if (mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL) != 0) goto _OVER; if (!mndIsDnodeOnline(pNew1, curMs) || !mndIsDnodeOnline(pOld1, curMs)) {
} terrno = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
goto _OVER;
if (pVgroup->replica == 3) { }
pNew1 = mndAcquireDnode(pMnode, 1); code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL);
pNew2 = mndAcquireDnode(pMnode, 2); } else if (pVgroup->replica == 3) {
pNew3 = mndAcquireDnode(pMnode, 3); if (redReq.dnodeId2 == -1 || redReq.dnodeId3 == -1) {
terrno = TSDB_CODE_MND_INVALID_REPLICA;
goto _OVER;
}
pNew1 = mndAcquireDnode(pMnode, redReq.dnodeId1);
pNew2 = mndAcquireDnode(pMnode, redReq.dnodeId2);
pNew3 = mndAcquireDnode(pMnode, redReq.dnodeId3);
pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId); pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
pOld2 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[1].dnodeId); pOld2 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[1].dnodeId);
pOld3 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[2].dnodeId); pOld3 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[2].dnodeId);
if (pNew1 == NULL || pOld1 == NULL || pNew2 == NULL || pOld2 == NULL || pNew3 == NULL || pOld3 == NULL) goto _OVER; if (pNew1 == NULL || pOld1 == NULL || pNew2 == NULL || pOld2 == NULL || pNew3 == NULL || pOld3 == NULL) {
if (!mndIsDnodeOnline(pNew1, curMs) || !mndIsDnodeOnline(pOld1, curMs) || !mndIsDnodeOnline(pNew2, curMs) || terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
!mndIsDnodeOnline(pOld2, curMs) || !mndIsDnodeOnline(pNew3, curMs) || !mndIsDnodeOnline(pOld3, curMs)) { goto _OVER;
terrno = TSDB_CODE_NODE_OFFLINE; }
if (pNew1 == pNew2 || pNew1 == pNew3 || pNew2 == pNew3) {
terrno = TSDB_CODE_MND_INVALID_REPLICA;
goto _OVER; goto _OVER;
} }
bool changed = true; bool changed = false;
if (pNew1 != pOld1 || pNew1 != pOld2 || pNew1 != pOld3) changed = true; if (pNew1 != pOld1 && pNew1 != pOld2 && pNew1 != pOld3) changed = true;
if (pNew2 != pOld1 || pNew2 != pOld2 || pNew2 != pOld3) changed = true; if (pNew2 != pOld1 && pNew2 != pOld2 && pNew2 != pOld3) changed = true;
if (pNew3 != pOld1 || pNew3 != pOld2 || pNew3 != pOld3) changed = true; if (pNew3 != pOld1 && pNew3 != pOld2 && pNew3 != pOld3) changed = true;
if (!changed) { if (!changed) {
terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED; terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
goto _OVER; goto _OVER;
} }
if (mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3) != 0) goto _OVER; if (!mndIsDnodeOnline(pNew1, curMs) || !mndIsDnodeOnline(pOld1, curMs) || !mndIsDnodeOnline(pNew2, curMs) ||
!mndIsDnodeOnline(pOld2, curMs) || !mndIsDnodeOnline(pNew3, curMs) || !mndIsDnodeOnline(pOld3, curMs)) {
terrno = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
goto _OVER;
}
code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3);
} else {
terrno = TSDB_CODE_MND_INVALID_REPLICA;
goto _OVER;
} }
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("vgId:%d, failed to redistribute since %s", 1, terrstr()); mError("vgId:%d, failed to redistribute to dnode %d %d %d since %s", redReq.vgId, redReq.dnodeId1, redReq.dnodeId2,
redReq.dnodeId3, terrstr());
} }
mndReleaseDnode(pMnode, pNew1); mndReleaseDnode(pMnode, pNew1);
...@@ -1303,9 +1349,7 @@ static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { ...@@ -1303,9 +1349,7 @@ static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
if (mndCheckNodeAuth(pUser) != 0) { if (mndCheckNodeAuth(pUser) != 0) goto _OVER;
goto _OVER;
}
code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup); code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
......
...@@ -357,7 +357,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -357,7 +357,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
SdbEncodeFp encodeFp = pSdb->encodeFps[i]; SdbEncodeFp encodeFp = pSdb->encodeFps[i];
if (encodeFp == NULL) continue; if (encodeFp == NULL) continue;
mTrace("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); mDebug("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
SHashObj *hash = pSdb->hashObjs[i]; SHashObj *hash = pSdb->hashObjs[i];
TdThreadRwlock *pLock = &pSdb->locks[i]; TdThreadRwlock *pLock = &pSdb->locks[i];
......
...@@ -83,6 +83,7 @@ const char *sdbStatusName(ESdbStatus status) { ...@@ -83,6 +83,7 @@ const char *sdbStatusName(ESdbStatus status) {
} }
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) { void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) {
#if 0
EKeyType keyType = pSdb->keyTypes[pRow->type]; EKeyType keyType = pSdb->keyTypes[pRow->type];
if (keyType == SDB_KEY_BINARY) { if (keyType == SDB_KEY_BINARY) {
...@@ -96,6 +97,7 @@ void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) { ...@@ -96,6 +97,7 @@ void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) {
pRow->refCount, oper, pRow->pObj, sdbStatusName(pRow->status)); pRow->refCount, oper, pRow->pObj, sdbStatusName(pRow->status));
} else { } else {
} }
#endif
} }
static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) { static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
......
...@@ -37,13 +37,17 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) { ...@@ -37,13 +37,17 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
pRaw->sver = sver; pRaw->sver = sver;
pRaw->dataLen = dataLen; pRaw->dataLen = dataLen;
#if 0
mTrace("raw:%p, is created, len:%d table:%s", pRaw, dataLen, sdbTableName(type)); mTrace("raw:%p, is created, len:%d table:%s", pRaw, dataLen, sdbTableName(type));
#endif
return pRaw; return pRaw;
} }
void sdbFreeRaw(SSdbRaw *pRaw) { void sdbFreeRaw(SSdbRaw *pRaw) {
if (pRaw != NULL) { if (pRaw != NULL) {
#if 0
mTrace("raw:%p, is freed", pRaw); mTrace("raw:%p, is freed", pRaw);
#endif
taosMemoryFree(pRaw); taosMemoryFree(pRaw);
} }
} }
......
...@@ -23,7 +23,9 @@ SSdbRow *sdbAllocRow(int32_t objSize) { ...@@ -23,7 +23,9 @@ SSdbRow *sdbAllocRow(int32_t objSize) {
return NULL; return NULL;
} }
#if 0
mTrace("row:%p, is created, len:%d", pRow->pObj, objSize); mTrace("row:%p, is created, len:%d", pRow->pObj, objSize);
#endif
return pRow; return pRow;
} }
...@@ -45,6 +47,8 @@ void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc) { ...@@ -45,6 +47,8 @@ void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc) {
sdbPrintOper(pSdb, pRow, "free"); sdbPrintOper(pSdb, pRow, "free");
#if 0
mTrace("row:%p, is freed", pRow->pObj); mTrace("row:%p, is freed", pRow->pObj);
#endif
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
} }
...@@ -187,6 +187,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigC ...@@ -187,6 +187,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigC
// todo rpc response here // todo rpc response here
// build rpc msg // build rpc msg
// put into apply queue // put into apply queue
vnodePostBlockMsg(pVnode, TDMT_VND_ALTER_REPLICA);
} }
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
......
...@@ -90,7 +90,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_AUTH_FAILURE, "Authentication failur ...@@ -90,7 +90,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_AUTH_FAILURE, "Authentication failur
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish connection") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish connection")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INDIRECT_NETWORK_UNAVAIL, "Unable to establish connection")
//client //client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
...@@ -225,6 +224,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_IN_DNODE, "Vgroup not in dnode") ...@@ -225,6 +224,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_IN_DNODE, "Vgroup not in dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "Vgroup does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "Vgroup does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_UN_CHANGED, "Vgroup distribution has not changed") TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_UN_CHANGED, "Vgroup distribution has not changed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_HAS_OFFLINE_DNODE, "Offline dnode exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_HAS_OFFLINE_DNODE, "Offline dnode exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_REPLICA, "Invalid vgroup replica")
// mnode-stable // mnode-stable
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "STable already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "STable already exists")
...@@ -265,6 +265,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill ...@@ -265,6 +265,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction not completed") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction not completed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog is null") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog is null")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL, "Unable to establish connection While execute transaction")
// mnode-mq // mnode-mq
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists")
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/deploy.sh -n dnode5 -i 5
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
#system sh/exec.sh -n dnode5 -s start
sql connect
sql create user u1 pass 'taosdata'
print =============== step1 create dnode2
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
sql create dnode $hostname port 7500
$x = 0
step1:
$ = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
print ===> $data20 $data21 $data22 $data23 $data24 $data25
print ===> $data30 $data31 $data32 $data33 $data34 $data35
print ===> $data40 $data41 $data42 $data43 $data44 $data45
if $rows != 5 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
if $data(3)[4] != ready then
goto step1
endi
if $data(4)[4] != ready then
goto step1
endi
#if $data(5)[4] != ready then
# goto step1
#endi
print =============== step2: create db
sql create database d1 vgroups 1 replica 3
# Invalid vgroup
sql_error redistribute vgroup 3 dnode 5 dnode 3 dnode 4
# un changed
sql_error redistribute vgroup 2 dnode 2 dnode 3 dnode 4
# no enought vnodes
sql_error redistribute vgroup 2 dnode 1 dnode 3 dnode 4
# offline vnodes
sql_error redistribute vgroup 2 dnode 5 dnode 3 dnode 4
# Invalid replica
sql_error redistribute vgroup 2 dnode 5
sql_error redistribute vgroup 2 dnode 5 dnode 3
sql_error redistribute vgroup 2 dnode 2 dnode 3
sql_error redistribute vgroup 2 dnode 2 dnode 2
sql_error redistribute vgroup 3 dnode 2 dnode 2
system sh/exec.sh -n dnode5 -s start
$x = 0
step2:
$ = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
print ===> $data20 $data21 $data22 $data23 $data24 $data25
print ===> $data30 $data31 $data32 $data33 $data34 $data35
print ===> $data40 $data41 $data42 $data43 $data44 $data45
if $rows != 5 then
return -1
endi
if $data(1)[4] != ready then
goto step2
endi
if $data(2)[4] != ready then
goto step2
endi
if $data(3)[4] != ready then
goto step2
endi
if $data(4)[4] != ready then
goto step2
endi
if $data(5)[4] != ready then
goto step2
endi
print =============== step31: move follower
$leaderExist = 0
$leaderVnode = 0
$follower1 = 0
$follower2 = 0
$x = 0
step3:
$ = $x + 1
sleep 1000
if $x == 10 then
print ====> db not ready!
return -1
endi
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
if $rows != 1 then
return -1
endi
if $data(2)[4] == leader then
$leaderExist = 1
$leaderVnode = 4
$follower1 = 2
$follower2 = 3
endi
if $data(2)[6] == leader then
$leaderExist = 1
$leaderVnode = 3
$follower1 = 2
$follower2 = 4
endi
if $data(2)[8] == leader then
$leaderExist = 1
$leaderVnode = 2
$follower1 = 3
$follower2 = 4
endi
if $leaderExist != 1 then
goto step3
endi
print leader $leaderVnode
print follower1 $follower1
print follower2 $follower2
print =============== step32: move follower2
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
return
print =============== step33: move follower1
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
print =============== step34: move follower2
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
print =============== step35: move follower1
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
print =============== step4: move leader
return
print =============== step3: drop dnode 3
return
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册